A place for all our bot adventures.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

374 lines
17 KiB

import base64
import html
import os
import re
import shutil
import urllib.request
from datetime import datetime
from io import BytesIO
from mimetypes import guess_type
from urllib.parse import urlparse

import jinja2
from PIL import Image
from xbotlib import Bot
# Main Logbot class
class Logbot(Bot):
    """Bot that keeps a log (web page + RSS feed) for every group chat.

    Posts and media files sent to the bot are stored in ``self.db`` under
    the room address and rendered to ``<output>/<folder>/index.html`` and
    ``<output>/<folder>/feed.rss.xml`` after every group message.
    """

    baseurl = "https://vvvvvvaria.org/logs/"  # hardcoding the url now, self.baseurl would be helpful to have in the conf

    help = """Oh dear, logbot is here!
To interact with logbot, see below:
<image>: Your image is added to the log.
logbot @help: Print this message
logbot @add <message>: Add a message to the log.
logbot @delete <num>: Delete posts from the log. For example: @logbot @delete 5
logbot @title <string>: Set the title of your log.
logbot @folder <string>: Change the foldername of your log (by default, the roomname of the groupchat is used). Small warning: this changes the URL to your log page.
logbot @style <stylesheet>: Switch to another stylesheet. For example: logbot @style log. Available stylesheets include: timeline (default), float, opentab.
logbot @font <font>: Switch to another font. For example: logbot @font font. Available fonts include: polsku, notcouriersans; or select None to switch back to default serif.
logbot @url: Ask logbot to send the url of the log.
logbot @uptime: To check how long @logbot has been around
logbot @caption <num> "message": Adding a caption to a particular file.
@bots: To see who is around :)
"""  # noqa

    # Reply used whenever a message cannot be interpreted as a command.
    _confused_reply = "Hmm ... not sure what you want to do? Check logbot @help to see the available commands."

    # Functions that are used to process logged materials
    # These are marked with a "_" before their function name

    def _room_folder(self, room):
        """Return the folder name of a room's log as stored in the database."""
        folder_name = self.db[room]["folder"]
        if "@" in folder_name:  # hacky: the stored value may be a full MUC address
            folder_name = self._parse_room_name(folder_name)
        return folder_name

    @staticmethod
    def _is_safe_name(name):
        """Tell whether ``name`` is a single, non-empty path component.

        Names come straight from chat messages, so anything that could
        escape the intended directory ("..", path separators) is refused.
        """
        if not name or name in (".", ".."):
            return False
        return os.path.basename(name) == name

    @staticmethod
    def _command_argument(command, content):
        """Return the text after "@<command> " in ``content``, or None."""
        found = re.search(rf"@{command} (.*)", content)
        return found.group(1) if found else None

    @staticmethod
    def _command_number(command, content):
        """Return the digits after "@<command> " in ``content``, or None."""
        found = re.search(rf"@{command} (\d+)", content)
        return found.group(1) if found else None

    def _thumbnail_data_url(self, file_path, fallback):
        """Return a JPEG thumbnail of an image as a base64 data URL.

        ``fallback`` is returned when the thumbnail cannot be produced.
        """
        try:
            with Image.open(file_path) as im:
                im.thumbnail((450, 450))
                if im.mode == "RGBA":
                    # flatten transparency onto a white background
                    bg = Image.new("RGBA", im.size, (255, 255, 255, 255))
                    im = Image.alpha_composite(bg, im)
                if im.mode != "RGB":
                    # JPEG cannot store palette / alpha / 16-bit modes
                    im = im.convert("RGB")
                output = BytesIO()
                im.save(output, format="JPEG")
        except Exception as error:  # best effort: any PIL failure falls back
            self.log.info(f"Thumbnailer: { error }")
            return fallback
        encoded = base64.b64encode(output.getvalue()).decode()
        # the thumbnail is saved as JPEG, so label it as such
        return "data:image/jpeg;base64," + encoded

    def _download(self, message):
        """Download the media file behind ``message.url`` into the log folder.

        Returns the tuple ``(media_type, media_mime, media_path, media_size,
        data_url, filename)``. Every element is ``None`` when the URL is not
        http(s), its type cannot be guessed, or nothing could be downloaded;
        callers test ``media_path`` to detect that.
        """
        nothing = (None, None, None, None, None, None)

        parsed_url = urlparse(message.url)
        if parsed_url.scheme not in ("http", "https"):
            # the URL comes from a chat message: never open file:// and friends
            self.log.info(f"refusing to download: { message.url }")
            return nothing

        media_mime, _encoding = guess_type(message.url.lower())
        if not media_mime:
            # guess_type() gives None for unknown extensions
            self.log.info(f"unknown media type: { message.url }")
            return nothing
        media_type = media_mime.partition("/")[0]

        filename = (
            os.path.basename(parsed_url.path).replace(" ", "_").replace("%20", "_")
        )  # safe url's
        if not filename:
            return nothing

        # download file
        self.log.info(f"downloading: { message.url }")
        try:
            with urllib.request.urlopen(message.url) as response:
                data = response.read()
        except OSError as error:  # URLError / HTTPError are OSError subclasses
            self.log.info(f"download failed: { error }")
            return nothing
        if not data:
            return nothing

        self.log.info(f"as the file: { filename }")
        folder_name = self._room_folder(message.room)
        path = os.path.join(self.output, folder_name, media_type)
        os.makedirs(path, exist_ok=True)
        file_path = os.path.join(path, filename)
        with open(file_path, "wb") as media_file:
            media_file.write(data)

        # define relative path to media file as "media_path"
        media_path = os.path.join(media_type, filename)
        # get the size of the file
        media_size = os.path.getsize(file_path)

        # if the file is an image, create a thumbnail
        data_url = ""
        if media_type == "image":
            data_url = self._thumbnail_data_url(file_path, fallback=media_path)

        return media_type, media_mime, media_path, media_size, data_url, filename

    def _href_wrap(self, post):
        """Wrap links in <a> tags.

        Every URL occurrence is wrapped exactly once, also when the same URL
        appears several times in the post.
        """
        # NOTE(review): the rest of the post is passed on unescaped; whether
        # the template escapes it is not visible from this file.

        def wrap(match):
            url = match.group()
            # escape quotes so the url cannot break out of the href attribute
            return f"<a href='{ html.escape(url) }'>{ url }</a>"

        return re.sub(r"http\S+", wrap, post)

    def _write_log(self, message):
        """Generate a new log webpage."""
        folder_name = self._room_folder(message.room)
        log_path = os.path.join(self.output, folder_name, "index.html")
        room = self.db[message.room]

        # it would be useful to use self.template here
        with open("template.html", encoding="utf-8") as template_file:
            template = jinja2.Template(template_file.read())

        # render before opening the output, so a template error does not
        # leave a truncated page behind
        page = template.render(
            title=room["title"],
            # description = self.db[message.room]["description"],
            db=room["messages"],
            sorted_numbering=sorted(room["messages"], key=int),
        )
        with open(log_path, "w", encoding="utf-8") as out:
            out.write(page)
        self.log.info(f"writing to: { log_path }")

    def _generate_feed(self, message):
        """Generate a RSS feed."""
        folder_name = self._room_folder(message.room)
        feed_path = os.path.join(self.output, folder_name, "feed.rss.xml")
        date = datetime.now().strftime("%a, %d %b %Y %H:%M:%S")
        self.log.info(f"feed date: { date }")

        # URLs are joined with "/" explicitly; os.path.join is for file paths
        log_folder_url = f"{ self.baseurl.rstrip('/') }/{ folder_name }"

        # self.feedtemplate would be useful to have in the conf
        with open("template.rss", encoding="utf-8") as template_file:
            template = jinja2.Template(template_file.read())

        feed = template.render(
            log_path=f"{ log_folder_url }/index.html",
            feed_path=f"{ log_folder_url }/feed.rss.xml",
            title=self.db[message.room]["title"],
            db=self.db[message.room],
            date=date,
            log_folder_url=log_folder_url,
        )
        with open(feed_path, "w", encoding="utf-8") as out:
            out.write(feed)
        self.log.info(f"writing to: { feed_path }")

    def _add_to_db(self, message, media_type=None, media_path=None, media_size=None, data_url=None, filename=None):
        """Save new entry to database and return its key.

        With ``media_path`` set, a media entry is stored; otherwise the
        message content (minus "@add ") is stored as a text post.
        """
        messages = self.db[message.room]["messages"]
        # keys are stringified integers; the new key follows the highest one
        new_key = str(max(map(int, messages)) + 1) if messages else "0"
        date = datetime.now().strftime("%a, %d %b %Y %H:%M:%S")

        if media_path:
            messages[new_key] = {
                "post": "",
                "date": date,
                "media": {
                    "type": media_type,
                    "path": media_path,
                    "size": media_size,
                    "filename": filename,
                    "data_url": data_url,
                    "caption": "",
                },
            }
        else:
            post = self._href_wrap(message.content.replace("@add ", ""))
            messages[new_key] = {"post": post, "date": date}

        self.db._dumps()
        return new_key

    def _parse_room_name(self, room):
        """Parse room name from full MUC address string.

        A string without "@" is returned unchanged.
        """
        if "@" not in room:
            return room
        return room.rpartition("@")[0].replace("@", "")

    def _setup_room(self, room):
        """Create directories and database entries for a new room."""
        if room not in self.db:
            self.db[room] = {}
        defaults = {
            "messages": {},
            "title": room,
            "folder": self._parse_room_name(room),
            "stylesheet": "timeline",  # default stylesheet
            "font": "none",  # default font
        }
        for key, value in defaults.items():
            self.db[room].setdefault(key, value)
        self.db._dumps()
        self.log.info(f"Added to the database: { room }")

        # always resolve the folder inside the output directory, also for
        # rooms that were already in the database
        room_path = os.path.join(self.output, self._room_folder(room))
        self.log.info(f"Processing setup logic for: { room_path }")

        if not os.path.exists(room_path):
            os.makedirs(room_path)
            stylesheet_path = os.path.join("stylesheets", self.db[room]["stylesheet"] + ".css")
            stylesheet_dest_path = os.path.join(room_path, "stylesheet.css")
            shutil.copy(stylesheet_path, stylesheet_dest_path)
            self.log.info(f"Created a folder for: { room }")
            self.log.info(f"Added stylesheet.css to: { room }")

    def setup(self):
        """Setup a log for all the rooms LogBot is subscribed to."""
        self.log.info(f"Output folder is set to: { self.output }")
        for room in self.rooms:
            self._setup_room(room)
        for room in self._data["invited"]:
            self._setup_room(room)

    def group_invite(self, message):
        """Extend xbotlib invite response logic and do required room setup."""
        super().group_invite(message)
        self._setup_room(str(message["from"]))

    def _cmd_media(self, message):
        """Store a shared file (image / PDF / audio / video) and thank for it."""
        media_type, media_mime, media_path, media_size, data_url, filename = self._download(message)
        # TODO: Insert a list of accepted file types here.
        if not media_path:
            return "Sorry, can't process that :( (unknown media type?)"
        num = self._add_to_db(
            message,
            media_type=media_mime,
            media_path=media_path,
            media_size=media_size,
            data_url=data_url,
            filename=filename,
        )
        media_type = media_type.replace("images", "image")  # linguistic hack!
        if "pdf" in message.url:
            media_type = "PDF"  # linguistic hack!
        return f"Thanks for that { media_type }! If you want to remove it from the log, you can send 'logbot @delete { num }'"

    def _cmd_add(self, message):
        """Add the message to the log."""
        self._add_to_db(message)
        return "Added, thanks!"

    def _cmd_url(self, message):
        """Reply with the URL of the room's log."""
        return f"{ self.baseurl }{ self._room_folder(message.room) }/"

    def _cmd_delete(self, message):
        """Delete the post with the given number from the log."""
        key = self._command_number("delete", message.content)
        if key is None:
            return self._confused_reply
        messages = self.db[message.room]["messages"]
        if key not in messages:
            return "This one is already gone!"
        entry = messages[key]
        self.log.info(f"To be deleted: { entry }")
        if "media" in entry:
            reply = f"Deleted: { entry['media']['path'] }"
        else:
            reply = f"Deleted: { entry['post'] }"
        del messages[key]
        return reply

    def _cmd_caption(self, message):
        """Add a caption to the media file with the given number."""
        key = self._command_number("caption", message.content)
        # NOTE(review): the caption is stored as the list that findall
        # returns, as before; the template presumably relies on that.
        captiontext = re.findall(r'"(.*?)"', message.text)
        if key is None or not captiontext:
            return self._confused_reply
        messages = self.db[message.room]["messages"]
        # check before writing: text posts have no "media" entry
        if key not in messages or "media" not in messages[key]:
            return f"Sorry, there is no file number {key} to add a caption to."
        messages[key]["media"]["caption"] = captiontext
        self.log.info(f"To be captioned: { messages[key] }")
        return f"Added a caption for file number {key}."

    def _cmd_title(self, message):
        """Set the title of the log."""
        title = self._command_argument("title", message.content)
        if title is None:
            return self._confused_reply
        self.db[message.room]["title"] = title
        return f"The title of the log is changed to: { title }"

    def _cmd_folder(self, message):
        """Rename the folder (and thereby the URL) of the log."""
        # https://git.vvvvvvaria.org/varia/bots/issues/5
        new_folder_name = self._command_argument("folder", message.content)
        if new_folder_name is None:
            return self._confused_reply
        new_folder_name = new_folder_name.strip()
        sorry = f"Sorry i couldn't change that foldername into: '{ new_folder_name }'. Try again with: 'logbot @folder newname'."

        # a name with "@" would be cut short again when the folder is looked
        # up, so it is refused along with anything that is not a plain name
        if "@" in new_folder_name or not self._is_safe_name(new_folder_name):
            return sorry

        current_folder_name_path = os.path.join(self.output, self._room_folder(message.room))
        new_folder_name_path = os.path.join(self.output, new_folder_name)
        if os.path.exists(new_folder_name_path):
            # never move a log onto the folder of another log
            return sorry
        try:
            os.rename(current_folder_name_path, new_folder_name_path)
        except OSError as error:
            self.log.info(f"Renaming the folder failed: { error }")
            return sorry
        self.db[message.room]["folder"] = new_folder_name
        return f"The foldername of the log is changed to: { new_folder_name }. The URL of the log changed into: { self.baseurl }{ new_folder_name}"

    def _cmd_style(self, message):
        """Switch the log to another stylesheet."""
        stylesheet = self._command_argument("style", message.content)
        if stylesheet is None:
            return self._confused_reply
        stylesheet = stylesheet.strip().lower()
        unknown = f"The stylesheet '{ stylesheet }' is unknown to me. Check logbot @help to see the available stylesheets."
        if not self._is_safe_name(stylesheet):
            return unknown

        room_name = self._parse_room_name(message.room)
        room_path = os.path.join(self.output, self._room_folder(message.room))
        stylesheet_path = os.path.join("stylesheets", f"{ stylesheet }.css")
        stylesheet_dest_path = os.path.join(room_path, "stylesheet.css")
        try:
            shutil.copy(stylesheet_path, stylesheet_dest_path)
        except OSError:
            return unknown
        # only remember the stylesheet once it is known to exist
        self.db[message.room]["stylesheet"] = stylesheet
        self.log.info(f"Stylesheet in room { room_name } switched to: { stylesheet }")
        return f"I'm switching the stylesheet of this log to: { stylesheet }."

    def _cmd_font(self, message):
        """Switch the log to another font, or back to the default one."""
        font = self._command_argument("font", message.content)
        if font is None:
            return self._confused_reply
        font = font.strip().lower()

        room_name = self._parse_room_name(message.room)
        room_path = os.path.join(self.output, self._room_folder(message.room))
        font_dest_path = os.path.join(room_path, "font.ttf")

        if font == "none":
            try:
                os.remove(font_dest_path)
            except FileNotFoundError:
                pass  # no custom font was installed: nothing to remove
            self.db[message.room]["font"] = font
            return "I removed the font and switched back to default serif."

        unknown = f"The font '{ font }' is unknown to me. Check @help to see the available fonts."
        if not self._is_safe_name(font):
            return unknown
        font_path = os.path.join("fonts", f"{ font }.ttf")
        try:
            shutil.copy(font_path, font_dest_path)
        except OSError:
            return unknown
        # only remember the font once it is known to exist
        self.db[message.room]["font"] = font
        self.log.info(f"font in room { room_name } switched to: { font }")
        return f"I'm switching the font of this log to: { font }."

    def group(self, message):
        """All the things LogBot does when it receives a group message.

        A shared file is logged first; otherwise the first matching command
        (in the order listed below) is executed. Afterwards the web page and
        the RSS feed are regenerated and a reply is sent to the room.
        """
        self.log.info("------------------")
        self.log.info(f"message: { message.text }")
        self.log.info(f"room: { message.room }")
        self.log.info(f"sender: { message.sender }")
        self.log.info("------------------")

        if message.room not in self.db:
            # make sure the database entry and folder exist before using them
            self._setup_room(message.room)

        # the order matters: the first command found in the text wins
        commands = (
            ("@add", self._cmd_add),
            ("@url", self._cmd_url),
            ("@delete", self._cmd_delete),
            ("@caption", self._cmd_caption),
            ("@title", self._cmd_title),
            ("@folder", self._cmd_folder),
            ("@style", self._cmd_style),
            ("@font", self._cmd_font),
        )

        if message.url:
            # Response to files: image / PDF / audio / video
            reply = self._cmd_media(message)
        else:
            reply = self._confused_reply
            for command, handler in commands:
                if command in message.text:
                    reply = handler(message)
                    break

        # Persist changes the handlers made to nested entries; the other
        # methods of this class also call _dumps() explicitly after mutating.
        self.db._dumps()

        # Regenerate the log webpage
        self._write_log(message)
        # Regenerate the RSS feed
        self._generate_feed(message)
        # Reply to the groupchat
        self.reply(reply, room=message.room)
# Instantiate the bot when this file is run/imported.
# NOTE(review): presumably xbotlib's Bot constructor connects and starts the bot — confirm.
Logbot()