"""Aggregate syndication feeds into local HTML posts and an index page.

Feed URLs are read from ``feeds.txt`` (one per line). Each feed is fetched
with feedparser, every entry is rendered through ``post.template.html`` into
``<output_dir>/<feed host>/<post slug>/index.html`` (media referenced by the
entry is downloaded next to it), and, when run as a script, an ``index.html``
is rendered from ``index.template.html`` using the collected ``db``.
"""

import os
import pprint
import shutil
import time
from ast import literal_eval as make_tuple
from hashlib import md5
from pathlib import Path
from re import compile as re_compile
from re import sub
from urllib.parse import urlparse

import arrow
import feedparser
import jinja2
import requests
from bs4 import BeautifulSoup
from slugify import slugify

# Matches the double quotes that sanitize_yaml() escapes.
yamlre = re_compile('"')

# Every post handled in this run, keyed by post directory.
db = {}

# Post directories visited by main(), in processing order. Defined at module
# level so that main() also works when this module is imported.
post_dirs = []

template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))


def write_etag(feed_name, feed_data):
    """
    save timestamp of when feed was last modified

    Stores the ``(etag, modified)`` pair of ``feed_data`` as a tuple literal
    in ``etags/<feed_name>.txt``. Nothing is written when both are empty.
    """
    etag = ""
    modified = ""

    if "etag" in feed_data:
        etag = feed_data.etag
    if "modified" in feed_data:
        modified = feed_data.modified

    if etag or modified:
        with open(os.path.join("etags", feed_name + ".txt"), "w") as f:
            f.write(str((etag, modified)))


def get_etag(feed_name):
    """
    return timestamp of when feed was last modified

    Reads back the ``(etag, modified)`` pair stored by write_etag(). Returns
    ``("", "")`` when no file exists or its content cannot be parsed.
    """
    fn = os.path.join("etags", feed_name + ".txt")
    etag = ""
    modified = ""

    if os.path.exists(fn):
        # Context manager so the handle is closed deterministically.
        with open(fn, "r") as f:
            raw = f.read()
        try:
            etag, modified = make_tuple(raw)
        except (ValueError, SyntaxError, TypeError):
            # A damaged cache file should not abort the run; treat it as
            # "never fetched".
            etag, modified = "", ""

    return etag, modified


def create_frontmatter(entry):
    """
    parse RSS metadata and return as frontmatter

    ``entry`` is a feed entry that main() has annotated with ``feed_name``
    (and ``opds`` for OPDS feeds). Returns a dict of frontmatter fields; the
    set of keys differs between OPDS ("books") entries and regular entries.
    """
    # Initialised up front: an entry may carry neither date field.
    published = None
    if "published" in entry:
        published = entry.published_parsed
    elif "updated" in entry:
        published = entry.updated_parsed

    if not published:
        # !!! placeholder hack for now, to make this whole script work
        published = "2023-03-09T16:31:47.294841"

    published = arrow.get(published)

    author = entry.author if "author" in entry else ""

    # Always bound, because the OPDS branch below joins it unconditionally.
    authors = [a["name"] for a in entry.authors] if "authors" in entry else []

    summary = entry.summary if "summary" in entry else ""

    publisher = entry.publisher if "publisher" in entry else ""

    # TODO finish categories
    tags = [t["term"] for t in entry.tags] if "tags" in entry else []

    featured_image = entry.featured_image if "featured_image" in entry else ""

    card_type = "network"
    if entry.feed_name == "pen.lumbung.space":
        card_type = "pen"

    if "opds" in entry:
        frontmatter = {
            "title": entry.title,
            "date": published.format(),
            "summary": summary,
            "author": ",".join(authors),
            "publisher": publisher,
            "original_link": entry.links[0]["href"].replace("opds/cover/", "books/"),
            "feed_name": entry["feed_name"],
            "tags": tags,
            "category": "books",
        }
    else:
        frontmatter = {
            "title": entry.title,
            "date": published.format(),
            "summary": "",
            "author": author,
            "original_link": entry.link,
            "feed_name": entry["feed_name"],
            "tags": tags,
            "card_type": card_type,
            "featured_image": featured_image,
        }

    return frontmatter


def _escape_quotes(value):
    """Backslash-escape double quotes in a string; pass other values through."""
    if isinstance(value, str):
        return yamlre.sub('\\"', value)
    return value


def sanitize_yaml(frontmatter):
    """
    Escapes any occurrences of double quotes in any of the frontmatter fields
    See: https://docs.octoprint.org/en/master/configuration/yaml.html#interesting-data-types

    The dict is modified in place and also returned. List fields are escaped
    element by element; non-string values are left untouched.
    """
    # Only existing keys are reassigned, so mutating while iterating is safe.
    for key, value in frontmatter.items():
        if isinstance(value, list):  # some fields are lists
            frontmatter[key] = [_escape_quotes(item) for item in value]
        else:
            frontmatter[key] = _escape_quotes(value)

    return frontmatter


def parse_enclosures(post_dir, entry):
    """
    Parses feed enclosures which are featured media
    Can be featured image but also podcast entries
    https://pythonhosted.org/feedparser/reference-entry-enclosures.html

    Image enclosures are downloaded into ``post_dir`` and recorded on the
    entry as ``featured_image``; other typed enclosures are only reported.
    Returns the (mutated) entry.
    """
    # TODO parse more than images
    # TODO handle the fact it could be multiple items

    for e in entry.enclosures:
        if "type" in e:
            print("found enclosed media", e.type)
            if "image/" in e.type:
                featured_image = grab_media(post_dir, e.href)
                entry["featured_image"] = featured_image
            else:
                print("FIXME:ignoring enclosed", e.type)
    return entry


def parse_content(post_dir, entry):
    """
    Build the frontmatter and the localised HTML body for one feed entry.

    Returns ``(parsed_content, frontmatter)``.
    """
    # Create the directory first: enclosure handling downloads into it, and a
    # missing directory makes that download fail and fall back to the URL.
    os.makedirs(post_dir, exist_ok=True)

    if "enclosures" in entry:
        entry = parse_enclosures(post_dir, entry)

    frontmatter = create_frontmatter(entry)
    print(">>> frontmatter:", frontmatter)

    if "content" in entry:
        post_content = entry.content[0].value
    else:
        # Entries with neither content nor summary get an empty body.
        post_content = entry.get("summary", "")

    parsed_content = parse_posts(post_dir, post_content)

    return parsed_content, frontmatter


def create_post(post_dir, parsed_content, frontmatter):
    """
    write hugo post based on RSS entry

    Renders ``post.template.html`` with the sanitized frontmatter and the
    parsed content, and writes it to ``<post_dir>/index.html``.
    """
    # Reuse the module-level environment instead of rebuilding it per post.
    template = env.get_template("post.template.html")

    # n.b. .html
    with open(os.path.join(post_dir, "index.html"), "w", encoding="utf-8") as f:
        post = template.render(
            frontmatter=sanitize_yaml(frontmatter), content=parsed_content
        )
        f.write(post)
        print(f"created post for {frontmatter['title']} ({frontmatter['original_link']})")


def add_to_db(post_dir, parsed_content, frontmatter):
    """Record a post's content and frontmatter in ``db`` under ``post_dir``."""
    db[post_dir] = {
        "content": parsed_content,
        "frontmatter": frontmatter,
    }


def grab_media(post_directory, url, prefered_name=None):
    """
    download media linked in post to have local copy
    if download succeeds return new local path otherwise return url

    The local name is ``prefered_name`` when given, otherwise the last path
    segment of ``url``. A file that already exists locally is not downloaded
    again.
    """
    media_item = urlparse(url).path.split("/")[-1]

    headers = {
        "User-Agent": "https://git.autonomic.zone/ruangrupa/lumbunglib",
        "From": "info@lumbung.space",  # This is another valid field
    }
    if prefered_name:
        media_item = prefered_name

    # Without a usable file name the joined path would be the directory
    # itself (or its parent), which "exists" and would be returned as media.
    if media_item in ("", ".", ".."):
        return url

    target = os.path.join(post_directory, media_item)

    try:
        if os.path.exists(target):
            return media_item

        # TODO: stream is true is a conditional so we could check the headers
        # for things, mimetype etc
        # The context manager releases the connection on every path; the
        # timeout keeps one stalled server from hanging the whole run.
        with requests.get(url, headers=headers, stream=True, timeout=30) as response:
            if not response.ok:
                print("Download failed", response.status_code)
                return url

            with open(target, "wb") as media_file:
                shutil.copyfileobj(response.raw, media_file)
                print("Downloaded media item", media_item)
            return media_item

    except Exception as e:
        # Deliberately best-effort: any failure keeps the remote URL.
        print("Failed to download image", url)
        print(e)

    return url


def parse_posts(post_dir, post_content):
    """
    parse the post content for media items
    replace foreign image with local copy
    filter out iframe sources not in allowlist

    Returns the resulting HTML as a string.
    """
    soup = BeautifulSoup(post_content, "html.parser")
    allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]

    for img in soup(["img", "object"]):
        if img.get("src") is not None:
            local_image = grab_media(post_dir, img["src"])
            if img["src"] != local_image:
                img["src"] = local_image

    for iframe in soup(["iframe"]):
        # An iframe without a src attribute cannot match the allowlist.
        src = iframe.get("src", "")
        if not any(source in src for source in allowed_iframe_sources):
            print("filtered iframe: {}...".format(src[:25]))
            iframe.decompose()

    return soup.decode()


def grab_feed(feed_url):
    """
    check whether feed has been updated
    download & return it if it has

    Returns the parsed feed when the HTTP status is 200, otherwise ``False``.
    """
    feed_name = urlparse(feed_url).netloc

    # Only consumed by the conditional fetch that is disabled below.
    etag, modified = get_etag(feed_name)

    # !!! disabled for now, for testing
    # try:
    #     if modified:
    #         data = feedparser.parse(feed_url, modified=modified)
    #     elif etag:
    #         data = feedparser.parse(feed_url, etag=etag)
    #     else:
    #         data = feedparser.parse(feed_url)
    # except Exception as e:
    #     print("Error grabbing feed")
    #     print(feed_name)
    #     print(e)
    #     return False

    data = feedparser.parse(feed_url)

    if "status" in data:
        print(data.status, feed_url)
        if data.status == 200:
            # 304 means the feed has not been modified since we last checked
            write_etag(feed_name, data)
            return data
    return False


def _is_opds_feed(data):
    """Return True when the feed's "self" link has a type containing "opds"."""
    for link in data.feed.get("links", []):
        if link.get("rel") == "self" and "opds" in (link.get("type") or ""):
            print("OPDS type feed!")
            return True
    return False


def _post_name(entry, opds_feed):
    """Derive the directory name for an entry from its title (and id for OPDS)."""
    if opds_feed:
        # format: Beyond-Debiasing-Report_Online-75535a4886e3
        return slugify(entry["title"]) + "-" + entry["id"].split("-")[-1]

    post_name = slugify(entry.title)

    # pixelfed returns the whole post text as the post name. max
    # filename length is 255 on many systems. here we're shortening
    # the name and adding a hash to it to avoid a conflict in a
    # situation where 2 posts start with exactly the same text.
    if len(post_name) > 150:
        post_hash = md5(bytes(post_name, "utf-8"))
        post_name = post_name[:150] + "-" + post_hash.hexdigest()

    return post_name


def main(output_dir):
    """
    Fetch every feed listed in ``feeds.txt`` and write its posts.

    Posts go to ``<output_dir>/<feed host>/<post name>/``; each processed
    post is also appended to ``post_dirs`` and recorded in ``db``. Prints the
    elapsed time in seconds at the end.
    """
    with open("feeds.txt", "r") as f:
        # Blank lines are not feeds; skip them instead of fetching "".
        feed_urls = [line.strip() for line in f if line.strip()]

    start = time.time()

    os.makedirs("etags", exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)

    feed_dict = {url: urlparse(url).netloc for url in feed_urls}

    # Only used by the disabled cleanup directly below.
    feed_names = feed_dict.values()
    content_dirs = os.listdir(output_dir)
    # for i in content_dirs:
    #     if i not in feed_names:
    #         shutil.rmtree(os.path.join(output_dir, i))
    #         print("%s not in feeds_list.txt, removing local data" %(i))

    # add iframe to the allowlist of feedparser's sanitizer,
    # this is now handled in parse_post()
    # !!! disabled for now
    # feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}

    for feed_url in feed_urls:
        print("\n>>>>>>>>>>>>>>>>>>>>>>\n")

        feed_name = feed_dict[feed_url]
        feed_dir = os.path.join(output_dir, feed_name)
        os.makedirs(feed_dir, exist_ok=True)

        existing_posts = os.listdir(feed_dir)

        data = grab_feed(feed_url)

        if data:
            opds_feed = _is_opds_feed(data)

            for entry in data.entries:
                # if 'tags' in entry:
                #     for tag in entry.tags:
                #         for x in ['lumbung.space', 'D15', 'lumbung']:
                #             if x in tag['term']:
                #                 print(entry.title)
                entry["feed_name"] = feed_name

                if opds_feed:
                    entry["opds"] = True

                post_name = _post_name(entry, opds_feed)

                post_dir = os.path.join(output_dir, feed_name, post_name)
                post_dirs.append(post_dir)

                # New posts are created and known posts are refreshed; both
                # paths render the entry the same way.
                parsed_content, frontmatter = parse_content(post_dir, entry)
                create_post(post_dir, parsed_content, frontmatter)

                if post_name in existing_posts:
                    # create list of posts which have not been returned by
                    # the feed
                    existing_posts.remove(post_name)

                # add this post to the db
                add_to_db(post_dir, parsed_content, frontmatter)

            # !!! disabled for now for testing
            # for post in existing_posts:
            #     # remove blog posts no longer returned by the RSS feed
            #     print("deleted", post)
            #     shutil.rmtree(os.path.join(feed_dir, slugify(post)))

        print("\n----------------------\n")

    end = time.time()

    print(end - start)


if __name__ == "__main__":
    output_dir = "feed-materials"
    main(output_dir)

    print("\n>>> db:")
    pprint.pprint(db)

    template = env.get_template("index.template.html")
    output_file = "index.html"
    with open(output_file, "w", encoding="utf-8") as f:
        index = template.render(db=db)
        f.write(index)
    print(">>> written:", output_file)