Browse Source

handle OPDS feeds from calibre-web

master
rra 3 years ago
parent
commit
f950f03792
  1. 149
      rss_aggregator.py

149
rss_aggregator.py

@ -1,7 +1,7 @@
#!/bin/python3 #!/bin/python3
#lumbung.space rss feed aggregator #lumbung.space rss & opds feed parser
#© 2021 roel roscam abbing gplv3 etc #© 2022 roel roscam abbing agplv3 etc
import requests import requests
import jinja2 import jinja2
@ -60,6 +60,21 @@ def create_frontmatter(entry):
author = entry.author author = entry.author
else: else:
author = '' author = ''
if 'authors' in entry:
authors = []
for a in entry.authors:
authors.append(a['name'])
if 'summary' in entry:
summary = entry.summary
else:
summary = ''
if 'publisher' in entry:
publisher = entry.publisher
else:
publisher = ''
tags = [] tags = []
if 'tags' in entry: if 'tags' in entry:
@ -67,15 +82,28 @@ def create_frontmatter(entry):
for t in entry.tags: for t in entry.tags:
tags.append(t['term']) tags.append(t['term'])
frontmatter = { if "opds" in entry:
'title':entry.title, frontmatter = {
'date': published.format(), 'title':entry.title,
'summary': '', 'date': published.format(),
'author': author, 'summary': summary,
'original_link': entry.link, 'author': ",".join(authors),
'feed_name': entry['feed_name'], 'publisher': publisher,
'tags': str(tags) 'original_link': entry.links[0]['href'].replace('opds/cover/','books/'),
} 'feed_name': entry['feed_name'],
'tags': str(tags),
'category': "books"
}
else:
frontmatter = {
'title':entry.title,
'date': published.format(),
'summary': '',
'author': author,
'original_link': entry.link,
'feed_name': entry['feed_name'],
'tags': str(tags)
}
return frontmatter return frontmatter
@ -100,32 +128,34 @@ def create_post(post_dir, entry):
f.write(post) f.write(post)
print('created post for', entry.title, '({})'.format(entry.link)) print('created post for', entry.title, '({})'.format(entry.link))
def grab_media(post_directory, url, prefered_name=None):
    """
    Download media linked in a post so the post has a local copy.

    post_directory: directory the file is stored in.
    url: remote location of the media item.
    prefered_name: optional file name to store the item under; when empty
        or None the last path segment of the url is used instead.

    Returns the local file name when the file already exists or the
    download succeeds, otherwise returns the original url so the caller
    can keep pointing at the remote copy.
    """
    media_item = prefered_name or urlparse(url).path.split('/')[-1]

    if not media_item:
        # A url ending in '/' yields no file name; joining '' would point at
        # the directory itself and wrongly look like an existing download.
        return url

    target = os.path.join(post_directory, media_item)

    try:
        if os.path.exists(target):
            return media_item

        #TODO: stream is true is a conditional so we could check the headers for things, mimetype etc
        # The with-block releases the streamed connection in every case.
        with requests.get(url, stream=True) as response:
            if response.ok:
                with open(target, 'wb') as media_file:
                    shutil.copyfileobj(response.raw, media_file)
                print('Downloaded media item', media_item)
                return media_item

            # Nothing was written, so do not hand back a local name.
            print('Failed to download image', url)
            print('HTTP status', response.status_code)

    except Exception as e:
        # Deliberately best-effort: one broken media link must not stop
        # the aggregation run.
        print('Failed to download image', url)
        print(e)

    return url
def parse_posts(post_dir, post_content): def parse_posts(post_dir, post_content):
""" """
parse the post content to for media items parse the post content to for media items
@ -147,6 +177,54 @@ def parse_posts(post_dir, post_content):
iframe.decompose() iframe.decompose()
return soup.decode() return soup.decode()
def create_opds_post(post_dir, entry):
    """
    Create a HUGO post based on an OPDS entry, or update it if the entry's
    timestamp is newer than the one stored next to the post.

    Downloads every link of the entry (cover & publication file) into
    post_dir, renders index.md from the module-level template and records
    the entry's update time in post_dir/.timestamp.

    post_dir: directory of the post; created when missing.
    entry: feed entry providing 'title', 'updated_parsed' and links
        (presumably a feedparser entry -- confirm against the caller).

    Returns None; returns early without touching anything when the stored
    timestamp is not older than the entry.
    """
    frontmatter = create_frontmatter(entry)

    os.makedirs(post_dir, exist_ok=True)

    timestamp_path = os.path.join(post_dir, '.timestamp')
    current_timestamp = arrow.get(entry['updated_parsed'])

    if os.path.exists(timestamp_path):
        with open(timestamp_path) as f:
            stored = f.read().strip()
        try:
            old_timestamp = arrow.get(float(stored))
        except ValueError:
            # An empty or corrupt marker should trigger a refresh rather
            # than abort the whole aggregation run.
            old_timestamp = None

        if old_timestamp is not None and current_timestamp <= old_timestamp:
            print('Book "{}..." already up to date'.format(entry['title'][:32]))
            return

    for item in entry.links:
        ft = item['type'].split('/')[-1]
        fn = item['rel'].split('/')[-1]

        if fn == "acquisition":
            fn = "publication" #calling the publications acquisition is weird

        prefered_name = "{}-{}.{}".format(fn, slugify(entry['title']), ft)

        grab_media(post_dir, item['href'], prefered_name)

    summary = entry.summary if "summary" in entry else ""

    with open(os.path.join(post_dir,'index.md'),'w') as f:
        post = template.render(frontmatter=frontmatter, content=summary)
        f.write(post)
        print('created post for Book', entry.title)

    # Written last so an interrupted run is retried on the next pass.
    with open(timestamp_path, 'w') as f:
        f.write(current_timestamp.format('X'))
def grab_feed(feed_url): def grab_feed(feed_url):
""" """
check whether feed has been updated check whether feed has been updated
@ -219,6 +297,14 @@ for feed_url in feed_urls:
data = grab_feed(feed_url) data = grab_feed(feed_url)
if data: if data:
opds_feed = False
for i in data.feed['links']:
if i['rel'] == 'self':
if 'opds' in i['type']:
opds_feed = True
print("opds!")
for entry in data.entries: for entry in data.entries:
# if 'tags' in entry: # if 'tags' in entry:
# for tag in entry.tags: # for tag in entry.tags:
@ -228,15 +314,28 @@ for feed_url in feed_urls:
entry['feed_name'] = feed_name entry['feed_name'] = feed_name
post_name = slugify(entry.title) post_name = slugify(entry.title)
if opds_feed:
entry['opds'] = True
#format: Beyond-Debiasing-Report_Online-75535a4886e3
post_name = slugify(entry['title'])+'-'+entry['id'].split('-')[-1]
post_dir = os.path.join(output_dir, feed_name, post_name) post_dir = os.path.join(output_dir, feed_name, post_name)
if post_name not in existing_posts: if post_name not in existing_posts:
#if there is a blog entry we dont already have, make it #if there is a blog entry we dont already have, make it
create_post(post_dir, entry) if opds_feed:
create_opds_post(post_dir, entry)
else:
create_post(post_dir, entry)
elif post_name in existing_posts: elif post_name in existing_posts:
#if we already have it, update it #if we already have it, update it
create_post(post_dir, entry) if opds_feed:
create_opds_post(post_dir, entry)
else:
create_post(post_dir, entry)
existing_posts.remove(post_name) # create list of posts which have not been returned by the feed existing_posts.remove(post_name) # create list of posts which have not been returned by the feed
for post in existing_posts: for post in existing_posts:

Loading…
Cancel
Save