"""Logbot: an xbotlib bot that publishes group chat messages as an HTML log.

Messages and uploaded media are stored per room in the bot database and
rendered to ``<output>/<room>/index.html`` using ``template.html``.
"""

import html
import os
import re
import shutil
import urllib.request
from urllib.parse import urlparse

import jinja2
from xbotlib import Bot

# Extensions are matched with str.endswith, so each entry carries its dot.
IMAGE_TYPES = (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".svg", ".eps")
# NOTE: ".webm" is listed for both audio and video; audio is checked first.
AUDIO_TYPES = (".mp3", ".ogg", ".oga", ".mogg", ".wav", ".m4a", ".webm")
FILE_TYPES = (".pdf",)
VIDEO_TYPES = (
    ".mp4",
    ".webm",
    ".flv",
    ".vob",
    ".avi",
    ".mov",
    ".qt",
    ".mpg",
    ".mpeg",
    ".m2v",
    ".mpe",
    ".3gp",
)

# (media type / sub-folder name, extensions, HTML snippet stored in the log).
# Order matters: the first matching entry wins.
_MEDIA_KINDS = (
    ("images", IMAGE_TYPES, '<img src="{src}">'),
    ("pdf", FILE_TYPES, '<iframe src="{src}"></iframe>'),
    ("audio", AUDIO_TYPES, '<audio controls src="{src}"></audio>'),
    ("video", VIDEO_TYPES, '<video controls src="{src}"></video>'),
)


def _room_dirname(room):
    """Return the part of a room address before the first ``@``."""
    return room.partition("@")[0]


def _classify_media(url):
    """Return ``(media_type, html_template)`` for *url*, or ``(None, None)``."""
    lowered = url.lower()
    for media_type, extensions, markup in _MEDIA_KINDS:
        if lowered.endswith(extensions):
            return media_type, markup
    return None, None


def add_to_db(self, message, media_post=None):
    """Append a post to the log of ``message.room``.

    The post is stored under the next free numeric key (kept as a string).
    When *media_post* is truthy it is stored as-is; otherwise the message
    content with every ``@add`` removed is stored.
    """
    messages = self.db[message.room]["messages"]
    new_key = str(max((int(key) for key in messages), default=-1) + 1)
    if media_post:
        messages[new_key] = media_post
    else:
        messages[new_key] = message.content.replace("@add", "")


def del_from_db(self, message, key):
    """Remove the post stored under *key* from the log of ``message.room``.

    Raises ``KeyError`` when *key* is not present.
    """
    del self.db[message.room]["messages"][key]


def write_log(self, message, logger):
    """Render the log of ``message.room`` to ``<output>/<room>/index.html``.

    ``template.html`` is read from the current working directory and rendered
    with the room's ``title`` and ``messages``.
    """
    with open("template.html", encoding="utf-8") as template_file:
        template = jinja2.Template(template_file.read())
    log_path = os.path.join(
        self.output, _room_dirname(message.room), "index.html"
    )
    room_db = self.db[message.room]
    rendered = template.render(title=room_db["title"], db=room_db["messages"])
    with open(log_path, "w", encoding="utf-8") as out:
        out.write(rendered)
    logger.info("writing to: %s", log_path)


def download(self, message, logger):
    """Download ``message.url`` into the room folder and build its log entry.

    Returns ``(media_post, media_type)``. ``media_post`` is an HTML snippet
    pointing at the stored file, or ``None`` when the media type is unknown
    or the download failed. ``media_type`` is one of ``"images"``, ``"pdf"``,
    ``"audio"``, ``"video"``, or ``None`` when it cannot be determined.
    """
    url = message.url
    media_type, markup = _classify_media(url)
    if media_type is None:
        logger.info(f"Unable to determine media type of {url.lower()}")
        return None, None

    # NOTE(security): the URL comes from a chat message and urlopen also
    # accepts non-HTTP schemes such as file:// -- consider restricting it.
    logger.info("downloading: %s", url)
    try:
        with urllib.request.urlopen(url) as response:
            data = response.read()
    except (OSError, ValueError) as error:
        logger.info("download of %s failed: %s", url, error)
        return None, media_type
    if not data:
        logger.info("nothing downloaded from: %s", url)
        return None, media_type

    # Replace spaces (plain or percent-encoded) to keep the file name URL-safe.
    filename = (
        os.path.basename(urlparse(url).path)
        .replace(" ", "_")
        .replace("%20", "_")
    )
    if not filename:
        logger.info("no file name found in: %s", url)
        return None, media_type
    logger.info("as the file: %s", filename)

    folder = os.path.join(self.output, _room_dirname(message.room), media_type)
    os.makedirs(folder, exist_ok=True)
    with open(os.path.join(folder, filename), "wb") as media_file:
        media_file.write(data)

    # The snippet must be non-empty: group() treats a falsy media_post as a
    # failed upload. The path is relative to the room's index.html and is
    # escaped because the file name is taken from the sender's URL.
    media_path = f"{media_type}/{filename}"
    media_post = markup.format(src=html.escape(media_path, quote=True))
    return media_post, media_type


class Logbot(Bot):
    """Bot that keeps an HTML log per room, driven by ``@`` commands."""

    # NOTE(review): the argument placeholders after "@add", "@delete",
    # "@title" and "@style" (and before the first ":") look like they are
    # missing from this text -- confirm the intended wording.
    help = (
        "Oh dear, logbot is here! "
        "(You can speak to the bot using @logbot or logbot:) "
        ": Your image is added to the log. "
        "logbot @help: Print this message "
        "logbot @add : Add a message to the log. "
        "logbot @delete : Delete posts from the log. "
        "For example: @logbot @delete 5 "
        "logbot @title : Set the title of your log. "
        "logbot @style : Edit the css of your log. "
        "For example: logbot @style body background-color: pink; "
        "[future-feature] "
        "logbot @uptime: To check how long @logbot has been around "
        "@bots: To see who is around :) "
    )

    def _parse_room_name(self, room):
        """Parse room name from entire address string."""
        return _room_dirname(room)

    def _setup_room(self, room):
        """Create directories and database entries for a new room."""
        room_name = self._parse_room_name(room)
        room_path = os.path.join(self.output, room_name)

        self.log.info(f"Processing setup logic for: {room_path}")

        if room not in self.db:
            self.db[room] = {}
            self.db[room]["messages"] = {}
            self.db[room]["title"] = room
            self.log.info(f"Added to the database: { room }")

        if not os.path.exists(room_path):
            os.makedirs(room_path, exist_ok=True)
            shutil.copy("stylesheet.css", room_path)
            self.log.info(f"Created a folder for: { room }")
            self.log.info(f"Copied stylesheet.css to: { room }")

    def setup(self):
        """Prepare the output folder and database for every configured room."""
        self.log.info(f"Output folder is set to: { self.output }")

        for room in self.rooms:
            self._setup_room(room)

    def group_invite(self, message):
        """Extend xbotlib invite response logic and do required room setup."""
        super().group_invite(message)
        self._setup_room(message.room)

    def group(self, message):
        """Handle a group chat message: store media, run a command, reply.

        The log of the room is re-rendered after every handled message.
        """
        # to debug in the terminal
        self.log.info("------------------")
        self.log.info("message: %s", message.text)
        self.log.info("room: %s", message.room)
        self.log.info("sender: %s", message.sender)

        # image / PDF / audio / video
        if message.url:
            media_post, media_type = download(self, message, self.log)
            if media_post:
                add_to_db(self, message, media_post=media_post)
                # linguistic hack!
                media_type = media_type.replace("images", "image")
                reply = f"Thanks for that { media_type }!"
            else:
                reply = "Sorry, can't process that :( (unknown media type?)"

        elif "@add" in message.text:
            add_to_db(self, message)
            reply = "Added, thanks!"

        elif "@delete" in message.text:
            found = re.search(r"@delete\s+(\d+)", message.content)
            if found is None:
                reply = (
                    "Please tell me which post to delete. "
                    "For example: @logbot @delete 5"
                )
            else:
                # Keys are stored without leading zeros ("5", not "05").
                key = str(int(found.group(1)))
                messages = self.db[message.room]["messages"]
                if key in messages:
                    doomed = messages[key]
                    self.log.info("To be deleted: %s", doomed)
                    reply = f"This message is deleted: { doomed }"
                    del_from_db(self, message, key)
                else:
                    reply = "This message is already gone!"

        elif "@title" in message.text:
            found = re.search(r"@title\s+(.*)", message.content)
            title = found.group(1).strip() if found else ""
            if title:
                self.db[message.room]["title"] = title
                reply = f"The title of the log is changed to: { title }"
            else:
                reply = (
                    "Please give me a title. "
                    "For example: @logbot @title My log"
                )

        elif "@style" in message.text:
            reply = "This is a future-feature ..."

        else:
            reply = "Hmm ... not sure what you want to do?"

        write_log(self, message, self.log)
        self.reply(reply, room=message.room)


Logbot()