Multifeeding RSS streams into points of access. https://multi.vvvvvvaria.org/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

102 lines
2.6 KiB

import feedparser
from simpledatabase import SimpleDatabase
import json
from datetime import datetime, date, timedelta
from backports.zoneinfo import ZoneInfo
def update():
    """Update all feeds.

    Reads one feed address per line from ``feeds.txt``, parses each one
    with ``feedparser`` and hands the collected result to
    ``SimpleDatabase.update`` as a dict with two keys:

    * ``'feeds'``: per-feed metadata (``title``, ``link``, ``rss``,
      ``description``) keyed by a running index stored as a string.
    * ``'all_posts_sorted'``: lists of posts keyed by the string form of
      the post's publication day (midnight, Europe/Amsterdam).

    Every stored post additionally gets a ``'feed_details'`` entry holding
    the metadata of the feed it came from.
    """
    # Use a context manager so the file handle is closed, drop the trailing
    # newline of every line and ignore blank lines so they are not parsed
    # as if they were feeds.
    with open('feeds.txt') as feeds_file:
        feeds = [line.strip() for line in feeds_file if line.strip()]
    db = SimpleDatabase('feeds.json', 'feeds.log')
    tmp = {'feeds': {}, 'all_posts_sorted': {}}
    for index, feed in enumerate(feeds):
        parsed = feedparser.parse(feed)
        if parsed.entries:
            rss = parsed.entries[0].title_detail.base
        else:
            # A feed without entries has no entry to read the base address
            # from; fall back to the address listed in feeds.txt instead of
            # failing with an IndexError.
            rss = feed
        feed_details = {
            'title': parsed.feed.title,
            'link': parsed.feed.link,
            'rss': rss,
            'description': parsed.feed.description,
        }
        tmp['feeds'][str(index)] = feed_details
        for post in parsed.entries:
            published = post.get('published_parsed')
            if not published:
                # Posts without a publication date cannot be filed under a
                # day; skip them rather than aborting the whole update.
                continue
            year, month, day = published[:3]
            post_date = datetime(
                year, month, day, tzinfo=ZoneInfo("Europe/Amsterdam")
            )
            # Give each post its own copy of the feed metadata.
            post['feed_details'] = dict(feed_details)
            tmp['all_posts_sorted'].setdefault(str(post_date), []).append(post)
    db.update(tmp)
def load():
    """Return a SimpleDatabase built on 'feeds.json' and 'feeds.log'."""
    return SimpleDatabase('feeds.json', 'feeds.log')
def latest(num):
    """Collect the <num> latest published posts.

    Args:
        num: Maximum number of posts to return; any value ``int()`` accepts.

    Returns:
        A list with at most ``num`` posts, gathered by walking the keys of
        ``all_posts_sorted`` in descending (string) order.

    Raises:
        ValueError: If ``num`` cannot be converted to an integer.
    """
    db = load()
    # Convert once instead of once per inspected post.
    limit = int(num)
    request = []
    if limit <= 0:
        return request
    posts_by_day = db['all_posts_sorted']
    for day_key in sorted(posts_by_day, reverse=True):
        for post in posts_by_day[day_key]:
            request.append(post)
            if len(request) >= limit:
                # Enough posts collected: stop scanning the remaining days
                # as well, not only the current one.
                return request
    return request
def today():
    """Collect posts from today.

    Returns:
        A list with every post stored under a key of ``all_posts_sorted``
        whose calendar day equals ``date.today()``.
    """
    db = load()
    current_day = date.today()
    request = []
    for date_str, posts in db['all_posts_sorted'].items():
        # update() stores keys as str() of a timezone-aware datetime, e.g.
        # "2021-05-03 00:00:00+02:00". Only the leading "YYYY-MM-DD" part is
        # the day; splitting the whole key on '-' would leave the time and
        # offset glued to the day field and make int() fail.
        year, month, day = (int(part) for part in date_str[:10].split('-'))
        # Check if any posts are published today
        if date(year, month, day) == current_day:
            request.extend(posts)
    return request
def past(days):
    """Collect posts from a number of past <days>.

    Args:
        days: Size of the look-back window in days; any value ``int()``
            accepts.

    Returns:
        A list with every post stored under a key of ``all_posts_sorted``
        whose calendar day is strictly later than ``date.today()`` minus
        ``days`` days.
    """
    db = load()
    point_in_the_past = date.today() - timedelta(int(days))
    request = []
    for date_str, posts in db['all_posts_sorted'].items():
        # update() stores keys as str() of a timezone-aware datetime, e.g.
        # "2021-05-03 00:00:00+02:00". Only the leading "YYYY-MM-DD" part is
        # the day; splitting the whole key on '-' would leave the time and
        # offset glued to the day field and make int() fail.
        year, month, day = (int(part) for part in date_str[:10].split('-'))
        if date(year, month, day) > point_in_the_past:
            request.extend(posts)
    return request