import os import re import shutil import urllib.request from urllib.parse import urlparse import jinja2 from xbotlib import Bot class Logbot(Bot): help = """Oh dear, logbot is here! (You can speak to the bot using @logbot or logbot:) : Your image is added to the log. logbot @help: Print this message logbot @add : Add a message to the log. logbot @delete : Delete posts from the log. For example: @logbot @delete 5 logbot @title : Set the title of your log. logbot @style : Edit the css of your log. For example: logbot @style body background-color: pink; [future-feature] logbot @uptime: To check how long @logbot has been around @bots: To see who is around :) """ # noqa IMAGE_TYPES = (".jpg", "jpeg", "png", ".gif", ".bmp", ".svg", "eps") AUDIO_TYPES = (".mp3", ".ogg", ".oga", ".mogg", ".wav", ".m4a", ".webm") FILE_TYPES = ".pdf" VIDEO_TYPES = ( ".mp4", ".webm", ".flv", ".vob", ".avi", ".mov", ".qt", ".mpg", ".mpeg", ".mp4", ".m2v", ".mpe", ".3gp", ) def _download(self, message): """Download media files.""" # define media_type if message.url.lower().endswith(self.IMAGE_TYPES): media_type = "images" elif message.url.lower().endswith(self.FILE_TYPES): media_type = "pdf" elif message.url.lower().endswith(self.AUDIO_TYPES): media_type = "audio" elif message.url.lower().endswith(self.VIDEO_TYPES): media_type = "video" else: media_type = None self.log.info(f"Unable to determine media type of { message.url.lower() }") # download file data = urllib.request.urlopen(message.url).read() if data: self.log.info(f"downloading: { message.url }") parsed_url = urlparse(message.url) filename = ( os.path.basename(parsed_url.path).replace(" ", "_").replace("%20", "_") ) # safe url's self.log.info(f"as the file: { filename }") roomname = re.sub(r"@.*", "", message.room) path = os.path.join(self.output, roomname, media_type) if not os.path.isdir(path): os.mkdir(path) file_path = os.path.join(path, filename) with open(file_path, "wb") as media_file: media_file.write(data) # define media_post media_path = os.path.join(media_type, filename) if message.url.lower().endswith(self.IMAGE_TYPES): media_post = f'' elif message.url.lower().endswith(self.FILE_TYPES): media_post = ( f'' ) elif message.url.lower().endswith(self.AUDIO_TYPES): media_post = f'' elif message.url.lower().endswith(self.VIDEO_TYPES): media_post = f'' else: media_post = None return media_post, media_type def _write_log(self, message): """Write new log to the file system.""" template = jinja2.Template(open("template.html").read()) room_name = self._parse_room_name(message.room) log_path = os.path.join(self.output, room_name, "index.html") with open(log_path, "w") as out: html = template.render( title=self.db[message.room]["title"], db=self.db[message.room]["messages"], ) out.write(html) self.log.info(f"writing to: {log_path}") def _add_to_db(self, message, media_post=None): """Save new entry to database.""" keys = [x for x in self.db[message.room]["messages"].keys()] keys.sort(key=int) if not keys: new_key = "0" else: new_key = str(int(keys[-1]) + 1) if media_post: self.db[message.room]["messages"][new_key] = media_post else: replaced = message.content.replace("@add", "") self.db[message.room]["messages"][new_key] = replaced self.db._dumps() def _parse_room_name(self, room): """Parse room name from entire address string.""" return str(re.match(r".*@", room).group()).replace("@", "") def _setup_room(self, room): """Create directories and database entries for a new room.""" room_name = self._parse_room_name(room) room_path = os.path.join(self.output, room_name) self.log.info(f"Processing setup logic for: {room_path}") if room not in self.db: self.db[room] = {} if "messages" not in self.db[room]: self.db[room]["messages"] = {} if "title" not in self.db[room]: self.db[room]["title"] = room self.db._dumps() self.log.info(f"Added to the database: { room }") if not os.path.exists(room_path): os.mkdir(room_path) shutil.copy("stylesheet.css", room_path) self.log.info(f"Created a folder for: { room }") self.log.info(f"Copied stylesheet.css to: { room }") def setup(self): """Setup a log for all the rooms LogBot is subscribed to.""" self.log.info(f"Output folder is set to: { self.output }") for room in self.rooms: self._setup_room(room) for room in self._data["invited"]: self._setup_room(room) def group_invite(self, message): """Extend xbotlib invite response logic and do required room setup.""" super().group_invite(message) self._setup_room(str(message["from"])) def group(self, message): """All the things LogBot does when it receives a group message.""" self.log.info("------------------") self.log.info(f"message: { message.text }") self.log.info(f"room: { message.room }") self.log.info(f"sender: { message.sender }") self.log.info("------------------") # Response to files: image / PDF / audio / video if message.url: media_post, media_type = self._download(message) if media_post: self._add_to_db(message, media_post=media_post) media_type = media_type.replace("images", "image") # linguistic hack! reply = f"Thanks for that { media_type }!" else: reply = "Sorry, can't process that :( (unknown media type?)" # Response to @add elif "@add" in message.text: self._add_to_db(message) reply = "Added, thanks!" # Response to @delete elif "@delete" in message.text: match = re.findall(r"@delete \d*", message.content)[0] key = str(match.replace("@delete ", "")) if key in self.db[message.room]["messages"]: self.log.info( f"To be deleted: { self.db[message.room]['messages'][key] }" ) reply = f"This message is deleted: { self.db[message.room]['messages'][key] }" del self.db[message.room]["messages"][key] else: reply = "This message is already gone!" # Response to @title elif "@title" in message.text: match = re.findall("@title .*", message.content)[0] title = match.replace("@title", "") self.db[message.room]["title"] = title reply = f"The title of the log is changed to: { title }" # Response to @style elif "@style" in message.text: reply = "This is a future-feature ..." else: reply = "Hmm ... not sure what you want to do?" self._write_log(message) self.reply(reply, room=message.room) Logbot()