varia.website/plugins-custom/events-ics/events.py

246 lines
7.3 KiB
Python
Raw Normal View History

2021-01-19 22:33:28 +01:00
"""
events plugin for Pelican
=========================
This plugin looks for and parses an "events" directory and generates
blog posts with a user-defined event date. (typically in the future)
It also generates an ICalendar v2.0 calendar file.
https://en.wikipedia.org/wiki/ICalendar
Author: Federico Ceratto <federico.ceratto@gmail.com>
Released under AGPLv3+ license, see LICENSE
This is a custom Varia version of the event plugin.
Email: info@varia.zone
"""
import logging
import os.path
2021-01-19 22:36:20 +01:00
from collections import defaultdict, namedtuple
from datetime import datetime, timedelta
import icalendar
2021-01-19 22:33:28 +01:00
import pytz
2021-01-19 22:36:20 +01:00
from pelican import signals, utils
2021-01-19 22:33:28 +01:00
log = logging.getLogger(__name__)
TIME_MULTIPLIERS = {
2021-01-19 22:36:20 +01:00
"w": "weeks",
"d": "days",
"h": "hours",
"m": "minutes",
"s": "seconds",
2021-01-19 22:33:28 +01:00
}
events = []
localized_events = defaultdict(list)
Event = namedtuple("Event", "dtstart dtend metadata rrule")
def parse_tstamp(ev, field_name):
"""Parse a timestamp string in format "YYYY-MM-DD HH:MM"
:returns: datetime
"""
try:
2021-01-19 22:36:20 +01:00
return datetime.strptime(ev[field_name], "%Y-%m-%d %H:%M")
2021-01-19 22:33:28 +01:00
except Exception as e:
2021-01-19 22:36:20 +01:00
log.error(
"Unable to parse the '%s' field in the event named '%s': %s"
% (field_name, ev["title"], e)
)
2021-01-19 22:33:28 +01:00
raise
2021-01-19 22:36:20 +01:00
2021-01-19 22:33:28 +01:00
def parse_timedelta(ev):
"""Parse a timedelta string in format [<num><multiplier> ]*
e.g. 2h 30m
:returns: timedelta
"""
2021-01-19 22:36:20 +01:00
chunks = ev["event_duration"].split()
2021-01-19 22:33:28 +01:00
tdargs = {}
for c in chunks:
try:
m = TIME_MULTIPLIERS[c[-1]]
val = int(c[:-1])
tdargs[m] = val
except KeyError:
2021-01-19 22:36:20 +01:00
log.error(
"""Unknown time multiplier '%s' value in the \
2021-01-19 22:33:28 +01:00
'event_duration' field in the '%s' event. Supported multipliers \
2021-01-19 22:36:20 +01:00
are: '%s'."""
% (c, ev["title"], " ".join(TIME_MULTIPLIERS))
)
2021-01-19 22:33:28 +01:00
raise RuntimeError("Unknown time multiplier '%s'" % c)
except ValueError:
2021-01-19 22:36:20 +01:00
log.error(
"""Unable to parse '%s' value in the 'event_duration' \
field in the '%s' event."""
% (c, ev["title"])
)
2021-01-19 22:33:28 +01:00
raise ValueError("Unable to parse '%s'" % c)
return timedelta(**tdargs)
2021-01-19 22:36:20 +01:00
2021-01-19 22:33:28 +01:00
def parse_recursion(ev, field_name):
"""
Parse information about recurring events and return frequency of event and untill time.
"""
chunks = ev[field_name].split()
freq = chunks[0].upper()
2021-01-19 22:36:20 +01:00
if "until" in chunks:
until = datetime.strptime(chunks[-1], "%Y-%m-%d")
rrule = [freq, until]
2021-01-19 22:33:28 +01:00
else:
rrule = [freq]
return rrule
2021-01-19 22:36:20 +01:00
2021-01-19 22:33:28 +01:00
def parse_article(generator, metadata):
"""Collect articles metadata to be used for building the event calendar
:returns: None
"""
2021-01-19 22:36:20 +01:00
if "event_start" not in metadata:
2021-01-19 22:33:28 +01:00
return
2021-01-19 22:36:20 +01:00
dtstart = parse_tstamp(metadata, "event_start")
rrule = []
if "event_end" in metadata:
dtend = parse_tstamp(metadata, "event_end")
2021-01-19 22:33:28 +01:00
2021-01-19 22:36:20 +01:00
elif "event_duration" in metadata:
2021-01-19 22:33:28 +01:00
dtdelta = parse_timedelta(metadata)
dtend = dtstart + dtdelta
2021-01-19 22:36:20 +01:00
if "event_recurring" in metadata:
rrule = parse_recursion(metadata, "event_recurring")
2021-01-19 22:33:28 +01:00
else:
2021-01-19 22:36:20 +01:00
msg = (
"Either 'event_end' or 'event_duration' must be"
2021-02-09 08:04:17 +01:00
+ " specified in the event named '%s'" % metadata["title"]
2021-01-19 22:36:20 +01:00
)
2021-01-19 22:33:28 +01:00
log.error(msg)
raise ValueError(msg)
events.append(Event(dtstart, dtend, metadata, rrule))
def generate_ical_file(generator):
2021-01-19 22:36:20 +01:00
"""Generate an iCalendar file"""
2021-01-19 22:33:28 +01:00
from icalendar import vDatetime as vd
global events
2021-01-19 22:36:20 +01:00
ics_calendars = generator.settings["PLUGIN_EVENTS"]["ics_calendars"]
default_loc = generator.settings["PLUGIN_EVENTS"]["default_location"]
category_calendar = generator.settings["PLUGIN_EVENTS"]["calendar_per_category"]
2021-01-19 22:33:28 +01:00
if not ics_calendars:
return
if not category_calendar:
ics_calendars = ics_calendars[:1]
for calendar in ics_calendars:
2021-01-19 22:36:20 +01:00
ics_fname = os.path.join(generator.settings["OUTPUT_PATH"], calendar)
2021-01-19 22:33:28 +01:00
log.debug("Generating calendar at %s with %d events" % (ics_fname, len(events)))
2021-01-19 22:36:20 +01:00
tz = generator.settings.get("TIMEZONE", "UTC")
2021-01-19 22:33:28 +01:00
tz = pytz.timezone(tz)
ical = icalendar.Calendar()
2021-01-19 22:36:20 +01:00
ical.add("prodid", "-//My calendar product//mxm.dk//")
ical.add("version", "2.0")
2021-01-19 22:33:28 +01:00
2021-01-19 22:36:20 +01:00
DEFAULT_LANG = generator.settings["DEFAULT_LANG"]
2021-01-19 22:33:28 +01:00
curr_events = events if not localized_events else localized_events[DEFAULT_LANG]
for e in curr_events:
2021-01-19 22:36:20 +01:00
if str(e.metadata["category"]) in calendar or not category_calendar:
# force convert to ical format here, because otherwise it doesn't happen?
2021-01-19 22:33:28 +01:00
dtend = vd(e.dtend).to_ical()
dtstart = vd(e.dtstart).to_ical()
2021-01-19 22:36:20 +01:00
dtstamp = vd(e.metadata["date"]).to_ical()
2021-01-19 22:33:28 +01:00
# print('*'*10)
# print(e.metadata['category'])
ie = icalendar.Event(
2021-01-19 22:36:20 +01:00
summary=e.metadata["title"],
dtstart=dtstart,
2021-01-19 22:33:28 +01:00
dtend=dtend,
2021-01-19 22:36:20 +01:00
dtstamp=dtstamp,
2021-01-19 22:33:28 +01:00
priority=5,
2021-01-19 22:36:20 +01:00
uid=e.metadata["title"],
2021-01-19 22:33:28 +01:00
)
2021-01-19 22:36:20 +01:00
if "event_location" in e.metadata:
ie.add("location", e.metadata["event_location"])
2021-01-19 22:33:28 +01:00
elif default_loc:
2021-01-19 22:36:20 +01:00
ie.add("location", default_loc)
2021-01-19 22:33:28 +01:00
2021-01-19 22:36:20 +01:00
if "event_recurring" in e.metadata:
if len(e.rrule) >= 2:
ie.add("rrule", {"freq": e.rrule[0], "until": e.rrule[1]})
2021-01-19 22:33:28 +01:00
else:
2021-01-19 22:36:20 +01:00
ie.add("rrule", {"freq": e.rrule[0]})
2021-01-19 22:33:28 +01:00
ical.add_component(ie)
2021-01-19 22:36:20 +01:00
with open(ics_fname, "wb") as f:
2021-01-19 22:33:28 +01:00
f.write(ical.to_ical())
def generate_localized_events(generator):
""" Generates localized events dict if i18n_subsites plugin is active """
if "i18n_subsites" in generator.settings["PLUGINS"]:
2021-01-19 22:36:20 +01:00
if not os.path.exists(generator.settings["OUTPUT_PATH"]):
os.makedirs(generator.settings["OUTPUT_PATH"])
2021-01-19 22:33:28 +01:00
for e in events:
if "lang" in e.metadata:
localized_events[e.metadata["lang"]].append(e)
else:
2021-01-19 22:36:20 +01:00
log.debug(
"event %s contains no lang attribute" % (e.metadata["title"],)
)
2021-01-19 22:33:28 +01:00
def generate_events_list(generator):
"""Populate the event_list variable to be used in jinja templates"""
if not localized_events:
2021-01-19 22:36:20 +01:00
generator.context["events_list"] = sorted(
events, reverse=True, key=lambda ev: (ev.dtstart, ev.dtend)
)
2021-01-19 22:33:28 +01:00
else:
2021-01-19 22:36:20 +01:00
generator.context["events_list"] = {
k: sorted(v, reverse=True, key=lambda ev: (ev.dtstart, ev.dtend))
for k, v in localized_events.items()
}
2021-01-19 22:33:28 +01:00
def initialize_events(article_generator):
"""
Clears the events list before generating articles to properly support plugins with
multiple generation passes like i18n_subsites
"""
del events[:]
localized_events.clear()
2021-01-19 22:36:20 +01:00
2021-01-19 22:33:28 +01:00
def register():
signals.article_generator_init.connect(initialize_events)
signals.article_generator_context.connect(parse_article)
signals.article_generator_finalized.connect(generate_localized_events)
signals.article_generator_finalized.connect(generate_ical_file)
signals.article_generator_finalized.connect(generate_events_list)