bots/LogBot/logbot.py

333 lines
14 KiB
Python

import os
import re
import shutil
import urllib.request
from datetime import datetime
from urllib.parse import urlparse
import jinja2
from xbotlib import Bot
# Functions that are used as Jinja filters
def _href_wrap(post):
"""Wrap links in a tags as a Jinja template filter."""
for url in re.findall(r"http\S+", post):
url_with_href = f"<a href='{url}'>{url}</a>"
post = post.replace(url, url_with_href)
return post
# Main Logbot class
class Logbot(Bot):
help = """Oh dear, logbot is here!
(You can speak to the bot using @logbot or logbot:)
<image>: Your image is added to the log.
logbot @help: Print this message
logbot @add <message>: Add a message to the log.
logbot @delete <num>: Delete posts from the log. For example: @logbot @delete 5
logbot @title <string>: Set the title of your log.
logbot @folder <string>: Change the foldername of your log (by default, the roomname of the groupchat is used). Small warning: this changes the URL to your log page.
logbot @style <stylesheet>: Switch to another stylesheet. For example: logbot @style log. Available stylesheets include: timeline (default), float, opentab.
logbot @font <font>: Switch to another font. For example: logbot @font font. Available fonts include: polsku, notcouriersans; or select None to switch back to default serif.
logbot @uptime: To check how long @logbot has been around
@bots: To see who is around :)
""" # noqa
IMAGE_TYPES = (".jpg", "jpeg", "png", ".gif", ".bmp", ".svg", "eps")
AUDIO_TYPES = (".mp3", ".ogg", ".oga", ".mogg", ".wav", ".m4a", ".webm")
FILE_TYPES = ".pdf"
VIDEO_TYPES = (
".mp4",
".webm",
".flv",
".vob",
".avi",
".mov",
".qt",
".mpg",
".mpeg",
".mp4",
".m2v",
".mpe",
".3gp",
)
def _download(self, message):
"""Download media files."""
# define media_type
if message.url.lower().endswith(self.IMAGE_TYPES):
media_type = "images"
elif message.url.lower().endswith(self.FILE_TYPES):
media_type = "pdf"
elif message.url.lower().endswith(self.AUDIO_TYPES):
media_type = "audio"
elif message.url.lower().endswith(self.VIDEO_TYPES):
media_type = "video"
else:
media_type = None
self.log.info(f"Unable to determine media type of { message.url.lower() }")
# download file
data = urllib.request.urlopen(message.url).read()
if data:
self.log.info(f"downloading: { message.url }")
parsed_url = urlparse(message.url)
filename = (
os.path.basename(parsed_url.path).replace(" ", "_").replace("%20", "_")
) # safe url's
self.log.info(f"as the file: { filename }")
folder_name = self.db[message.room]["folder"]
path = os.path.join(self.output, folder_name, media_type)
if not os.path.isdir(path):
os.mkdir(path)
file_path = os.path.join(path, filename)
with open(file_path, "wb") as media_file:
media_file.write(data)
# define media_post
media_path = os.path.join(media_type, filename)
if message.url.lower().endswith(self.IMAGE_TYPES):
media_post = f'<img src="{ media_path }" loading="lazy">'
elif message.url.lower().endswith(self.FILE_TYPES):
media_post = f'<iframe src="{ media_path }" loading="lazy"></iframe>'
elif message.url.lower().endswith(self.AUDIO_TYPES):
media_post = f'<audio controls src="{ media_path }"></audio>'
elif message.url.lower().endswith(self.VIDEO_TYPES):
media_post = f'<video controls src="{ media_path }"></video>'
else:
media_post = None
return media_post, media_type
def _write_log(self, message):
"""Write new log to the file system."""
jinja_env = jinja2.Environment()
jinja_env.filters["href_wrap"] = _href_wrap
template = jinja_env.from_string(open("template.html").read())
folder_name = self.db[message.room]["folder"]
if "@" in folder_name: # hacky
folder_name = self._parse_room_name(folder_name)
log_path = os.path.join(self.output, folder_name, "index.html")
with open(log_path, "w") as out:
html = template.render(
title=self.db[message.room]["title"],
db=self.db[message.room]["messages"],
sorted_keys=[str(num) for num in sorted([int(num) for num in self.db[message.room]["messages"].keys()])]
)
out.write(html)
self.log.info(f"writing to: { log_path }")
def _generate_feed(self, message):
template = jinja2.Template(open("template.rss").read())
folder_name = self.db[message.room]["folder"]
if "@" in folder_name: # hacky
folder_name = self._parse_room_name(folder_name)
feed_path = os.path.join(self.output, folder_name, "feed.rss")
date = datetime.now()
with open(feed_path, "w") as out:
feed = template.render(
log_path=os.path.join(
"https://vvvvvvaria.org/logs/", folder_name, "index.html"
), # hard-coding the URL for now
title=self.db[message.room]["title"],
db=self.db[message.room],
date=date.strftime("%A, %d. %B %Y %I:%M%p"),
)
out.write(feed)
self.log.info(f"writing to: { feed_path }")
def _add_to_db(self, message, media_post=None):
"""Save new entry to database."""
keys = [x for x in self.db[message.room]["messages"].keys()]
keys.sort(key=int)
if not keys:
new_key = "0"
else:
new_key = str(int(keys[-1]) + 1)
if media_post:
self.db[message.room]["messages"][new_key] = media_post
else:
post = message.content.replace("@add ", "")
self.db[message.room]["messages"][new_key] = post
self.db._dumps()
def _parse_room_name(self, room):
"""Parse room name from entire address string."""
return str(re.match(r".*@", room).group()).replace("@", "")
def _setup_room(self, room):
"""Create directories and database entries for a new room."""
room_name = self._parse_room_name(room)
room_path = os.path.join(self.output, room_name)
self.log.info(f"Processing setup logic for: {room_path}")
if room not in self.db:
self.db[room] = {}
if "messages" not in self.db[room]:
self.db[room]["messages"] = {}
if "title" not in self.db[room]:
self.db[room]["title"] = room
if "folder" not in self.db[room]:
self.db[room]["folder"] = self._parse_room_name(room)
if "stylesheet" not in self.db[room]:
self.db[room]["stylesheet"] = "timeline" # default stylesheet
if "font" not in self.db[room]:
self.db[room]["font"] = "none" # default font
self.db._dumps()
self.log.info(f"Added to the database: { room }")
if not os.path.exists(room_path):
os.mkdir(room_path)
stylesheet_path = os.path.join(
"stylesheets", self.db[room]["stylesheet"] + ".css"
)
stylesheet_dest_path = os.path.join(room_path, "stylesheet.css")
shutil.copy(stylesheet_path, stylesheet_dest_path)
self.log.info(f"Created a folder for: { room }")
self.log.info(f"Added stylesheet.css to: { room }")
def setup(self):
"""Setup a log for all the rooms LogBot is subscribed to."""
self.log.info(f"Output folder is set to: { self.output }")
for room in self.rooms:
self._setup_room(room)
for room in self._data["invited"]:
self._setup_room(room)
def group_invite(self, message):
"""Extend xbotlib invite response logic and do required room setup."""
super().group_invite(message)
self._setup_room(str(message["from"]))
def group(self, message):
"""All the things LogBot does when it receives a group message."""
self.log.info("------------------")
self.log.info(f"message: { message.text }")
self.log.info(f"room: { message.room }")
self.log.info(f"sender: { message.sender }")
self.log.info("------------------")
# Response to files: image / PDF / audio / video
if message.url:
media_post, media_type = self._download(message)
if media_post:
self._add_to_db(message, media_post=media_post)
media_type = media_type.replace("images", "image") # linguistic hack!
reply = f"Thanks for that { media_type }!"
else:
reply = "Sorry, can't process that :( (unknown media type?)"
# Response to @add
elif "@add" in message.text:
self._add_to_db(message)
reply = "Added, thanks!"
# Response to @delete
elif "@delete" in message.text:
match = re.findall(r"@delete \d*", message.content)[0]
key = str(match.replace("@delete ", ""))
if key in self.db[message.room]["messages"]:
self.log.info(
f"To be deleted: { self.db[message.room]['messages'][key] }"
)
reply = f"This message is deleted: { self.db[message.room]['messages'][key] }"
del self.db[message.room]["messages"][key]
else:
reply = "This message is already gone!"
# Response to @title
elif "@title" in message.text:
match = re.findall("@title .*", message.content)[0]
title = match.replace("@title ", "")
self.db[message.room]["title"] = title
reply = f"The title of the log is changed to: { title }"
# Response to @folder
# https://git.vvvvvvaria.org/varia/bots/issues/5
elif "@folder" in message.text:
match = re.findall("@folder .*", message.content)[0]
new_folder_name = match.replace("@folder ", "")
current_folder_name = self.db[message.room]["folder"]
if "@" in current_folder_name: # hacky
current_folder_name = self._parse_room_name(current_folder_name)
current_folder_name_path = os.path.join(self.output, current_folder_name)
new_folder_name_path = os.path.join(self.output, new_folder_name)
try:
os.rename(current_folder_name_path, new_folder_name_path)
self.db[message.room]["folder"] = new_folder_name
reply = f"The foldername of the log is changed to: { new_folder_name }. The URL of the log changed into: https://vvvvvvaria.org/logs/{ new_folder_name}"
except:
reply = f"Sorry i couldn't shange that foldername into: '{ new_folder_name }'. Try again with: 'logbot @folder newname'."
# Response to @style
elif "@style" in message.text:
match = re.findall("@style .*", message.content)[0]
stylesheet = match.replace("@style ", "")
stylesheet = stylesheet.lower()
self.db[message.room]["stylesheet"] = stylesheet
room_name = self._parse_room_name(message.room)
room_path = os.path.join(self.output, self.db[message.room]["folder"])
if "@" in room_path: # hacky
room_path = self._parse_room_name(room_path)
stylesheet_path = os.path.join("stylesheets", f"{ stylesheet }.css")
stylesheet_dest_path = os.path.join(room_path, "stylesheet.css")
try:
shutil.copy(stylesheet_path, stylesheet_dest_path)
self.log.info(
f"Stylesheet in room { room_name } switched to: { stylesheet }"
)
reply = f"I'm switching the stylesheet of this log to: { stylesheet }."
except:
reply = f"The stylesheet '{ stylesheet }' is unknown to me. Check logbot @help to see the available stylesheets."
# Response to @font
elif "@font" in message.text:
match = re.findall("@font .*", message.content)[0]
font = match.replace("@font ", "")
font = font.lower()
self.db[message.room]["font"] = font
room_name = self._parse_room_name(message.room)
room_path = os.path.join(self.output, self.db[message.room]["folder"])
if "@" in room_path: # hacky
room_path = self._parse_room_name(room_path)
font_path = os.path.join("fonts", f"{ font }.ttf")
font_dest_path = os.path.join(room_path, "font.ttf")
if font == "none":
os.remove(font_dest_path)
reply = "I removed the font and switched back to default serif."
else:
try:
shutil.copy(font_path, font_dest_path)
self.log.info(f"font in room { room_name } switched to: { font }")
reply = f"I'm switching the font of this log to: { font }."
except:
reply = f"The font '{ font }' is unknown to me. Check @help to see the available fonts."
else:
reply = "Hmm ... not sure what you want to do? Check logbot @help to see the available commands."
# Regenerate the log webpage
self._write_log(message)
# Regenerate the RSS feed
self._generate_feed(message)
# Reply to the groupchat
self.reply(reply, room=message.room)
Logbot()