#!/bin/python3 #lumbung.space rss & opds feed parser #© 2022 roel roscam abbing agplv3 etc import requests import jinja2 import os import shutil import feedparser from urllib.parse import urlparse from ast import literal_eval as make_tuple from slugify import slugify from bs4 import BeautifulSoup import time import arrow from re import sub def write_etag(feed_name, feed_data): """ save timestamp of when feed was last modified """ etag = '' modified = '' if 'etag' in feed_data: etag = feed_data.etag if 'modified' in feed_data: modified = feed_data.modified if etag or modified: with open(os.path.join('etags',feed_name +'.txt'),'w') as f: f.write(str((etag, modified))) def get_etag(feed_name): """ return timestamp of when feed was last modified """ fn = os.path.join('etags',feed_name +'.txt') etag = '' modified = '' if os.path.exists(fn): etag, modified = make_tuple(open(fn,'r').read()) return etag, modified def create_frontmatter(entry): """ parse RSS metadata and return as frontmatter """ if 'published' in entry: published = entry.published_parsed if 'updated' in entry: published = entry.updated_parsed published = arrow.get(published) if 'author' in entry: author = entry.author else: author = '' if 'authors' in entry: authors = [] for a in entry.authors: authors.append(a['name']) if 'summary' in entry: summary = entry.summary else: summary = '' if 'publisher' in entry: publisher = entry.publisher else: publisher = '' tags = [] if 'tags' in entry: #TODO finish categories for t in entry.tags: tags.append(t['term']) if "opds" in entry: frontmatter = { 'title':entry.title, 'date': published.format(), 'summary': summary, 'author': ",".join(authors), 'publisher': publisher, 'original_link': entry.links[0]['href'].replace('opds/cover/','books/'), 'feed_name': entry['feed_name'], 'tags': str(tags), 'category': "books" } else: frontmatter = { 'title':entry.title, 'date': published.format(), 'summary': '', 'author': author, 'original_link': entry.link, 'feed_name': entry['feed_name'], 'tags': str(tags) } return frontmatter def sanitize_yaml (frontmatter): """ Escapes any occurences of double quotes in any of the frontmatter fields See: https://docs.octoprint.org/en/master/configuration/yaml.html#interesting-data-types """ for k, v in frontmatter.items(): if type(v) == type([]): #some fields are lists l = [] for i in v: i = sub('"', '\\"', i) l.append(i) frontmatter[k] = l else: v = sub('"', '\\"', v) frontmatter[k] = v return frontmatter def create_post(post_dir, entry): """ write hugo post based on RSS entry """ frontmatter = create_frontmatter(entry) if not os.path.exists(post_dir): os.makedirs(post_dir) if 'content' in entry: post_content = entry.content[0].value else: post_content = entry.summary parsed_content = parse_posts(post_dir, post_content) with open(os.path.join(post_dir,'index.html'),'w') as f: #n.b. .html post = template.render(frontmatter=sanitize_yaml(frontmatter), content=parsed_content) f.write(post) print('created post for', entry.title, '({})'.format(entry.link)) def grab_media(post_directory, url, prefered_name=None): """ download media linked in post to have local copy if download succeeds return new local path otherwise return url """ media_item = urlparse(url).path.split('/')[-1] if prefered_name: media_item = prefered_name try: if not os.path.exists(os.path.join(post_directory, media_item)): #TODO: stream is true is a conditional so we could check the headers for things, mimetype etc response = requests.get(url, stream=True) if response.ok: with open(os.path.join(post_directory, media_item), 'wb') as media_file: shutil.copyfileobj(response.raw, media_file) print('Downloaded media item', media_item) return media_item return media_item elif os.path.exists(os.path.join(post_directory, media_item)): return media_item except Exception as e: print('Failed to download image', url) print(e) return url def parse_posts(post_dir, post_content): """ parse the post content to for media items replace foreign image with local copy filter out iframe sources not in allowlist """ soup = BeautifulSoup(post_content, "html.parser") allowed_iframe_sources = ['youtube.com', 'vimeo.com', 'tv.lumbung.space'] media = [] for img in soup(['img','object']): local_image = grab_media(post_dir, img['src']) if img['src'] != local_image: img['src'] = local_image for iframe in soup(['iframe']): if not any(source in iframe['src'] for source in allowed_iframe_sources): print('filtered iframe: {}...'.format(iframe['src'][:25])) iframe.decompose() return soup.decode() def create_opds_post(post_dir, entry): """ create a HUGO post based on OPDS entry or update it if the timestamp is newer Downloads the cover & file """ frontmatter = create_frontmatter(entry) if not os.path.exists(post_dir): os.makedirs(post_dir) if os.path.exists(os.path.join(post_dir, '.timestamp')): old_timestamp = open(os.path.join(post_dir, '.timestamp')).read() old_timestamp = arrow.get(float(old_timestamp)) current_timestamp = arrow.get(entry['updated_parsed']) if current_timestamp > old_timestamp: pass else: print('Book "{}..." already up to date'.format(entry['title'][:32])) return for item in entry.links: ft = item['type'].split('/')[-1] fn = item['rel'].split('/')[-1] if fn == "acquisition": fn = "publication" #calling the publications acquisition is weird prefered_name = "{}-{}.{}".format(fn, slugify(entry['title']), ft) grab_media(post_dir, item['href'], prefered_name) if "summary" in entry: summary = entry.summary else: summary = "" with open(os.path.join(post_dir,'index.md'),'w') as f: post = template.render(frontmatter=sanitize_yaml(frontmatter), content=summary) f.write(post) print('created post for Book', entry.title) with open(os.path.join(post_dir, '.timestamp'), 'w') as f: timestamp = arrow.get(entry['updated_parsed']) f.write(timestamp.format('X')) def grab_feed(feed_url): """ check whether feed has been updated download & return it if it has """ feed_name = urlparse(feed_url).netloc etag, modified = get_etag(feed_name) try: if modified: data = feedparser.parse(feed_url, modified=modified) elif etag: data = feedparser.parse(feed_url, etag=etag) else: data = feedparser.parse(feed_url) except Exception as e: print('Error grabbing feed') print(feed_name) print(e) return False if not data.bozo: print(data.status, feed_url) if data.status == 200: #304 means the feed has not been modified since we last checked write_etag(feed_name, data) return data return False else: print(data.bozo_exception, feed_url) return False feed_urls = open('feeds_list.txt','r').read().splitlines() start = time.time() if not os.path.exists('etags'): os.mkdir('etags') env = jinja2.Environment( loader=jinja2.FileSystemLoader(os.path.curdir) ) output_dir = os.environ.get('OUTPUT_DIR', '/home/r/Programming/lumbung.space/lumbung.space-web/content/posts/') #output_dir = os.environ.get('OUTPUT_DIR', 'network/') if not os.path.exists(output_dir): os.makedirs(output_dir) template = env.get_template('post_template.md') #add iframe to the allowlist of feedparser's sanitizer, #this is now handled in parse_post() feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {'iframe'} for feed_url in feed_urls: feed_name = urlparse(feed_url).netloc feed_dir = os.path.join(output_dir, feed_name) if not os.path.exists(feed_dir): os.makedirs(feed_dir) existing_posts = os.listdir(feed_dir) data = grab_feed(feed_url) if data: opds_feed = False for i in data.feed['links']: if i['rel'] == 'self': if 'opds' in i['type']: opds_feed = True print("opds!") for entry in data.entries: # if 'tags' in entry: # for tag in entry.tags: # for x in ['lumbung.space', 'D15', 'lumbung']: # if x in tag['term']: # print(entry.title) entry['feed_name'] = feed_name post_name = slugify(entry.title) if opds_feed: entry['opds'] = True #format: Beyond-Debiasing-Report_Online-75535a4886e3 post_name = slugify(entry['title'])+'-'+entry['id'].split('-')[-1] post_dir = os.path.join(output_dir, feed_name, post_name) if post_name not in existing_posts: #if there is a blog entry we dont already have, make it if opds_feed: create_opds_post(post_dir, entry) else: create_post(post_dir, entry) elif post_name in existing_posts: #if we already have it, update it if opds_feed: create_opds_post(post_dir, entry) else: create_post(post_dir, entry) existing_posts.remove(post_name) # create list of posts which have not been returned by the feed for post in existing_posts: #remove blog posts no longer returned by the RSS feed print('deleted', post) shutil.rmtree(os.path.join(feed_dir, slugify(post))) end = time.time() print(end - start)