Compare commits

...

1 Commits

  1. 2
      Makefile
  2. 9
      feeds.txt
  3. 298
      feedtools.py
  4. 25
      pyproject.toml
  5. 3
      requirements.txt
  6. 115
      simpledatabase.py
  7. 107
      start.py

2
Makefile

@ -1,7 +1,7 @@
default: run
setup:
@python3 -m venv .venv && \
@python3.9 -m venv .venv && \
.venv/bin/pip install -r requirements.txt
run:

9
feeds.txt

@ -1,9 +1,2 @@
https://vvvvvvaria.org/feeds/all-nl.rss.xml
https://vvvvvvaria.org/en/feeds/all-en.rss.xml
https://post.lurk.org/tags/varia.rss
https://post.lurk.org/tags/sometimes.rss
https://a-nourishing-network.radical-openness.org/feeds/feed.rss
https://vvvvvvaria.org/logs/x-y/feed.rss.xml
https://vvvvvvaria.org/logs/atnofs-varia/feed.rss.xml
https://vvvvvvaria.org/logs/pub.club/feed.rss.xml
https://vvvvvvaria.org/logs/hold-and-release/feed.rss.xml
https://post.lurk.org/users/cmos4040.rss

298
feedtools.py

@ -5,164 +5,186 @@ from datetime import date, timedelta
import pypandoc
import re
def update():
""" Update all feeds """
feeds = open('feeds.txt').readlines()
db = SimpleDatabase('feeds.json', 'feeds.log')
tmp = {}
tmp['feeds'] = {}
tmp['all_posts_sorted'] = {}
for x, feed in enumerate(feeds):
parsed = feedparser.parse(feed)
if parsed:
# print(f'\n\n-----------------\nAdding: { parsed.feed.title } ({ parsed.feed.link })')
x = str(x)
tmp['feeds'][x] = {}
if 'title' in parsed.feed:
tmp['feeds'][x]['title'] = parsed.feed.title
else:
tmp['feeds'][x]['title'] = ""
try:
tmp['feeds'][x]['link'] = parsed.feed.link
except:
tmp['feeds'][x]['link'] = ""
try:
tmp['feeds'][x]['rss'] = parsed.entries[0].title_detail.base
except:
tmp['feeds'][x]['rss'] = ""
try:
tmp['feeds'][x]['description'] = parsed.feed.description
except:
tmp['feeds'][x]['description'] = ""
for post in parsed.entries:
print(post)
try:
year = post['published_parsed'][0]
month = post['published_parsed'][1]
day = post['published_parsed'][2]
post_date = date(year, month, day)
except:
d, day, month, year, time = post['published'].split()
if month == "Jan": month = 1
if month == "Feb": month = 2
if month == "Mar": month = 3
if month == "Apr": month = 4
if month == "May": month = 5
if month == "Jun": month = 6
if month == "Jul": month = 7
if month == "Aug": month = 8
if month == "Sep": month = 9
if month == "Oct": month = 10
if month == "Nov": month = 11
if month == "Dec": month = 12
post_date = date(int(year), int(month), int(day))
if not str(post_date) in tmp['all_posts_sorted']:
tmp['all_posts_sorted'][str(post_date)] = []
post['feed_details'] = {}
post['feed_details']['title'] = parsed.feed.title
post['feed_details']['link'] = parsed.feed.link
post['feed_details']['rss'] = parsed.entries[0].title_detail.base
post['feed_details']['description'] = parsed.feed.description
tmp['all_posts_sorted'][str(post_date)].append(post)
db.update(tmp)
"""Update all feeds"""
feeds = open("feeds.txt").readlines()
db = SimpleDatabase("feeds.json", "feeds.log")
tmp = {}
tmp["feeds"] = {}
tmp["all_posts_sorted"] = {}
for x, feed in enumerate(feeds):
parsed = feedparser.parse(feed)
if parsed:
# print(f'\n\n-----------------\nAdding: { parsed.feed.title } ({ parsed.feed.link })')
x = str(x)
tmp["feeds"][x] = {}
if "title" in parsed.feed:
tmp["feeds"][x]["title"] = parsed.feed.title
else:
tmp["feeds"][x]["title"] = ""
try:
tmp["feeds"][x]["link"] = parsed.feed.link
except:
tmp["feeds"][x]["link"] = ""
try:
tmp["feeds"][x]["rss"] = parsed.entries[0].title_detail.base
except:
tmp["feeds"][x]["rss"] = ""
try:
tmp["feeds"][x]["description"] = parsed.feed.description
except:
tmp["feeds"][x]["description"] = ""
for post in parsed.entries:
print(post)
try:
year = post["published_parsed"][0]
month = post["published_parsed"][1]
day = post["published_parsed"][2]
post_date = date(year, month, day)
except:
d, day, month, year, time = post["published"].split()
if month == "Jan":
month = 1
if month == "Feb":
month = 2
if month == "Mar":
month = 3
if month == "Apr":
month = 4
if month == "May":
month = 5
if month == "Jun":
month = 6
if month == "Jul":
month = 7
if month == "Aug":
month = 8
if month == "Sep":
month = 9
if month == "Oct":
month = 10
if month == "Nov":
month = 11
if month == "Dec":
month = 12
post_date = date(int(year), int(month), int(day))
if not str(post_date) in tmp["all_posts_sorted"]:
tmp["all_posts_sorted"][str(post_date)] = []
post["feed_details"] = {}
post["feed_details"]["title"] = parsed.feed.title
post["feed_details"]["link"] = parsed.feed.link
post["feed_details"]["rss"] = parsed.entries[
0
].title_detail.base
post["feed_details"]["description"] = parsed.feed.description
tmp["all_posts_sorted"][str(post_date)].append(post)
db.update(tmp)
def load():
db = SimpleDatabase('feeds.json', 'feeds.log')
return db
db = SimpleDatabase("feeds.json", "feeds.log")
return db
def latest(num):
""" Collect the <num> latest published posts """
db = load()
"""Collect the <num> latest published posts"""
db = load()
dates = [key for key in db['all_posts_sorted'].keys()]
dates.sort(reverse=True)
feed = []
dates = [key for key in db["all_posts_sorted"].keys()]
dates.sort(reverse=True)
feed = []
for date in dates:
posts = db['all_posts_sorted'][date]
for post in posts:
if len(feed) < int(num):
feed.append(post)
else:
break
for date in dates:
posts = db["all_posts_sorted"][date]
for post in posts:
if len(feed) < int(num):
feed.append(post)
else:
break
return feed
return feed
def today():
""" Collect posts from today """
db = load()
today = date.today()
feed = []
"""Collect posts from today"""
db = load()
today = date.today()
feed = []
for date_str, posts in db["all_posts_sorted"].items():
year = int(date_str.split("-")[0])
month = int(date_str.split("-")[1])
day = int(date_str.split("-")[2])
d = date(year, month, day)
for date_str, posts in db['all_posts_sorted'].items():
year = int(date_str.split('-')[0])
month = int(date_str.split('-')[1])
day = int(date_str.split('-')[2])
d = date(year, month, day)
# Check if any posts are published today
if d == today:
for post in posts:
feed.append(post)
# Check if any posts are published today
if d == today:
for post in posts:
feed.append(post)
return feed
return feed
def past(days):
""" Collect posts from a number of past <days> """
db = load()
point_in_the_past = date.today() - timedelta(int(days))
feed = []
for date_str, posts in db['all_posts_sorted'].items():
year = int(date_str.split('-')[0])
month = int(date_str.split('-')[1])
day = int(date_str.split('-')[2])
d = date(year, month, day)
if d > point_in_the_past:
for post in posts:
feed.append(post)
feed.reverse()
return feed
"""Collect posts from a number of past <days>"""
db = load()
point_in_the_past = date.today() - timedelta(int(days))
feed = []
for date_str, posts in db["all_posts_sorted"].items():
year = int(date_str.split("-")[0])
month = int(date_str.split("-")[1])
day = int(date_str.split("-")[2])
d = date(year, month, day)
if d > point_in_the_past:
for post in posts:
feed.append(post)
feed.reverse()
return feed
def md(feed):
md_feed = ''
for post in feed:
post_content = pypandoc.convert_text(
post['summary'], 'md', format='html'
)
if post['links']:
for link in post['links']:
if link['rel'] == 'enclosure':
if 'pdf' in link['type']:
post_content += f"\n<{ link['href'] }>\n"
post_content = re.sub(r'\n.*(-)\1{5,}.*\n', "", post_content) # remove all ------ lines from varia website posts
len_link = len(post['link']) + 4
len_line_dash = len_link * '-'
len_line_space = len_link * ' '
len_date_space = (len_link - len(post['published']) - 2 ) * ' '
md_feed += "------------------------- \n\n"
md_feed += f"# { post['title'] }" + "{.post_title} \n\n"
md_feed += f"""| |{ len_line_space }|
md_feed = ""
for post in feed:
post_content = pypandoc.convert_text(
post["summary"], "md", format="html"
)
if post["links"]:
for link in post["links"]:
if link["rel"] == "enclosure":
if "pdf" in link["type"]:
post_content += f"\n<{ link['href'] }>\n"
post_content = re.sub(
r"\n.*(-)\1{5,}.*\n", "", post_content
) # remove all ------ lines from varia website posts
len_link = len(post["link"]) + 4
len_line_dash = len_link * "-"
len_line_space = len_link * " "
len_date_space = (len_link - len(post["published"]) - 2) * " "
md_feed += "------------------------- \n\n"
md_feed += f"# { post['title'] }" + "{.post_title} \n\n"
md_feed += f"""| |{ len_line_space }|
|-----------:|{ len_line_dash }|
| **posted** | { post['published'] }{ len_date_space } |
| **from** | <{ post['link'] }> |
"""
md_feed += f"{ post_content } \n\n"
md_feed += f"{ post_content } \n\n"
return md_feed
return md_feed

25
pyproject.toml

@ -0,0 +1,25 @@
[tool.black]
line-length = 79
target-version = ['py37', 'py38', 'py39']
include = '\.pyi?$'
exclude = '''
/(
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
# The following are specific to Black, you probably don't want those.
| blib2to3
| tests/data
| profiling
)/
'''

3
requirements.txt

@ -2,5 +2,4 @@ flask
feedparser
pathlib
Flask-APScheduler
backports.zoneinfo
pypandoc
pypandoc

115
simpledatabase.py

@ -1,61 +1,62 @@
from os import environ, mkdir
from os.path import exists
from pathlib import Path
from logging import DEBUG, INFO, basicConfig, getLogger
from json import dumps, loads
from pathlib import Path
from logging import DEBUG, INFO, basicConfig, getLogger
from json import dumps, loads
class SimpleDatabase(dict):
"""A simple database.
It is a dictionary which saves to disk on all writes. It is optimised for
ease of hacking and accessibility and not for performance or efficiency.
Written by decentral1se, as part of:
https://git.vvvvvvaria.org/decentral1se/xbotlib/src/branch/main/xbotlib.py
"""
def __init__(self, filename, log, *args, **kwargs):
"""Initialise the object."""
self.filename = Path(filename).absolute()
self.log = getLogger(__name__)
self._loads()
self.update(*args, **kwargs)
def _loads(self):
"""Load the database."""
if not exists(self.filename):
return
try:
with open(self.filename, "r") as handle:
self.update(loads(handle.read()))
except Exception as exception:
message = f"Loading file storage failed: {exception}"
self.log.error(message, exc_info=exception)
exit(1)
def _dumps(self):
"""Save the databse to disk."""
try:
with open(self.filename, "w") as handle:
handle.write(dumps(self, indent=4, sort_keys=True))
except Exception as exception:
message = f"Saving file storage failed: {exception}"
self.log.error(message, exc_info=exception)
exit(1)
def __setitem__(self, key, val):
"""Write data to the database."""
super().__setitem__(key, val)
self._dumps()
def __delitem__(self, key):
"""Remove data from the database."""
super().__delitem__(key)
self._dumps()
def update(self, *args, **kwargs):
"""Update the database."""
for k, v in dict(*args, **kwargs).items():
self[k] = v
self._dumps()
"""A simple database.
It is a dictionary which saves to disk on all writes. It is optimised for
ease of hacking and accessibility and not for performance or efficiency.
Written by decentral1se, as part of:
https://git.vvvvvvaria.org/decentral1se/xbotlib/src/branch/main/xbotlib.py
"""
def __init__(self, filename, log, *args, **kwargs):
"""Initialise the object."""
self.filename = Path(filename).absolute()
self.log = getLogger(__name__)
self._loads()
self.update(*args, **kwargs)
def _loads(self):
"""Load the database."""
if not exists(self.filename):
return
try:
with open(self.filename, "r") as handle:
self.update(loads(handle.read()))
except Exception as exception:
message = f"Loading file storage failed: {exception}"
self.log.error(message, exc_info=exception)
exit(1)
def _dumps(self):
"""Save the databse to disk."""
try:
with open(self.filename, "w") as handle:
handle.write(dumps(self, indent=4, sort_keys=True))
except Exception as exception:
message = f"Saving file storage failed: {exception}"
self.log.error(message, exc_info=exception)
exit(1)
def __setitem__(self, key, val):
"""Write data to the database."""
super().__setitem__(key, val)
self._dumps()
def __delitem__(self, key):
"""Remove data from the database."""
super().__delitem__(key)
self._dumps()
def update(self, *args, **kwargs):
"""Update the database."""
for k, v in dict(*args, **kwargs).items():
self[k] = v
self._dumps()

107
start.py

@ -3,10 +3,12 @@ import flask_apscheduler
import feedtools
import json
APP = flask.Flask(__name__,
static_url_path="",
static_folder="static",
template_folder="templates")
APP = flask.Flask(
__name__,
static_url_path="",
static_folder="static",
template_folder="templates",
)
# Initialize Flask-APScheduler
# https://github.com/viniciuschiele/flask-apscheduler
@ -16,68 +18,67 @@ scheduler.api_enabled = False
scheduler.init_app(APP)
scheduler.start()
@scheduler.task('interval', id='update', minutes=10)
@scheduler.task("interval", id="update", minutes=10)
def update():
print('Updating the Multifeeder!')
feedtools.update()
print("Updating the Multifeeder!")
feedtools.update()
@APP.route("/")
def index():
db = feedtools.load()
template = flask.render_template(
"index.html",
db=db,
)
return template
db = feedtools.load()
template = flask.render_template(
"index.html",
db=db,
)
return template
@APP.route("/API/latest/<num>")
def latest(num):
feed = feedtools.latest(num)
if flask.request.values.get("format") == 'md':
response_data = feedtools.md(feed)
mimetype_data = 'text/plain'
else:
response_data = json.dumps(feed)
mimetype_data = 'application/json'
feed = feedtools.latest(num)
if flask.request.values.get("format") == "md":
response_data = feedtools.md(feed)
mimetype_data = "text/plain"
else:
response_data = json.dumps(feed)
mimetype_data = "application/json"
return APP.response_class(
response=response_data, status=200, mimetype=mimetype_data
)
return APP.response_class(
response=response_data,
status=200,
mimetype=mimetype_data
)
@APP.route("/API/today")
def today():
feed = feedtools.today()
if flask.request.values.get("format") == 'md':
response_data = feedtools.md(feed)
mimetype_data = 'text/plain'
else:
response_data = json.dumps(feed)
mimetype_data = 'application/json'
return APP.response_class(
response=response_data,
status=200,
mimetype=mimetype_data
)
feed = feedtools.today()
if flask.request.values.get("format") == "md":
response_data = feedtools.md(feed)
mimetype_data = "text/plain"
else:
response_data = json.dumps(feed)
mimetype_data = "application/json"
return APP.response_class(
response=response_data, status=200, mimetype=mimetype_data
)
@APP.route("/API/past/<days>")
def past(days):
feed = feedtools.past(days)
if flask.request.values.get("format") == 'md':
response_data = feedtools.md(feed)
mimetype_data = 'text/plain'
else:
response_data = json.dumps(feed)
mimetype_data = 'application/json'
return APP.response_class(
response=response_data,
status=200,
mimetype=mimetype_data
)
feed = feedtools.past(days)
if flask.request.values.get("format") == "md":
response_data = feedtools.md(feed)
mimetype_data = "text/plain"
else:
response_data = json.dumps(feed)
mimetype_data = "application/json"
return APP.response_class(
response=response_data, status=200, mimetype=mimetype_data
)
if __name__ == "__main__":
feedtools.update()
APP.debug = True
APP.run(port=5678)
if __name__ == "__main__":
feedtools.update()
APP.debug = True
APP.run(port=5678)

Loading…
Cancel
Save