forked from varia/bots
366 lines
16 KiB
Python
366 lines
16 KiB
Python
import os
|
|
import re
|
|
import shutil
|
|
import urllib.request
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
from mimetypes import guess_type
|
|
import jinja2
|
|
from xbotlib import Bot
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
import base64
|
|
|
|
# Main Logbot class
|
|
|
|
class Logbot(Bot):
|
|
|
|
baseurl = "https://vvvvvvaria.org/logs/" # hardcoding the url now, self.baseurl would be helpful to have in the conf
|
|
|
|
help = """Oh dear, logbot is here!
|
|
|
|
<image>: Your image is added to the log.
|
|
|
|
logbot @help: Print this message
|
|
|
|
logbot @add <message>: Add a message to the log.
|
|
|
|
logbot @delete <num>: Delete posts from the log. For example: @logbot @delete 5
|
|
|
|
logbot @title <string>: Set the title of your log.
|
|
|
|
logbot @folder <string>: Change the foldername of your log (by default, the roomname of the groupchat is used). Small warning: this changes the URL to your log page.
|
|
|
|
logbot @style <stylesheet>: Switch to another stylesheet. For example: logbot @style log. Available stylesheets include: timeline (default), float, opentab.
|
|
|
|
logbot @font <font>: Switch to another font. For example: logbot @font font. Available fonts include: polsku, notcouriersans; or select None to switch back to default serif.
|
|
|
|
logbot @url: Ask logbot to send the url of the log.
|
|
|
|
logbot @uptime: To check how long @logbot has been around
|
|
|
|
logbot @caption <num> "message": Adding a caption to a particular file.
|
|
|
|
@bots: To see who is around :)
|
|
""" # noqa
|
|
|
|
# Functions that are used to process logged materials
|
|
# These are marked with a "_" before their function name
|
|
|
|
def _download(self, message):
|
|
"""Download media files."""
|
|
media_mime, encoding = guess_type(message.url.lower())
|
|
media_type = str(re.match(r".*/", media_mime).group()).replace("/", "")
|
|
|
|
# download file
|
|
data = urllib.request.urlopen(message.url).read()
|
|
if data:
|
|
self.log.info(f"downloading: { message.url }")
|
|
parsed_url = urlparse(message.url)
|
|
filename = (
|
|
os.path.basename(parsed_url.path).replace(" ", "_").replace("%20", "_")
|
|
) # safe url's
|
|
self.log.info(f"as the file: { filename }")
|
|
folder_name = self.db[message.room]["folder"]
|
|
path = os.path.join(self.output, folder_name, media_type)
|
|
if not os.path.isdir(path):
|
|
os.mkdir(path)
|
|
file_path = os.path.join(path, filename)
|
|
with open(file_path, "wb") as media_file:
|
|
media_file.write(data)
|
|
|
|
# define relative path to media file as "media_path"
|
|
media_path = os.path.join(media_type, filename)
|
|
|
|
# get the size of the file
|
|
media_size = os.path.getsize(os.path.join(self.output, folder_name, media_path))
|
|
|
|
if media_type == "image":
|
|
try:
|
|
size = (450, 450)
|
|
im = Image.open(file_path)
|
|
im.thumbnail(size)
|
|
|
|
if (im.mode == 'RGBA'):
|
|
bg = Image.new('RGBA', im.size, (255,255,255))
|
|
composite = Image.alpha_composite(bg, im)
|
|
im=composite.convert('RGB')
|
|
|
|
output = BytesIO()
|
|
im.save(output, format='JPEG')
|
|
im_data = output.getvalue()
|
|
data_url = base64.b64encode(im_data).decode()
|
|
data_url = "data:image/png;base64, " + data_url
|
|
except Exception as e:
|
|
print('Thumbnailer:', e)
|
|
data_url = media_path
|
|
else:
|
|
data_url=''
|
|
|
|
return media_type, media_mime, media_path, media_size, data_url, filename
|
|
|
|
def _href_wrap(self, post):
|
|
"""Wrap links in <a> tags."""
|
|
for url in re.findall(r"http\S+", post):
|
|
url_with_href = f"<a href='{url}'>{url}</a>"
|
|
post = post.replace(url, url_with_href)
|
|
return post
|
|
|
|
def _write_log(self, message):
|
|
"""Generate a new log webpage."""
|
|
folder_name = self.db[message.room]["folder"]
|
|
if "@" in folder_name: # hacky
|
|
folder_name = self._parse_room_name(folder_name)
|
|
log_path = os.path.join(self.output, folder_name, "index.html")
|
|
template = jinja2.Template(open("template.html").read()) # it would be useful to use self.template here
|
|
with open(log_path, "w") as out:
|
|
html = template.render(
|
|
title=self.db[message.room]["title"],
|
|
db=self.db[message.room]["messages"],
|
|
sorted_numbering=[str(num) for num in sorted([int(num) for num in self.db[message.room]["messages"].keys()])]
|
|
)
|
|
out.write(html)
|
|
self.log.info(f"writing to: { log_path }")
|
|
|
|
def _generate_feed(self, message):
|
|
""" Generate a RSS feed. """
|
|
folder_name = self.db[message.room]["folder"]
|
|
if "@" in folder_name: # hacky
|
|
folder_name = self._parse_room_name(folder_name)
|
|
feed_path = os.path.join(self.output, folder_name, "feed.rss.xml")
|
|
date = datetime.now().strftime("%a, %d %b %Y %H:%M:%S")
|
|
print(date)
|
|
template = jinja2.Template(open("template.rss").read()) # self.feedtemplate would be useful to have in the conf
|
|
with open(feed_path, "w") as out:
|
|
feed = template.render(
|
|
log_path=os.path.join(self.baseurl, folder_name, "index.html"),
|
|
feed_path=os.path.join(self.baseurl, folder_name, "feed.rss.xml"),
|
|
title=self.db[message.room]["title"],
|
|
db=self.db[message.room],
|
|
date=date,
|
|
log_folder_url=os.path.join(self.baseurl, folder_name),
|
|
)
|
|
out.write(feed)
|
|
self.log.info(f"writing to: { feed_path }")
|
|
|
|
def _add_to_db(self, message, media_type=None, media_path=None, media_size=None, data_url=None, filename=None):
|
|
"""Save new entry to database."""
|
|
keys = [x for x in self.db[message.room]["messages"].keys()]
|
|
keys.sort(key=int)
|
|
date = datetime.now().strftime("%a, %d %b %Y %H:%M:%S")
|
|
if not keys:
|
|
new_key = "0"
|
|
else:
|
|
new_key = str(int(keys[-1]) + 1)
|
|
if media_path:
|
|
self.db[message.room]["messages"][new_key] = {}
|
|
self.db[message.room]["messages"][new_key]['post'] = ''
|
|
self.db[message.room]["messages"][new_key]['date'] = date
|
|
self.db[message.room]["messages"][new_key]['media'] = {}
|
|
self.db[message.room]["messages"][new_key]['media']['type'] = media_type
|
|
self.db[message.room]["messages"][new_key]['media']['path'] = media_path
|
|
self.db[message.room]["messages"][new_key]['media']['size'] = media_size
|
|
self.db[message.room]["messages"][new_key]['media']['filename'] = filename
|
|
self.db[message.room]["messages"][new_key]['media']['data_url'] = data_url
|
|
self.db[message.room]["messages"][new_key]['media']['caption'] = ''
|
|
else:
|
|
post = message.content.replace("@add ", "")
|
|
post = self._href_wrap(post)
|
|
self.db[message.room]["messages"][new_key] = {}
|
|
self.db[message.room]["messages"][new_key]['post'] = post
|
|
self.db[message.room]["messages"][new_key]['date'] = date
|
|
self.db._dumps()
|
|
return new_key
|
|
|
|
def _parse_room_name(self, room):
|
|
"""Parse room name from full MUC address string."""
|
|
return str(re.match(r".*@", room).group()).replace("@", "")
|
|
|
|
def _setup_room(self, room):
|
|
"""Create directories and database entries for a new room."""
|
|
room_name = self._parse_room_name(room)
|
|
if room in self.db:
|
|
room_path = self.db[room]["folder"]
|
|
else:
|
|
room_path = os.path.join(self.output, room_name)
|
|
self.log.info(f"Processing setup logic for: { room_path }")
|
|
|
|
if room not in self.db:
|
|
self.db[room] = {}
|
|
if "messages" not in self.db[room]:
|
|
self.db[room]["messages"] = {}
|
|
if "title" not in self.db[room]:
|
|
self.db[room]["title"] = room
|
|
if "folder" not in self.db[room]:
|
|
self.db[room]["folder"] = self._parse_room_name(room)
|
|
if "stylesheet" not in self.db[room]:
|
|
self.db[room]["stylesheet"] = "timeline" # default stylesheet
|
|
if "font" not in self.db[room]:
|
|
self.db[room]["font"] = "none" # default font
|
|
self.db._dumps()
|
|
self.log.info(f"Added to the database: { room }")
|
|
|
|
if not os.path.exists(room_path):
|
|
os.mkdir(room_path)
|
|
stylesheet_path = os.path.join("stylesheets", self.db[room]["stylesheet"] + ".css")
|
|
stylesheet_dest_path = os.path.join(room_path, "stylesheet.css")
|
|
shutil.copy(stylesheet_path, stylesheet_dest_path)
|
|
self.log.info(f"Created a folder for: { room }")
|
|
self.log.info(f"Added stylesheet.css to: { room }")
|
|
|
|
def setup(self):
|
|
"""Setup a log for all the rooms LogBot is subscribed to."""
|
|
self.log.info(f"Output folder is set to: { self.output }")
|
|
for room in self.rooms:
|
|
self._setup_room(room)
|
|
for room in self._data["invited"]:
|
|
self._setup_room(room)
|
|
|
|
def group_invite(self, message):
|
|
"""Extend xbotlib invite response logic and do required room setup."""
|
|
super().group_invite(message)
|
|
self._setup_room(str(message["from"]))
|
|
|
|
def group(self, message):
|
|
"""All the things LogBot does when it receives a group message."""
|
|
self.log.info("------------------")
|
|
self.log.info(f"message: { message.text }")
|
|
self.log.info(f"room: { message.room }")
|
|
self.log.info(f"sender: { message.sender }")
|
|
self.log.info("------------------")
|
|
|
|
# Response to files: image / PDF / audio / video
|
|
if message.url:
|
|
media_type, media_mime, media_path, media_size, data_url, filename = self._download(message)
|
|
# TODO: Insert a list of accepted file types here.
|
|
if media_path:
|
|
num = self._add_to_db(message, media_type=media_mime, media_path=media_path, media_size=media_size, data_url=data_url, filename=filename)
|
|
media_type = media_type.replace("images", "image") # linguistic hack!
|
|
if 'pdf' in message.url:
|
|
media_type = 'PDF' # linguistic hack!
|
|
reply = f"Thanks for that { media_type }! If you want to remove it from the log, you can send 'logbot @delete { num }'"
|
|
else:
|
|
reply = "Sorry, can't process that :( (unknown media type?)"
|
|
|
|
# Response to @add
|
|
elif "@add" in message.text:
|
|
self._add_to_db(message)
|
|
reply = "Added, thanks!"
|
|
|
|
# Response to @url
|
|
elif "@url" in message.text:
|
|
reply = f'{ self.baseurl }{ self.db[message.room]["folder"] }/'
|
|
|
|
# Response to @delete
|
|
elif "@delete" in message.text:
|
|
match = re.findall(r"@delete \d*", message.content)[0]
|
|
key = str(match.replace("@delete ", ""))
|
|
|
|
if key in self.db[message.room]["messages"]:
|
|
self.log.info(
|
|
f"To be deleted: { self.db[message.room]['messages'][key] }"
|
|
)
|
|
if 'media' in self.db[message.room]['messages'][key]:
|
|
reply = f"Deleted: { self.db[message.room]['messages'][key]['media']['path'] }"
|
|
else:
|
|
reply = f"Deleted: { self.db[message.room]['messages'][key]['post'] }"
|
|
del self.db[message.room]["messages"][key]
|
|
else:
|
|
reply = "This one is already gone!"
|
|
|
|
# Response to @caption
|
|
elif "@caption" in message.text:
|
|
match = re.findall(r"@caption \d*", message.content)[0]
|
|
key = str(match.replace("@caption ", ""))
|
|
captiontext = re.findall(r'"(.*?)"', message.text)
|
|
self.db[message.room]["messages"][key]['media']['caption'] = captiontext
|
|
reply = captiontext
|
|
|
|
if key in self.db[message.room]["messages"]:
|
|
self.log.info(
|
|
f"To be captioned: { self.db[message.room]['messages'][key] }"
|
|
)
|
|
reply = f"Added a caption for file number {key}."
|
|
|
|
# Response to @title
|
|
elif "@title" in message.text:
|
|
match = re.findall("@title .*", message.content)[0]
|
|
title = match.replace("@title ", "")
|
|
self.db[message.room]["title"] = title
|
|
reply = f"The title of the log is changed to: { title }"
|
|
|
|
# Response to @folder
|
|
# https://git.vvvvvvaria.org/varia/bots/issues/5
|
|
elif "@folder" in message.text:
|
|
match = re.findall("@folder .*", message.content)[0]
|
|
new_folder_name = match.replace("@folder ", "")
|
|
current_folder_name = self.db[message.room]["folder"]
|
|
if "@" in current_folder_name: # hacky
|
|
current_folder_name = self._parse_room_name(current_folder_name)
|
|
current_folder_name_path = os.path.join(self.output, current_folder_name)
|
|
new_folder_name_path = os.path.join(self.output, new_folder_name)
|
|
try:
|
|
os.rename(current_folder_name_path, new_folder_name_path)
|
|
self.db[message.room]["folder"] = new_folder_name
|
|
reply = f"The foldername of the log is changed to: { new_folder_name }. The URL of the log changed into: { self.baseurl }{ new_folder_name}"
|
|
except:
|
|
reply = f"Sorry i couldn't shange that foldername into: '{ new_folder_name }'. Try again with: 'logbot @folder newname'."
|
|
|
|
# Response to @style
|
|
elif "@style" in message.text:
|
|
match = re.findall("@style .*", message.content)[0]
|
|
stylesheet = match.replace("@style ", "")
|
|
stylesheet = stylesheet.lower()
|
|
self.db[message.room]["stylesheet"] = stylesheet
|
|
room_name = self._parse_room_name(message.room)
|
|
room_path = os.path.join(self.output, self.db[message.room]["folder"])
|
|
if "@" in room_path: # hacky
|
|
room_path = self._parse_room_name(room_path)
|
|
stylesheet_path = os.path.join("stylesheets", f"{ stylesheet }.css")
|
|
stylesheet_dest_path = os.path.join(room_path, "stylesheet.css")
|
|
try:
|
|
shutil.copy(stylesheet_path, stylesheet_dest_path)
|
|
self.log.info(
|
|
f"Stylesheet in room { room_name } switched to: { stylesheet }"
|
|
)
|
|
reply = f"I'm switching the stylesheet of this log to: { stylesheet }."
|
|
except:
|
|
reply = f"The stylesheet '{ stylesheet }' is unknown to me. Check logbot @help to see the available stylesheets."
|
|
|
|
# Response to @font
|
|
elif "@font" in message.text:
|
|
match = re.findall("@font .*", message.content)[0]
|
|
font = match.replace("@font ", "")
|
|
font = font.lower()
|
|
self.db[message.room]["font"] = font
|
|
room_name = self._parse_room_name(message.room)
|
|
room_path = os.path.join(self.output, self.db[message.room]["folder"])
|
|
if "@" in room_path: # hacky
|
|
room_path = self._parse_room_name(room_path)
|
|
font_path = os.path.join("fonts", f"{ font }.ttf")
|
|
font_dest_path = os.path.join(room_path, "font.ttf")
|
|
if font == "none":
|
|
os.remove(font_dest_path)
|
|
reply = "I removed the font and switched back to default serif."
|
|
else:
|
|
try:
|
|
shutil.copy(font_path, font_dest_path)
|
|
self.log.info(f"font in room { room_name } switched to: { font }")
|
|
reply = f"I'm switching the font of this log to: { font }."
|
|
except:
|
|
reply = f"The font '{ font }' is unknown to me. Check @help to see the available fonts."
|
|
|
|
else:
|
|
reply = "Hmm ... not sure what you want to do? Check logbot @help to see the available commands."
|
|
|
|
# Regenerate the log webpage
|
|
self._write_log(message)
|
|
|
|
# Regenerate the RSS feed
|
|
self._generate_feed(message)
|
|
|
|
# Reply to the groupchat
|
|
self.reply(reply, room=message.room)
|
|
|
|
Logbot()
|