A place for all our bot adventures.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

285 lines
11 KiB

import os
import re
import shutil
import urllib.request
from urllib.parse import urlparse
from datetime import datetime
import jinja2
from xbotlib import Bot
class Logbot(Bot):
help = """Oh dear, logbot is here!
(You can speak to the bot using @logbot or logbot:)
<image>: Your image is added to the log.
logbot @help: Print this message
logbot @add <message>: Add a message to the log.
logbot @delete <num>: Delete posts from the log. For example: @logbot @delete 5
logbot @title <string>: Set the title of your log.
logbot @style <stylesheet>: Switch to another stylesheet. For example: logbot @style log. Available stylesheets include: timeline, float.
logbot @font <font>: Switch to another font. For example: logbot @font font. Available fonts include: polsku, notcouriersans; or select None to switch back to default serif.
logbot @uptime: To check how long @logbot has been around
@bots: To see who is around :)
""" # noqa
IMAGE_TYPES = (".jpg", "jpeg", "png", ".gif", ".bmp", ".svg", "eps")
AUDIO_TYPES = (".mp3", ".ogg", ".oga", ".mogg", ".wav", ".m4a", ".webm")
FILE_TYPES = ".pdf"
VIDEO_TYPES = (
".mp4",
".webm",
".flv",
".vob",
".avi",
".mov",
".qt",
".mpg",
".mpeg",
".mp4",
".m2v",
".mpe",
".3gp",
)
def _download(self, message):
"""Download media files."""
# define media_type
if message.url.lower().endswith(self.IMAGE_TYPES):
media_type = "images"
elif message.url.lower().endswith(self.FILE_TYPES):
media_type = "pdf"
elif message.url.lower().endswith(self.AUDIO_TYPES):
media_type = "audio"
elif message.url.lower().endswith(self.VIDEO_TYPES):
media_type = "video"
else:
media_type = None
self.log.info(f"Unable to determine media type of { message.url.lower() }")
# download file
data = urllib.request.urlopen(message.url).read()
if data:
self.log.info(f"downloading: { message.url }")
parsed_url = urlparse(message.url)
filename = (
os.path.basename(parsed_url.path).replace(" ", "_").replace("%20", "_")
) # safe url's
self.log.info(f"as the file: { filename }")
roomname = re.sub(r"@.*", "", message.room)
path = os.path.join(self.output, roomname, media_type)
if not os.path.isdir(path):
os.mkdir(path)
file_path = os.path.join(path, filename)
with open(file_path, "wb") as media_file:
media_file.write(data)
# define media_post
media_path = os.path.join(media_type, filename)
if message.url.lower().endswith(self.IMAGE_TYPES):
media_post = f'<img src="{ media_path }">'
elif message.url.lower().endswith(self.FILE_TYPES):
media_post = (
f'<iframe src="{ media_path }" width="800" height="1000"></iframe>'
)
elif message.url.lower().endswith(self.AUDIO_TYPES):
media_post = f'<audio controls src="{ media_path }"></audio>'
elif message.url.lower().endswith(self.VIDEO_TYPES):
media_post = f'<video controls src="{ media_path }"></video>'
else:
media_post = None
return media_post, media_type
def _write_log(self, message):
"""Write new log to the file system."""
template = jinja2.Template(open("template.html").read())
room_name = self._parse_room_name(message.room)
log_path = os.path.join(self.output, room_name, "index.html")
with open(log_path, "w") as out:
html = template.render(
title=self.db[message.room]["title"],
db=self.db[message.room]["messages"]
)
out.write(html)
self.log.info(f"writing to: { log_path }")
def _generate_feed(self, message):
template = jinja2.Template(open("feed.rss").read())
room_name = self._parse_room_name(message.room)
feed_path = os.path.join(self.output, room_name, "feed.rss")
date = datetime.now()
with open(feed_path, "w") as out:
feed = template.render(
log_path=os.path.join("https://vvvvvvaria.org/logs/", room_name, "index.html"), # hard-coding the URL for now
title=self.db[message.room]["title"],
db=self.db[message.room],
date=date.strftime("%A, %d. %B %Y %I:%M%p")
)
out.write(feed)
self.log.info(f"writing to: { feed_path }")
def _add_to_db(self, message, media_post=None):
"""Save new entry to database."""
keys = [x for x in self.db[message.room]["messages"].keys()]
keys.sort(key=int)
if not keys:
new_key = "0"
else:
new_key = str(int(keys[-1]) + 1)
if media_post:
self.db[message.room]["messages"][new_key] = media_post
else:
post = message.content.replace("@add ", "")
self.db[message.room]["messages"][new_key] = post
self.db._dumps()
def _parse_room_name(self, room):
"""Parse room name from entire address string."""
return str(re.match(r".*@", room).group()).replace("@", "")
def _setup_room(self, room):
"""Create directories and database entries for a new room."""
room_name = self._parse_room_name(room)
room_path = os.path.join(self.output, room_name)
self.log.info(f"Processing setup logic for: {room_path}")
if room not in self.db:
self.db[room] = {}
if "messages" not in self.db[room]:
self.db[room]["messages"] = {}
if "title" not in self.db[room]:
self.db[room]["title"] = room
if "stylesheet" not in self.db[room]:
self.db[room]["stylesheet"] = room
if "font" not in self.db[room]:
self.db[room]["font"] = room
self.db._dumps()
self.log.info(f"Added to the database: { room }")
if not os.path.exists(room_path):
os.mkdir(room_path)
stylesheet_path = os.path.join("stylesheets","linear.css") # default stylesheet
stylesheet_dest_path = os.path.join(room_path,"stylesheet.css")
shutil.copy(stylesheet_path, stylesheet_dest_path)
self.log.info(f"Created a folder for: { room }")
self.log.info(f"Added stylesheet.css to: { room }")
def setup(self):
"""Setup a log for all the rooms LogBot is subscribed to."""
self.log.info(f"Output folder is set to: { self.output }")
for room in self.rooms:
self._setup_room(room)
for room in self._data["invited"]:
self._setup_room(room)
def group_invite(self, message):
"""Extend xbotlib invite response logic and do required room setup."""
super().group_invite(message)
self._setup_room(str(message["from"]))
def group(self, message):
"""All the things LogBot does when it receives a group message."""
self.log.info("------------------")
self.log.info(f"message: { message.text }")
self.log.info(f"room: { message.room }")
self.log.info(f"sender: { message.sender }")
self.log.info("------------------")
# Response to files: image / PDF / audio / video
if message.url:
media_post, media_type = self._download(message)
if media_post:
self._add_to_db(message, media_post=media_post)
media_type = media_type.replace("images", "image") # linguistic hack!
reply = f"Thanks for that { media_type }!"
else:
reply = "Sorry, can't process that :( (unknown media type?)"
# Response to @add
elif "@add" in message.text:
self._add_to_db(message)
reply = "Added, thanks!"
# Response to @delete
elif "@delete" in message.text:
match = re.findall(r"@delete \d*", message.content)[0]
key = str(match.replace("@delete ", ""))
if key in self.db[message.room]["messages"]:
4 years ago
self.log.info(
f"To be deleted: { self.db[message.room]['messages'][key] }"
4 years ago
)
reply = f"This message is deleted: { self.db[message.room]['messages'][key] }"
del self.db[message.room]["messages"][key]
else:
reply = "This message is already gone!"
# Response to @title
elif "@title" in message.text:
match = re.findall("@title .*", message.content)[0]
title = match.replace("@title ", "")
self.db[message.room]["title"] = title
reply = f"The title of the log is changed to: { title }"
# Response to @style
elif "@style" in message.text:
match = re.findall("@style .*", message.content)[0]
stylesheet = match.replace("@style ", "")
stylesheet = stylesheet.lower()
self.db[message.room]["stylesheet"] = stylesheet
room_name = self._parse_room_name(message.room)
room_path = os.path.join(self.output, room_name)
stylesheet_path = os.path.join("stylesheets", f"{ stylesheet }.css")
stylesheet_dest_path = os.path.join(room_path, "stylesheet.css")
try:
shutil.copy(stylesheet_path, stylesheet_dest_path)
self.log.info(f"Stylesheet in room { room_name } switched to: { stylesheet }")
reply = f"I'm switching the stylesheet of this log to: { stylesheet }."
except:
reply = f"The stylesheet '{ stylesheet }' is unknown to me. Check @help to see the available stylesheets."
# Response to @font
elif "@font" in message.text:
match = re.findall("@font .*", message.content)[0]
font = match.replace("@font ", "")
font = font.lower()
self.db[message.room]["font"] = font
room_name = self._parse_room_name(message.room)
room_path = os.path.join(self.output, room_name)
font_path = os.path.join("fonts", f"{ font }.ttf")
font_dest_path = os.path.join(room_path, "font.ttf")
if font == 'none':
os.remove(font_dest_path)
reply = "I removed the font and switched back to default serif."
else:
try:
shutil.copy(font_path, font_dest_path)
self.log.info(f"font in room { room_name } switched to: { font }")
reply = f"I'm switching the font of this log to: { font }."
except:
reply = f"The font '{ font }' is unknown to me. Check @help to see the available fonts."
else:
reply = "Hmm ... not sure what you want to do?"
# Regenerate the log webpage
self._write_log(message)
# Regenerate the RSS feed
self._generate_feed(message)
# Reply to the groupchat
self.reply(reply, room=message.room)
Logbot()