import os
import re
import shutil
import urllib . request
from datetime import datetime
from urllib . parse import urlparse
from mimetypes import guess_type
import jinja2
from xbotlib import Bot
# Main Logbot class
class Logbot ( Bot ) :
help = """ Oh dear, logbot is here!
( You can speak to the bot using @logbot or logbot : )
< image > : Your image is added to the log .
logbot @help : Print this message
logbot @add < message > : Add a message to the log .
logbot @delete < num > : Delete posts from the log . For example : @logbot @delete 5
logbot @title < string > : Set the title of your log .
logbot @folder < string > : Change the foldername of your log ( by default , the roomname of the groupchat is used ) . Small warning : this changes the URL to your log page .
logbot @style < stylesheet > : Switch to another stylesheet . For example : logbot @style log . Available stylesheets include : timeline ( default ) , float , opentab .
logbot @font < font > : Switch to another font . For example : logbot @font font . Available fonts include : polsku , notcouriersans ; or select None to switch back to default serif .
logbot @uptime : To check how long @logbot has been around
@bots : To see who is around : )
""" # noqa
# Functions that are used to process logged materials
# These are marked with a "_" before their function name
def _download ( self , message ) :
""" Download media files. """
media_mime , encoding = guess_type ( message . url . lower ( ) )
media_type = str ( re . match ( r " .*/ " , media_mime ) . group ( ) ) . replace ( " / " , " " )
# download file
data = urllib . request . urlopen ( message . url ) . read ( )
if data :
self . log . info ( f " downloading: { message . url } " )
parsed_url = urlparse ( message . url )
filename = (
os . path . basename ( parsed_url . path ) . replace ( " " , " _ " ) . replace ( " % 20 " , " _ " )
) # safe url's
self . log . info ( f " as the file: { filename } " )
folder_name = self . db [ message . room ] [ " folder " ]
path = os . path . join ( self . output , folder_name , media_type )
if not os . path . isdir ( path ) :
os . mkdir ( path )
file_path = os . path . join ( path , filename )
with open ( file_path , " wb " ) as media_file :
media_file . write ( data )
# define media_post
media_path = os . path . join ( media_type , filename )
if media_type == " image " :
media_post = f ' <img src= " { media_path } " loading= " lazy " > '
elif media_type == " application " :
media_post = f ' <iframe src= " { media_path } " loading= " lazy " ></iframe> '
elif media_type == " audio " :
media_post = f ' <audio controls src= " { media_path } " ></audio> '
elif media_type == " video " :
media_post = f ' <video controls src= " { media_path } " ></video> '
else :
media_post = None
# get the size of the file
media_size = os . path . getsize ( os . path . join ( self . output , folder_name , media_path ) )
return media_post , media_type , media_mime , media_path , media_size
def _href_wrap ( self , post ) :
""" Wrap links in <a> tags. """
for url in re . findall ( r " http \ S+ " , post ) :
url_with_href = f " <a href= ' { url } ' > { url } </a> "
post = post . replace ( url , url_with_href )
return post
def _write_log ( self , message ) :
""" Generate a new log webpage. """
folder_name = self . db [ message . room ] [ " folder " ]
if " @ " in folder_name : # hacky
folder_name = self . _parse_room_name ( folder_name )
log_path = os . path . join ( self . output , folder_name , " index.html " )
template = jinja2 . Template ( open ( " template.html " ) . read ( ) ) # it would be useful to use self.template here
with open ( log_path , " w " ) as out :
html = template . render (
title = self . db [ message . room ] [ " title " ] ,
db = self . db [ message . room ] [ " messages " ] ,
sorted_numbering = [ str ( num ) for num in sorted ( [ int ( num ) for num in self . db [ message . room ] [ " messages " ] . keys ( ) ] ) ]
)
out . write ( html )
self . log . info ( f " writing to: { log_path } " )
def _generate_feed ( self , message ) :
""" Generate a RSS feed. """
folder_name = self . db [ message . room ] [ " folder " ]
if " @ " in folder_name : # hacky
folder_name = self . _parse_room_name ( folder_name )
feed_path = os . path . join ( self . output , folder_name , " feed.rss " )
date = datetime . now ( )
template = jinja2 . Template ( open ( " template.rss " ) . read ( ) ) # self.feedtemplate would be useful to have in the conf
with open ( feed_path , " w " ) as out :
feed = template . render (
log_path = os . path . join ( " https://vvvvvvaria.org/logs/ " , folder_name , " index.html " ) , # hardcoding the url now, self.baseurl would be helpful to have in the conf
feed_path = os . path . join ( " https://vvvvvvaria.org/logs/ " , folder_name , " feed.rss " ) , # hardcoding the url again
title = self . db [ message . room ] [ " title " ] ,
db = self . db [ message . room ] ,
date = date . strftime ( " %a , %d % b % Y % H: % M: % S +0100 " ) # timezone is hardcoded now
)
out . write ( feed )
self . log . info ( f " writing to: { feed_path } " )
def _add_to_db ( self , message , media_post = None , media_type = None , media_url = None , media_size = None ) :
""" Save new entry to database. """
keys = [ x for x in self . db [ message . room ] [ " messages " ] . keys ( ) ]
keys . sort ( key = int )
date = datetime . now ( ) . strftime ( " %a , %d % b % Y % H: % M: % S +0100 " ) # timezone is hardcoded now
if not keys :
new_key = " 0 "
else :
new_key = str ( int ( keys [ - 1 ] ) + 1 )
if media_post :
self . db [ message . room ] [ " messages " ] [ new_key ] = { }
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' post ' ] = ' '
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] = { }
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] [ ' post ' ] = media_post
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] [ ' type ' ] = media_type
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] [ ' url ' ] = os . path . join ( " https://vvvvvvaria.org/logs/ " , media_url )
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] [ ' size ' ] = media_size
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' date ' ] = date
else :
post = message . content . replace ( " @add " , " " )
post = self . _href_wrap ( post )
self . db [ message . room ] [ " messages " ] [ new_key ] = { }
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' post ' ] = post
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' date ' ] = date
self . db . _dumps ( )
return new_key
def _parse_room_name ( self , room ) :
""" Parse room name from full MUC address string. """
return str ( re . match ( r " .*@ " , room ) . group ( ) ) . replace ( " @ " , " " )
def _setup_room ( self , room ) :
""" Create directories and database entries for a new room. """
room_name = self . _parse_room_name ( room )
room_path = os . path . join ( self . output , room_name )
self . log . info ( f " Processing setup logic for: { room_path } " )
if room not in self . db :
self . db [ room ] = { }
if " messages " not in self . db [ room ] :
self . db [ room ] [ " messages " ] = { }
if " title " not in self . db [ room ] :
self . db [ room ] [ " title " ] = room
if " folder " not in self . db [ room ] :
self . db [ room ] [ " folder " ] = self . _parse_room_name ( room )
if " stylesheet " not in self . db [ room ] :
self . db [ room ] [ " stylesheet " ] = " timeline " # default stylesheet
if " font " not in self . db [ room ] :
self . db [ room ] [ " font " ] = " none " # default font
self . db . _dumps ( )
self . log . info ( f " Added to the database: { room } " )
if not os . path . exists ( room_path ) :
os . mkdir ( room_path )
stylesheet_path = os . path . join (
" stylesheets " , self . db [ room ] [ " stylesheet " ] + " .css "
)
stylesheet_dest_path = os . path . join ( room_path , " stylesheet.css " )
shutil . copy ( stylesheet_path , stylesheet_dest_path )
self . log . info ( f " Created a folder for: { room } " )
self . log . info ( f " Added stylesheet.css to: { room } " )
def setup ( self ) :
""" Setup a log for all the rooms LogBot is subscribed to. """
self . log . info ( f " Output folder is set to: { self . output } " )
for room in self . rooms :
self . _setup_room ( room )
for room in self . _data [ " invited " ] :
self . _setup_room ( room )
def group_invite ( self , message ) :
""" Extend xbotlib invite response logic and do required room setup. """
super ( ) . group_invite ( message )
self . _setup_room ( str ( message [ " from " ] ) )
def group ( self , message ) :
""" All the things LogBot does when it receives a group message. """
self . log . info ( " ------------------ " )
self . log . info ( f " message: { message . text } " )
self . log . info ( f " room: { message . room } " )
self . log . info ( f " sender: { message . sender } " )
self . log . info ( " ------------------ " )
# Response to files: image / PDF / audio / video
if message . url :
media_post , media_type , media_mime , media_path , media_size = self . _download ( message )
# TODO: Insert a list of accepted file types here.
if media_post :
num = self . _add_to_db ( message , media_post = media_post , media_type = media_mime , media_url = media_path , media_size = media_size )
media_type = media_type . replace ( " images " , " image " ) # linguistic hack!
if ' pdf ' in message . url :
media_type = ' PDF ' # linguistic hack!
reply = f " Thanks for that { media_type } ! If you want to remove it from the log, you can send ' logbot @delete { num } ' "
else :
reply = " Sorry, can ' t process that :( (unknown media type?) "
# Response to @add
elif " @add " in message . text :
self . _add_to_db ( message )
reply = " Added, thanks! "
# Response to @delete
elif " @delete " in message . text :
match = re . findall ( r " @delete \ d* " , message . content ) [ 0 ]
key = str ( match . replace ( " @delete " , " " ) )
if key in self . db [ message . room ] [ " messages " ] :
self . log . info (
f " To be deleted: { self . db [ message . room ] [ ' messages ' ] [ key ] } "
)
if ' media ' in self . db [ message . room ] [ ' messages ' ] [ key ] :
reply = f " Deleted: { self . db [ message . room ] [ ' messages ' ] [ key ] [ ' media ' ] [ ' url ' ] } "
else :
reply = f " Deleted: { self . db [ message . room ] [ ' messages ' ] [ key ] [ ' post ' ] } "
del self . db [ message . room ] [ " messages " ] [ key ]
else :
reply = " This one is already gone! "
# Response to @title
elif " @title " in message . text :
match = re . findall ( " @title .* " , message . content ) [ 0 ]
title = match . replace ( " @title " , " " )
self . db [ message . room ] [ " title " ] = title
reply = f " The title of the log is changed to: { title } "
# Response to @folder
# https://git.vvvvvvaria.org/varia/bots/issues/5
elif " @folder " in message . text :
match = re . findall ( " @folder .* " , message . content ) [ 0 ]
new_folder_name = match . replace ( " @folder " , " " )
current_folder_name = self . db [ message . room ] [ " folder " ]
if " @ " in current_folder_name : # hacky
current_folder_name = self . _parse_room_name ( current_folder_name )
current_folder_name_path = os . path . join ( self . output , current_folder_name )
new_folder_name_path = os . path . join ( self . output , new_folder_name )
try :
os . rename ( current_folder_name_path , new_folder_name_path )
self . db [ message . room ] [ " folder " ] = new_folder_name
reply = f " The foldername of the log is changed to: { new_folder_name } . The URL of the log changed into: https://vvvvvvaria.org/logs/ { new_folder_name } "
except :
reply = f " Sorry i couldn ' t shange that foldername into: ' { new_folder_name } ' . Try again with: ' logbot @folder newname ' . "
# Response to @style
elif " @style " in message . text :
match = re . findall ( " @style .* " , message . content ) [ 0 ]
stylesheet = match . replace ( " @style " , " " )
stylesheet = stylesheet . lower ( )
self . db [ message . room ] [ " stylesheet " ] = stylesheet
room_name = self . _parse_room_name ( message . room )
room_path = os . path . join ( self . output , self . db [ message . room ] [ " folder " ] )
if " @ " in room_path : # hacky
room_path = self . _parse_room_name ( room_path )
stylesheet_path = os . path . join ( " stylesheets " , f " { stylesheet } .css " )
stylesheet_dest_path = os . path . join ( room_path , " stylesheet.css " )
try :
shutil . copy ( stylesheet_path , stylesheet_dest_path )
self . log . info (
f " Stylesheet in room { room_name } switched to: { stylesheet } "
)
reply = f " I ' m switching the stylesheet of this log to: { stylesheet } . "
except :
reply = f " The stylesheet ' { stylesheet } ' is unknown to me. Check logbot @help to see the available stylesheets. "
# Response to @font
elif " @font " in message . text :
match = re . findall ( " @font .* " , message . content ) [ 0 ]
font = match . replace ( " @font " , " " )
font = font . lower ( )
self . db [ message . room ] [ " font " ] = font
room_name = self . _parse_room_name ( message . room )
room_path = os . path . join ( self . output , self . db [ message . room ] [ " folder " ] )
if " @ " in room_path : # hacky
room_path = self . _parse_room_name ( room_path )
font_path = os . path . join ( " fonts " , f " { font } .ttf " )
font_dest_path = os . path . join ( room_path , " font.ttf " )
if font == " none " :
os . remove ( font_dest_path )
reply = " I removed the font and switched back to default serif. "
else :
try :
shutil . copy ( font_path , font_dest_path )
self . log . info ( f " font in room { room_name } switched to: { font } " )
reply = f " I ' m switching the font of this log to: { font } . "
except :
reply = f " The font ' { font } ' is unknown to me. Check @help to see the available fonts. "
else :
reply = " Hmm ... not sure what you want to do? Check logbot @help to see the available commands. "
# Regenerate the log webpage
self . _write_log ( message )
# Regenerate the RSS feed
self . _generate_feed ( message )
# Reply to the groupchat
self . reply ( reply , room = message . room )
Logbot ( )