# maximum syndication: turns a list of RSS feeds into HUGO posts. Allows lumbung-interlokal members to post from their own blog/website to
#!/bin/python3 rss feed aggregator
#© 2021 roel roscam abbing gplv3 etc
import requests
import jinja2
import os
import shutil
import feedparser
from urllib.parse import urlparse
from ast import literal_eval as make_tuple
from slugify import slugify
from bs4 import BeautifulSoup
import time
import arrow
def write_etag(feed_name, feed_data):
    """
    save timestamp of when feed was last modified

    Writes the tuple ``(etag, modified)`` as text to
    ``etags/<feed_name>.txt``, the same file ``get_etag`` reads.
    Nothing is written when the feed carries neither value.

    feed_name -- base name of the cache file (``.txt`` is appended)
    feed_data -- parsed feed; must support ``'key' in feed_data`` and
                 attribute access to ``etag`` / ``modified``
    """
    # Read from the feed_data argument rather than a module-level ``data``,
    # so the function works for whatever feed the caller passes in.
    etag = feed_data.etag if 'etag' in feed_data else ''
    modified = feed_data.modified if 'modified' in feed_data else ''

    if etag or modified:
        with open(os.path.join('etags', feed_name + '.txt'), 'w') as f:
            f.write(str((etag, modified)))
def get_etag(feed_name):
    """
    return timestamp of when feed was last modified

    Reads ``etags/<feed_name>.txt`` and parses its content as a Python
    literal ``(etag, modified)`` tuple.

    Returns a pair of empty strings when no such file exists.
    """
    fn = os.path.join('etags', feed_name + '.txt')
    etag = ''
    modified = ''

    if os.path.exists(fn):
        # Context manager so the handle is closed right after reading.
        with open(fn, 'r') as f:
            etag, modified = make_tuple(f.read())

    return etag, modified
def create_frontmatter(entry):
    """
    parse RSS metadata and return as frontmatter

    entry -- feed entry supporting ``'key' in entry`` and attribute
             access to ``published_parsed`` / ``updated_parsed``

    Returns a dict with the keys ``date`` (formatted timestamp) and
    ``summary`` (empty string).
    """
    parsed_time = None
    if 'published' in entry:
        parsed_time = entry.published_parsed
    # The update time takes precedence over the publication time, but only
    # when it actually parsed; otherwise keep the publication time.
    if 'updated' in entry and entry.updated_parsed is not None:
        parsed_time = entry.updated_parsed

    if parsed_time is None:
        # The entry has no usable date. Fall back to the current UTC time so
        # the post still gets a date instead of the function failing on an
        # unbound local.
        published = arrow.utcnow()
    else:
        published = arrow.get(parsed_time)

    # NOTE(review): only these two keys are set here -- confirm the post
    # template does not expect further frontmatter fields.
    frontmatter = {
        'date': published.format(),
        'summary': '',
    }

    return frontmatter
def create_post(post_dir, entry):
# Purpose: turn one feed entry into a Hugo post at <post_dir>/index.html.
#   post_dir -- directory of this post; also handed to parse_posts
#   entry    -- feed entry; read here via entry.content[0].value and entry.title
# Relies on the module-level jinja2 `template`.
# NOTE(review): as it stands this block is not valid Python -- indentation is
# gone and statements appear to be missing. Confirm against the intended
# version before relying on it.
write hugo post based on RSS entry
# Frontmatter dict derived from the entry's metadata.
frontmatter = create_frontmatter(entry)
# NOTE(review): no visible statement creates post_dir; presumably a
# directory-creating call belonged under this check -- confirm.
if not os.path.exists(post_dir):
# Value of the first content item of the entry.
post_content = entry.content[0].value
# The same content after parse_posts has processed its media references.
parsed_content = parse_posts(post_dir, post_content)
with open(os.path.join(post_dir,'index.html'),'w') as f:
post = template.render(frontmatter=frontmatter, content=parsed_content)
# NOTE(review): `post` is never written to `f` in the visible code, and the
# print call below is cut off mid-expression (format argument missing).
print('created post for', entry.title, '({})'.format(
def grab_media(post_directory, url, timeout=30):
    """
    download media linked in post to have local copy
    if download succeeds return new local path otherwise return url

    post_directory -- directory the file is stored in
    url            -- address of the media item
    timeout        -- seconds to wait for the server before giving up

    Returns the bare file name (relative to post_directory) when a local
    copy exists or was downloaded, otherwise the unchanged url.
    """
    image = urlparse(url).path.split('/')[-1]
    if not image:
        # The URL path ends in '/', so there is no file name to save under;
        # keep pointing at the remote resource.
        return url

    local_path = os.path.join(post_directory, image)
    if os.path.exists(local_path):
        # Already fetched on an earlier run: skip the network round trip.
        return image

    # Download to a temporary name first so an interrupted transfer is never
    # mistaken for a complete local copy on the next run.
    partial_path = local_path + '.part'
    try:
        #TODO: stream is true is a conditional so we could check the headers for things, mimetype etc
        with requests.get(url, stream=True, timeout=timeout) as response:
            # Do not store an error page under the media file's name.
            response.raise_for_status()
            with open(partial_path, 'wb') as img_file:
                for chunk in response.iter_content(chunk_size=8192):
                    img_file.write(chunk)
        os.replace(partial_path, local_path)
    except Exception:
        # Deliberately broad: mirroring media is best effort, any failure
        # just means the post keeps the remote URL.
        print('Failed to download cover image', url)
        if os.path.exists(partial_path):
            os.remove(partial_path)
        return url

    print('Downloaded cover image', image)
    return image
def parse_posts(post_direntry, post_content):
    """
    parse the post content for media items
    replace foreign media item with local copy

    post_direntry -- directory of the post; media files are stored there
    post_content  -- HTML of the post

    Returns the HTML as a string, with each ``src`` that could be fetched
    replaced by the value grab_media returned for it.
    """
    soup = BeautifulSoup(post_content, "html.parser")

    for element in soup(['img', 'object']):
        src = element.get('src')
        if not src:
            # Elements without a src attribute have nothing to fetch here;
            # indexing them directly would raise KeyError.
            continue

        # Use this function's own argument rather than a global post_dir.
        local_image = grab_media(post_direntry, src)
        if src != local_image:
            print(src, '->', local_image)
            element['src'] = local_image

    return soup.decode()
# One feed URL per line; the context manager closes the file after reading.
with open('feeds_list.txt', 'r') as feeds_file:
    feed_urls = feeds_file.read().splitlines()
# Wall-clock start, used for the run-time figure printed at the end.
start = time.time()
# 'etags' is the directory write_etag/get_etag keep their per-feed files in.
# NOTE(review): the statement guarded by this check is not visible here --
# presumably it creates the directory; confirm.
if not os.path.exists('etags'):
# Jinja2 environment the post template is loaded from.
# NOTE(review): the arguments and closing parenthesis of this call are
# missing in the visible source -- confirm the intended loader.
env = jinja2.Environment(
# Root directory for the generated posts; the OUTPUT_DIR environment variable
# overrides the hard-coded default path.
output_dir = os.environ.get('OUTPUT_DIR', '/home/r/Programming/')
#output_dir = os.environ.get('OUTPUT_DIR', 'network/')
# NOTE(review): the statement guarded by this check is not visible either --
# presumably it creates output_dir; confirm.
if not os.path.exists(output_dir):
# Template rendered by create_post.
# NOTE(review): the template name is an empty string here -- confirm the
# intended template file name.
template = env.get_template('')
# Fetch every feed and create a post directory per entry.
# NOTE(review): the [7:] slice skips the first seven lines of feeds_list.txt;
# it looks like a debugging leftover -- confirm before removing.
for feed_url in feed_urls[7:]:
    feed_name = urlparse(feed_url).netloc

    # Conditional fetch: pass along whatever validator was stored for this
    # feed; only fall back to a plain fetch when there is none.
    etag, modified = get_etag(feed_name)
    if modified:
        data = feedparser.parse(feed_url, modified=modified)
    elif etag:
        data = feedparser.parse(feed_url, etag=etag)
    else:
        data = feedparser.parse(feed_url)

    # `status` is only present when an HTTP response was received, so read
    # it with .get() instead of failing on unreachable feeds.
    status = data.get('status')
    print(status, feed_url)

    if status == 200:
        # NOTE(review): still disabled. When enabling, pass feed_name (the
        # key get_etag looks up), not feed_url.
        #write_etag(feed_url, data)

        for entry in data.entries:
            post_dir = os.path.join(output_dir, feed_name, slugify(entry.title))
            create_post(post_dir, entry)

# Total run time in seconds.
end = time.time()
print(end - start)