""" events plugin for Pelican ========================= This plugin looks for and parses an "events" directory and generates blog posts with a user-defined event date. (typically in the future) It also generates an ICalendar v2.0 calendar file. https://en.wikipedia.org/wiki/ICalendar Author: Federico Ceratto <federico.ceratto@gmail.com> Released under AGPLv3+ license, see LICENSE This is a custom Varia version of the event plugin. Email: info@varia.zone """ import logging import os.path from collections import defaultdict, namedtuple from datetime import datetime, timedelta import icalendar import pytz from pelican import signals, utils log = logging.getLogger(__name__) TIME_MULTIPLIERS = { "w": "weeks", "d": "days", "h": "hours", "m": "minutes", "s": "seconds", } events = [] localized_events = defaultdict(list) Event = namedtuple("Event", "dtstart dtend metadata rrule") def parse_tstamp(ev, field_name): """Parse a timestamp string in format "YYYY-MM-DD HH:MM" :returns: datetime """ try: return datetime.strptime(ev[field_name], "%Y-%m-%d %H:%M") except Exception as e: log.error( "Unable to parse the '%s' field in the event named '%s': %s" % (field_name, ev["title"], e) ) raise def parse_timedelta(ev): """Parse a timedelta string in format [<num><multiplier> ]* e.g. 2h 30m :returns: timedelta """ chunks = ev["event_duration"].split() tdargs = {} for c in chunks: try: m = TIME_MULTIPLIERS[c[-1]] val = int(c[:-1]) tdargs[m] = val except KeyError: log.error( """Unknown time multiplier '%s' value in the \ 'event_duration' field in the '%s' event. Supported multipliers \ are: '%s'.""" % (c, ev["title"], " ".join(TIME_MULTIPLIERS)) ) raise RuntimeError("Unknown time multiplier '%s'" % c) except ValueError: log.error( """Unable to parse '%s' value in the 'event_duration' \ field in the '%s' event.""" % (c, ev["title"]) ) raise ValueError("Unable to parse '%s'" % c) return timedelta(**tdargs) def parse_recursion(ev, field_name): """ Parse information about recurring events and return frequency of event and untill time. """ chunks = ev[field_name].split() freq = chunks[0].upper() if "until" in chunks: until = datetime.strptime(chunks[-1], "%Y-%m-%d") rrule = [freq, until] else: rrule = [freq] return rrule def parse_article(generator, metadata): """Collect articles metadata to be used for building the event calendar :returns: None """ if "event_start" not in metadata: return dtstart = parse_tstamp(metadata, "event_start") rrule = [] if "event_end" in metadata: dtend = parse_tstamp(metadata, "event_end") elif "event_duration" in metadata: dtdelta = parse_timedelta(metadata) dtend = dtstart + dtdelta if "event_recurring" in metadata: rrule = parse_recursion(metadata, "event_recurring") else: msg = ( "Either 'event_end' or 'event_duration' must be" + " specified in the event named '%s'" % metadata["title"] ) log.error(msg) raise ValueError(msg) events.append(Event(dtstart, dtend, metadata, rrule)) def generate_ical_file(generator): """Generate an iCalendar file""" from icalendar import vDatetime as vd global events ics_calendars = generator.settings["PLUGIN_EVENTS"]["ics_calendars"] default_loc = generator.settings["PLUGIN_EVENTS"]["default_location"] category_calendar = generator.settings["PLUGIN_EVENTS"]["calendar_per_category"] if not ics_calendars: return if not category_calendar: ics_calendars = ics_calendars[:1] for calendar in ics_calendars: ics_fname = os.path.join(generator.settings["OUTPUT_PATH"], calendar) log.debug("Generating calendar at %s with %d events" % (ics_fname, len(events))) tz = generator.settings.get("TIMEZONE", "UTC") tz = pytz.timezone(tz) ical = icalendar.Calendar() ical.add("prodid", "-//My calendar product//mxm.dk//") ical.add("version", "2.0") DEFAULT_LANG = generator.settings["DEFAULT_LANG"] curr_events = events if not localized_events else localized_events[DEFAULT_LANG] for e in curr_events: if str(e.metadata["category"]) in calendar or not category_calendar: # force convert to ical format here, because otherwise it doesn't happen? dtend = vd(e.dtend).to_ical() dtstart = vd(e.dtstart).to_ical() dtstamp = vd(e.metadata["date"]).to_ical() # print('*'*10) # print(e.metadata['category']) ie = icalendar.Event( summary=e.metadata["title"], dtstart=dtstart, dtend=dtend, dtstamp=dtstamp, priority=5, uid=e.metadata["title"], ) if "event_location" in e.metadata: ie.add("location", e.metadata["event_location"]) elif default_loc: ie.add("location", default_loc) if "event_recurring" in e.metadata: if len(e.rrule) >= 2: ie.add("rrule", {"freq": e.rrule[0], "until": e.rrule[1]}) else: ie.add("rrule", {"freq": e.rrule[0]}) ical.add_component(ie) with open(ics_fname, "wb") as f: f.write(ical.to_ical()) def generate_localized_events(generator): """ Generates localized events dict if i18n_subsites plugin is active """ if "i18n_subsites" in generator.settings["PLUGINS"]: if not os.path.exists(generator.settings["OUTPUT_PATH"]): os.makedirs(generator.settings["OUTPUT_PATH"]) for e in events: if "lang" in e.metadata: localized_events[e.metadata["lang"]].append(e) else: log.debug( "event %s contains no lang attribute" % (e.metadata["title"],) ) def generate_events_list(generator): """Populate the event_list variable to be used in jinja templates""" if not localized_events: generator.context["events_list"] = sorted( events, reverse=True, key=lambda ev: (ev.dtstart, ev.dtend) ) else: generator.context["events_list"] = { k: sorted(v, reverse=True, key=lambda ev: (ev.dtstart, ev.dtend)) for k, v in localized_events.items() } def initialize_events(article_generator): """ Clears the events list before generating articles to properly support plugins with multiple generation passes like i18n_subsites """ del events[:] localized_events.clear() def register(): signals.article_generator_init.connect(initialize_events) signals.article_generator_context.connect(parse_article) signals.article_generator_finalized.connect(generate_localized_events) signals.article_generator_finalized.connect(generate_ical_file) signals.article_generator_finalized.connect(generate_events_list)