#!/bin/python3

#lumbung.space rss & opds feed parser
#© 2022 roel roscam abbing agplv3 etc

import os
import shutil
import time
from ast import literal_eval as make_tuple
from re import sub
from urllib.parse import urlparse

import arrow
import feedparser
import jinja2
import requests
from bs4 import BeautifulSoup
from slugify import slugify

def write_etag(feed_name, feed_data):
    """
    save timestamp of when feed was last modified
    """
    etag = ''
    modified = ''

    if 'etag' in feed_data:
        etag = feed_data.etag
    if 'modified' in feed_data:
        modified = feed_data.modified

    if etag or modified:
        with open(os.path.join('etags', feed_name + '.txt'), 'w') as f:
            f.write(str((etag, modified)))

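#illustrative example (values made up): after fetching https://example.org/feed
#the file etags/example.org.txt could contain the tuple
#  ('"6d82-5e1a"', 'Wed, 01 Jun 2022 10:00:00 GMT')
#which get_etag() below reads back via ast.literal_eval
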
def get_etag(feed_name):
    """
    return timestamp of when feed was last modified
    """
    fn = os.path.join('etags', feed_name + '.txt')
    etag = ''
    modified = ''

    if os.path.exists(fn):
        etag, modified = make_tuple(open(fn, 'r').read())

    return etag, modified

def create_frontmatter(entry):
    """
    parse RSS metadata and return as frontmatter
    """
    published = None
    if 'published' in entry:
        published = entry.published_parsed
    if 'updated' in entry:
        published = entry.updated_parsed

    #guard against entries that carry no date at all
    published = arrow.get(published) if published else arrow.now()

    if 'author' in entry:
        author = entry.author
    else:
        author = ''

    #initialized up front so the OPDS branch below can't hit a NameError
    authors = []
    if 'authors' in entry:
        for a in entry.authors:
            authors.append(a['name'])

    if 'summary' in entry:
        summary = entry.summary
    else:
        summary = ''

    if 'publisher' in entry:
        publisher = entry.publisher
    else:
        publisher = ''

    tags = []
    if 'tags' in entry:
        #TODO finish categories
        for t in entry.tags:
            tags.append(t['term'])

    if "opds" in entry:
        frontmatter = {
            'title': entry.title,
            'date': published.format(),
            'summary': summary,
            'author': ", ".join(authors),
            'publisher': publisher,
            'original_link': entry.links[0]['href'].replace('opds/cover/', 'books/'),
            'feed_name': entry['feed_name'],
            'tags': str(tags),
            'category': "books"
        }
    else:
        frontmatter = {
            'title': entry.title,
            'date': published.format(),
            'summary': '',
            'author': author,
            'original_link': entry.link,
            'feed_name': entry['feed_name'],
            'tags': str(tags)
        }

    return frontmatter

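#a sketch of the RSS-branch result, with made-up values:
#{'title': 'Some post', 'date': '2022-06-01T10:00:00+00:00', 'summary': '',
# 'author': 'roel', 'original_link': 'https://example.org/some-post',
# 'feed_name': 'example.org', 'tags': "['lumbung']"}
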
def sanitize_yaml(frontmatter):
    """
    Escapes any occurrences of double quotes
    in any of the frontmatter fields
    See: https://docs.octoprint.org/en/master/configuration/yaml.html#interesting-data-types
    """
    for k, v in frontmatter.items():
        if isinstance(v, list):
            #some fields are lists
            l = []
            for i in v:
                i = sub('"', '\\"', i)
                l.append(i)
            frontmatter[k] = l
        else:
            v = sub('"', '\\"', v)
            frontmatter[k] = v

    return frontmatter

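#for example (hypothetical input) 'a "quoted" title' becomes
#'a \"quoted\" title', so the quotes survive inside the double-quoted
#YAML strings of the rendered frontmatter
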
def create_post(post_dir, entry):
    """
    write hugo post based on RSS entry
    """
    frontmatter = create_frontmatter(entry)

    if not os.path.exists(post_dir):
        os.makedirs(post_dir)

    if 'content' in entry:
        post_content = entry.content[0].value
    else:
        post_content = entry.summary

    parsed_content = parse_posts(post_dir, post_content)

    with open(os.path.join(post_dir, 'index.html'), 'w') as f:  #n.b. .html
        post = template.render(frontmatter=sanitize_yaml(frontmatter), content=parsed_content)
        f.write(post)
        print('created post for', entry.title, '({})'.format(entry.link))

def grab_media(post_directory, url, prefered_name=None):
    """
    download media linked in post to have local copy
    if download succeeds return new local path otherwise return url
    """
    media_item = urlparse(url).path.split('/')[-1]

    if prefered_name:
        media_item = prefered_name

    try:
        if not os.path.exists(os.path.join(post_directory, media_item)):
            #TODO: with stream=True we could inspect the response headers
            #(mimetype etc.) before committing to the download
            response = requests.get(url, stream=True)
            if response.ok:
                with open(os.path.join(post_directory, media_item), 'wb') as media_file:
                    shutil.copyfileobj(response.raw, media_file)
                print('Downloaded media item', media_item)
                return media_item
            #download failed, keep referring to the remote copy
            return url
        elif os.path.exists(os.path.join(post_directory, media_item)):
            return media_item
    except Exception as e:
        print('Failed to download image', url)
        print(e)
        return url

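#usage sketch (hypothetical post dir and url):
#  grab_media('content/posts/example.org/my-post', 'https://example.org/cover.jpg')
#saves cover.jpg next to the post's index file and returns 'cover.jpg',
#or returns the original url when the download fails
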
def parse_posts(post_dir, post_content):
    """
    parse the post content for media items
    replace remote images with local copies
    filter out iframe sources not in allowlist
    """
    soup = BeautifulSoup(post_content, "html.parser")
    allowed_iframe_sources = ['youtube.com', 'vimeo.com', 'tv.lumbung.space']
    media = []

    for img in soup(['img', 'object']):
        local_image = grab_media(post_dir, img['src'])
        if img['src'] != local_image:
            img['src'] = local_image

    for iframe in soup(['iframe']):
        if not any(source in iframe['src'] for source in allowed_iframe_sources):
            print('filtered iframe: {}...'.format(iframe['src'][:25]))
            iframe.decompose()

    return soup.decode()

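#e.g. (hypothetical urls) <iframe src="https://ads.example.com/spot"> is
#decomposed, while <iframe src="https://tv.lumbung.space/videos/embed/x">
#survives because its src matches an allowlisted source
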
def create_opds_post(post_dir, entry):
    """
    create a HUGO post based on OPDS entry
    or update it if the timestamp is newer
    Downloads the cover & file
    """
    frontmatter = create_frontmatter(entry)

    if not os.path.exists(post_dir):
        os.makedirs(post_dir)

    if os.path.exists(os.path.join(post_dir, '.timestamp')):
        old_timestamp = open(os.path.join(post_dir, '.timestamp')).read()
        old_timestamp = arrow.get(float(old_timestamp))
        current_timestamp = arrow.get(entry['updated_parsed'])

        if current_timestamp <= old_timestamp:
            print('Book "{}..." already up to date'.format(entry['title'][:32]))
            return

    for item in entry.links:
        ft = item['type'].split('/')[-1]
        fn = item['rel'].split('/')[-1]

        if fn == "acquisition":
            fn = "publication"  #calling the publications acquisition is weird

        prefered_name = "{}-{}.{}".format(fn, slugify(entry['title']), ft)

        grab_media(post_dir, item['href'], prefered_name)

    if "summary" in entry:
        summary = entry.summary
    else:
        summary = ""

    with open(os.path.join(post_dir, 'index.md'), 'w') as f:
        post = template.render(frontmatter=sanitize_yaml(frontmatter), content=summary)
        f.write(post)
        print('created post for Book', entry.title)

    with open(os.path.join(post_dir, '.timestamp'), 'w') as f:
        timestamp = arrow.get(entry['updated_parsed'])
        f.write(timestamp.format('X'))

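#the hidden .timestamp file stores entry['updated_parsed'] as a unix epoch
#(arrow's 'X' format), e.g. '1654077600' (illustrative value), so a later
#run can tell whether the book entry changed since the post was generated
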
def grab_feed(feed_url):
    """
    check whether feed has been updated
    download & return it if it has
    """
    feed_name = urlparse(feed_url).netloc

    etag, modified = get_etag(feed_name)

    try:
        if modified:
            data = feedparser.parse(feed_url, modified=modified)
        elif etag:
            data = feedparser.parse(feed_url, etag=etag)
        else:
            data = feedparser.parse(feed_url)
    except Exception as e:
        print('Error grabbing feed')
        print(feed_name)
        print(e)
        return False

    if not data.bozo:
        print(data.status, feed_url)
        #304 means the feed has not been modified since we last checked
        if data.status == 200:
            write_etag(feed_name, data)
            return data
        return False
    else:
        print(data.bozo_exception, feed_url)
        return False

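#note: feedparser sends the stored values as conditional GET headers
#(If-None-Match / If-Modified-Since); an unchanged feed answers 304 with no
#entries, so only a 200 response is written back and returned above
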
feed_urls = open('feeds_list.txt', 'r').read().splitlines()

start = time.time()

if not os.path.exists('etags'):
    os.mkdir('etags')

env = jinja2.Environment(
    loader=jinja2.FileSystemLoader(os.path.curdir)
)

output_dir = os.environ.get('OUTPUT_DIR', '/home/r/Programming/lumbung.space/lumbung.space-web/content/posts/')
#output_dir = os.environ.get('OUTPUT_DIR', 'network/')

if not os.path.exists(output_dir):
    os.makedirs(output_dir)

template = env.get_template('post_template.md')

#add iframe to the allowlist of feedparser's sanitizer,
#the actual filtering is handled in parse_posts()
feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {'iframe'}
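#without the line above feedparser's sanitizer would strip every <iframe>
#before parse_posts() could inspect it; widening the allowlist here defers
#the filtering decision to the allowlist check in parse_posts()
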
for feed_url in feed_urls:

    feed_name = urlparse(feed_url).netloc

    feed_dir = os.path.join(output_dir, feed_name)

    if not os.path.exists(feed_dir):
        os.makedirs(feed_dir)

    existing_posts = os.listdir(feed_dir)

    data = grab_feed(feed_url)

    if data:

        opds_feed = False
        for i in data.feed['links']:
            if i['rel'] == 'self':
                if 'opds' in i['type']:
                    opds_feed = True
                    print("opds!")

        for entry in data.entries:
            # if 'tags' in entry:
            #     for tag in entry.tags:
            #         for x in ['lumbung.space', 'D15', 'lumbung']:
            #             if x in tag['term']:
            #                 print(entry.title)

            entry['feed_name'] = feed_name

            post_name = slugify(entry.title)

            if opds_feed:
                entry['opds'] = True
                #format: Beyond-Debiasing-Report_Online-75535a4886e3
                post_name = slugify(entry['title']) + '-' + entry['id'].split('-')[-1]

            post_dir = os.path.join(output_dir, feed_name, post_name)

            if post_name not in existing_posts:
                #if there is a blog entry we don't already have, make it
                if opds_feed:
                    create_opds_post(post_dir, entry)
                else:
                    create_post(post_dir, entry)

            elif post_name in existing_posts:
                #if we already have it, update it
                if opds_feed:
                    create_opds_post(post_dir, entry)
                else:
                    create_post(post_dir, entry)

                #posts left in existing_posts afterwards were not returned by the feed
                existing_posts.remove(post_name)

        for post in existing_posts:
            #remove blog posts no longer returned by the RSS feed
            print('deleted', post)
            shutil.rmtree(os.path.join(feed_dir, slugify(post)))

end = time.time()

print(end - start)