commit 6aaad14ff0
6 changed files with 570 additions and 0 deletions
@@ -0,0 +1,431 @@
import os
import pprint
import shutil
import time
from ast import literal_eval as make_tuple
from hashlib import md5
from pathlib import Path
from re import compile as re_compile
from urllib.parse import urlparse

import arrow
import feedparser
import jinja2
import requests
from bs4 import BeautifulSoup
from slugify import slugify

# matches double quotes, for escaping them in frontmatter values
yamlre = re_compile('"')

# in-memory store of every post, used to render index.html at the end
db = {}

template_dir = os.path.join(Path(__file__).parent.resolve(), "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))


def write_etag(feed_name, feed_data):
    """
    save a feed's etag and last-modified timestamp,
    so the next fetch can be a conditional request
    """
    etag = ""
    modified = ""

    if "etag" in feed_data:
        etag = feed_data.etag
    if "modified" in feed_data:
        modified = feed_data.modified

    if etag or modified:
        with open(os.path.join("etags", feed_name + ".txt"), "w") as f:
            f.write(str((etag, modified)))


def get_etag(feed_name):
    """
    return a feed's stored etag and last-modified timestamp
    """
    fn = os.path.join("etags", feed_name + ".txt")
    etag = ""
    modified = ""

    if os.path.exists(fn):
        with open(fn, "r") as f:
            etag, modified = make_tuple(f.read())

    return etag, modified
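
# an aside on the etag store above: it round-trips an (etag, modified) tuple
# through str() and ast.literal_eval (imported as make_tuple). a minimal
# sanity check of that round trip, with invented values:
#
#     pair = ('W/"abc123"', 'Tue, 07 Mar 2023 10:00:00 GMT')
#     assert make_tuple(str(pair)) == pair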


def create_frontmatter(entry):
    """
    parse RSS metadata and return it as frontmatter
    """
    published = None
    if 'published' in entry:
        published = entry.published_parsed
    elif 'updated' in entry:
        published = entry.updated_parsed

    if not published:
        published = "2023-03-09T16:31:47.294841"
        # !!! placeholder hack for now, to make this whole script work

    published = arrow.get(published)

    if 'author' in entry:
        author = entry.author
    else:
        author = ''

    authors = []
    if 'authors' in entry:
        for a in entry.authors:
            authors.append(a['name'])

    if 'summary' in entry:
        summary = entry.summary
    else:
        summary = ''

    if 'publisher' in entry:
        publisher = entry.publisher
    else:
        publisher = ''

    tags = []
    if 'tags' in entry:
        #TODO finish categories
        for t in entry.tags:
            tags.append(t['term'])

    if "featured_image" in entry:
        featured_image = entry.featured_image
    else:
        featured_image = ''

    card_type = "network"
    if entry.feed_name == "pen.lumbung.space":
        card_type = "pen"

    if "opds" in entry:
        frontmatter = {
            'title': entry.title,
            'date': published.format(),
            'summary': summary,
            'author': ",".join(authors),
            'publisher': publisher,
            'original_link': entry.links[0]['href'].replace('opds/cover/', 'books/'),
            'feed_name': entry['feed_name'],
            'tags': str(tags),
            'category': "books"
        }
    else:
        frontmatter = {
            'title': entry.title,
            'date': published.format(),
            'summary': '',
            'author': author,
            'original_link': entry.link,
            'feed_name': entry['feed_name'],
            'tags': str(tags),
            'card_type': card_type,
            'featured_image': featured_image
        }

    return frontmatter


def sanitize_yaml(frontmatter):
    """
    Escape any occurrences of double quotes
    in any of the frontmatter fields.
    See: https://docs.octoprint.org/en/master/configuration/yaml.html#interesting-data-types
    """
    for k, v in frontmatter.items():
        if isinstance(v, list):
            # some fields are lists
            escaped = []
            for i in v:
                escaped.append(yamlre.sub('\\"', i))
            frontmatter[k] = escaped
        else:
            frontmatter[k] = yamlre.sub('\\"', v)

    return frontmatter
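
# what the escaping above does, concretely (a sketch reusing sanitize_yaml):
#
#     sanitize_yaml({'title': 'a "quoted" word'})
#     # -> {'title': 'a \\"quoted\\" word'}, i.e. each " becomes \"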


def parse_enclosures(post_dir, entry):
    """
    Parse feed enclosures, which carry featured media.
    Can be a featured image but also podcast entries.
    https://pythonhosted.org/feedparser/reference-entry-enclosures.html
    """
    #TODO parse more than images
    #TODO handle the fact it could be multiple items

    for e in entry.enclosures:
        if "type" in e:
            print("found enclosed media", e.type)
            if "image/" in e.type:
                featured_image = grab_media(post_dir, e.href)
                entry["featured_image"] = featured_image
            else:
                print("FIXME: ignoring enclosed", e.type)
    return entry


def parse_content(post_dir, entry):
    if "enclosures" in entry:
        entry = parse_enclosures(post_dir, entry)

    frontmatter = create_frontmatter(entry)
    print(">>> frontmatter:", frontmatter)

    if not os.path.exists(post_dir):
        os.makedirs(post_dir)

    if "content" in entry:
        post_content = entry.content[0].value
    else:
        post_content = entry.summary

    parsed_content = parse_posts(post_dir, post_content)

    return parsed_content, frontmatter


def create_post(post_dir, parsed_content, frontmatter):
    """
    write a hugo post based on an RSS entry,
    using the module-level jinja2 environment
    """
    template = env.get_template("post.template.html")

    with open(os.path.join(post_dir, "index.html"), "w") as f:  # n.b. .html
        post = template.render(frontmatter=sanitize_yaml(frontmatter), content=parsed_content)
        f.write(post)
        print("created post for " + frontmatter["title"] + " (" + frontmatter["original_link"] + ")")


def add_to_db(post_dir, parsed_content, frontmatter):
    db[post_dir] = {}
    db[post_dir]["content"] = parsed_content
    db[post_dir]["frontmatter"] = frontmatter


def grab_media(post_directory, url, preferred_name=None):
    """
    download media linked in a post, to keep a local copy;
    if the download succeeds return the new local path, otherwise return the url
    """
    media_item = urlparse(url).path.split('/')[-1]

    headers = {
        'User-Agent': 'https://git.autonomic.zone/ruangrupa/lumbunglib',
        'From': 'info@lumbung.space'  # This is another valid field
    }
    if preferred_name:
        media_item = preferred_name

    try:
        if not os.path.exists(os.path.join(post_directory, media_item)):
            #TODO: since stream=True, we could inspect the response headers
            # (mimetype etc.) before saving
            response = requests.get(url, headers=headers, stream=True)
            if response.ok:
                with open(os.path.join(post_directory, media_item), 'wb') as media_file:
                    shutil.copyfileobj(response.raw, media_file)
                print('Downloaded media item', media_item)
                return media_item
            else:
                print("Download failed", response.status_code)
                return url
        else:
            return media_item

    except Exception as e:
        print('Failed to download image', url)
        print(e)

    return url


def parse_posts(post_dir, post_content):
    """
    parse the post content for media items:
    replace foreign images with local copies,
    filter out iframe sources not in the allowlist
    """
    soup = BeautifulSoup(post_content, "html.parser")
    allowed_iframe_sources = ["youtube.com", "vimeo.com", "tv.lumbung.space"]

    for img in soup(["img", "object"]):
        if img.get("src") is not None:
            local_image = grab_media(post_dir, img["src"])
            if img["src"] != local_image:
                img["src"] = local_image

    for iframe in soup(["iframe"]):
        if not any(source in iframe.get("src", "") for source in allowed_iframe_sources):
            print("filtered iframe: {}...".format(iframe.get("src", "")[:25]))
            iframe.decompose()

    return soup.decode()
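
# the iframe allowlist in action (a sketch; no <img> tags in the input, so
# nothing gets downloaded):
#
#     parse_posts("/tmp/post", '<p>hi</p><iframe src="https://evil.example/x"></iframe>')
#     # prints: filtered iframe: https://evil.example/x...
#     # -> '<p>hi</p>'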


def grab_feed(feed_url):
    """
    check whether feed has been updated
    download & return it if it has
    """
    feed_name = urlparse(feed_url).netloc

    etag, modified = get_etag(feed_name)

    # !!! disabled for now, for testing
    # try:
    #     if modified:
    #         data = feedparser.parse(feed_url, modified=modified)
    #     elif etag:
    #         data = feedparser.parse(feed_url, etag=etag)
    #     else:
    #         data = feedparser.parse(feed_url)
    # except Exception as e:
    #     print("Error grabbing feed")
    #     print(feed_name)
    #     print(e)
    #     return False

    data = feedparser.parse(feed_url)

    if "status" in data:
        print(data.status, feed_url)
        if data.status == 200:
            # 304 means the feed has not been modified since we last checked
            write_etag(feed_name, data)
            return data
    return False
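
# once the conditional fetch above is re-enabled, an unchanged feed should
# come back with status 304 and no entries, so grab_feed() returns False and
# the feed is skipped (a sketch of the expected behaviour, not exercised here):
#
#     data = feedparser.parse(feed_url, etag=etag, modified=modified)
#     data.status   # -> 304 when the server honours conditional requests
#     data.entries  # -> []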


def main(output_dir):
    feed_urls = open("feeds.txt", "r").read().splitlines()

    start = time.time()

    if not os.path.exists("etags"):
        os.mkdir("etags")

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    feed_dict = dict()
    for url in feed_urls:
        feed_name = urlparse(url).netloc
        feed_dict[url] = feed_name

    feed_names = feed_dict.values()
    content_dirs = os.listdir(output_dir)
    # for i in content_dirs:
    #     if i not in feed_names:
    #         shutil.rmtree(os.path.join(output_dir, i))
    #         print("%s not in feeds_list.txt, removing local data" % (i))

    # add iframe to the allowlist of feedparser's sanitizer;
    # this is now handled in parse_posts()

    # !!! disabled for now
    # feedparser.sanitizer._HTMLSanitizer.acceptable_elements |= {"iframe"}

    for feed_url in feed_urls:

        print("\n>>>>>>>>>>>>>>>>>>>>>>\n")

        feed_name = feed_dict[feed_url]

        feed_dir = os.path.join(output_dir, feed_name)

        if not os.path.exists(feed_dir):
            os.makedirs(feed_dir)

        existing_posts = os.listdir(feed_dir)

        data = grab_feed(feed_url)

        if data:

            opds_feed = False
            for i in data.feed['links']:
                if i['rel'] == 'self':
                    if 'opds' in i['type']:
                        opds_feed = True
                        print("OPDS type feed!")

            for entry in data.entries:
                # if 'tags' in entry:
                #     for tag in entry.tags:
                #         for x in ['lumbung.space', 'D15', 'lumbung']:
                #             if x in tag['term']:
                #                 print(entry.title)
                entry["feed_name"] = feed_name

                post_name = slugify(entry.title)

                # pixelfed returns the whole post text as the post name. max
                # filename length is 255 on many systems. here we're shortening
                # the name and adding a hash to it to avoid a conflict in a
                # situation where 2 posts start with exactly the same text.
                if len(post_name) > 150:
                    post_hash = md5(bytes(post_name, "utf-8"))
                    post_name = post_name[:150] + "-" + post_hash.hexdigest()

                if opds_feed:
                    entry['opds'] = True
                    # format: Beyond-Debiasing-Report_Online-75535a4886e3
                    post_name = slugify(entry['title']) + '-' + entry['id'].split('-')[-1]

                post_dir = os.path.join(output_dir, feed_name, post_name)
                post_dirs.append(post_dir)

                # make the post, whether it's new or an update of one we already have
                parsed_content, frontmatter = parse_content(post_dir, entry)
                create_post(post_dir, parsed_content, frontmatter)

                if post_name in existing_posts:
                    # whittle existing_posts down to the posts
                    # which have not been returned by the feed
                    existing_posts.remove(post_name)

                # add this post to the db
                add_to_db(post_dir, parsed_content, frontmatter)

        # !!! disabled for now, for testing
        # for post in existing_posts:
        #     # remove blog posts no longer returned by the RSS feed
        #     print("deleted", post)
        #     shutil.rmtree(os.path.join(feed_dir, slugify(post)))

    print("\n----------------------\n")

    end = time.time()
    print("elapsed:", end - start, "seconds")


if __name__ == "__main__":

    post_dirs = []
    output_dir = "feed-materials"

    main(output_dir)

    print("\n>>> db:")
    pprint.pprint(db)

    template = env.get_template("index.template.html")

    output_file = 'index.html'
    with open(output_file, 'w') as f:
        index = template.render(db=db)
        f.write(index)
        print('>>> written:', output_file)
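
# a note on running this: the script's own filename isn't part of this
# commit's listing, so the name below is an assumption. it expects feeds.txt
# next to the script and a templates/ dir holding post.template.html and
# index.template.html:
#
#     $ python feed_aggregator.py
#
# it writes feed-materials/<feed>/<post>/index.html per entry, then renders
# index.html from the db dict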
@@ -0,0 +1,3 @@
https://vvvvvvaria.org/logs/dislog/feed.rss.xml
https://etherdump.constantvzw.org/recentchanges.rss
http://darkwiki.stuff2233.club/dislogging/index.rss
@@ -0,0 +1,9 @@
Jinja2>=3.0.3,<4.0.0
Mastodon.py>=1.5.1,<2.0.0
bs4>=0.0.1,<0.0.2
feedparser>=6.0.8,<7.0.0
ics>=0.7,<0.8
natural>=0.2.0,<0.3.0
python-slugify>=5.0.2,<6.0.0
requests>=2.26.0,<3.0.0
pprintpp==0.4.0
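# the pins above install with: python -m pip install -r requirements.txt
# nb: the script also imports arrow, which has no pin here; something like
#     arrow>=1.2,<2.0
# (bounds are a guess) would be needed for a clean install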
@@ -0,0 +1,20 @@
summary:hover {
    cursor: pointer;
}
iframe {
    width: calc(100% - 25px);
    height: 500px;
    border: 0;
    background-color: rgba(220,220,220,0.4);
}
table {
    width: 100%;
}
table,
th,
td {
    border: 1px solid;
}
th:hover {
    cursor: pointer;
}
@@ -0,0 +1,105 @@
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <link rel="stylesheet" type="text/css" href="stylesheet.css">
</head>
<body>
<div id="wrapper">

    <!-- -->
    <h1>circulations (1)</h1>
    {% for post_dir, post in db.items() %}
    <div class="post">
        <pre>---</pre>
        <strong>{{ post.frontmatter.title }}</strong>
        <div>
            <a href="{{ post_dir }}">aggregated</a>
            <a href="{{ post.frontmatter.original_link }}">source</a>
        </div>
        <small>{{ post.frontmatter.feed_name }}</small><br>
        <small>{{ post.frontmatter.date }}</small><br>
        <small>{{ post.frontmatter.author }}</small>
        <details>
            <summary>
                <small>post</small>
            </summary>
            <iframe src="{{ post_dir }}"></iframe>
        </details>
    </div>
    {% endfor %}

    <br>
    <br>
    <hr>

    <!-- -->
    <h1>circulations (2)</h1>
    <table id="circulations">
        <thead>
            <tr>
                <th onclick="sortTable(0)">title</th>
                <th onclick="sortTable(1)">post</th>
                <th onclick="sortTable(2)">feed</th>
                <th onclick="sortTable(3)">date</th>
                <th onclick="sortTable(4)">through</th>
            </tr>
        </thead>
        <tbody>
            {% for post_dir, post in db.items() %}
            <tr>
                <td>{{ post.frontmatter.title }}</td>
                <td>
                    <a href="{{ post_dir }}">aggregated</a>
                    <a href="{{ post.frontmatter.original_link }}">source</a>
                </td>
                <td>{{ post.frontmatter.feed_name }}</td>
                <td>{{ post.frontmatter.date }}</td>
                <td>{{ post.frontmatter.author }}</td>
            </tr>
            {% endfor %}
        </tbody>
    </table>


    <script>
    // bubble-sort the table rows on column n, toggling between ascending
    // and descending on repeated clicks of the same header
    function sortTable(n) {
        var table, rows, switching, i, x, y, shouldSwitch, dir, switchcount = 0;
        table = document.getElementById("circulations");
        switching = true;
        dir = "asc";
        while (switching) {
            switching = false;
            rows = table.rows;
            for (i = 1; i < (rows.length - 1); i++) {
                shouldSwitch = false;
                x = rows[i].getElementsByTagName("TD")[n];
                y = rows[i + 1].getElementsByTagName("TD")[n];
                if (dir == "asc") {
                    if (x.innerHTML.toLowerCase() > y.innerHTML.toLowerCase()) {
                        shouldSwitch = true;
                        break;
                    }
                } else if (dir == "desc") {
                    if (x.innerHTML.toLowerCase() < y.innerHTML.toLowerCase()) {
                        shouldSwitch = true;
                        break;
                    }
                }
            }
            if (shouldSwitch) {
                rows[i].parentNode.insertBefore(rows[i + 1], rows[i]);
                switching = true;
                switchcount++;
            } else {
                if (switchcount == 0 && dir == "asc") {
                    dir = "desc";
                    switching = true;
                }
            }
        }
    }
    </script>
</div>
</body>
</html>
@@ -0,0 +1,2 @@
<small class="frontmatter">{{ frontmatter }}</small>
<div class="post">{{ content }}</div>