@ -4,19 +4,10 @@ import shutil
import urllib . request
from datetime import datetime
from urllib . parse import urlparse
from mimetypes import guess_type
import jinja2
from xbotlib import Bot
# Functions that are used as Jinja filters
def _href_wrap ( post ) :
""" Wrap links in a tags as a Jinja template filter. """
for url in re . findall ( r " http \ S+ " , post ) :
url_with_href = f " <a href= ' { url } ' > { url } </a> "
post = post . replace ( url , url_with_href )
return post
# Main Logbot class
class Logbot ( Bot ) :
@ -45,39 +36,13 @@ class Logbot(Bot):
@bots : To see who is around : )
""" # noqa
IMAGE_TYPES = ( " .jpg " , " jpeg " , " png " , " .gif " , " .bmp " , " .svg " , " eps " )
AUDIO_TYPES = ( " .mp3 " , " .ogg " , " .oga " , " .mogg " , " .wav " , " .m4a " , " .webm " )
FILE_TYPES = " .pdf "
VIDEO_TYPES = (
" .mp4 " ,
" .webm " ,
" .flv " ,
" .vob " ,
" .avi " ,
" .mov " ,
" .qt " ,
" .mpg " ,
" .mpeg " ,
" .mp4 " ,
" .m2v " ,
" .mpe " ,
" .3gp " ,
)
# Functions that are used to process logged materials
# These are marked with a "_" before their function name
def _download ( self , message ) :
""" Download media files. """
# define media_type
if message . url . lower ( ) . endswith ( self . IMAGE_TYPES ) :
media_type = " images "
elif message . url . lower ( ) . endswith ( self . FILE_TYPES ) :
media_type = " pdf "
elif message . url . lower ( ) . endswith ( self . AUDIO_TYPES ) :
media_type = " audio "
elif message . url . lower ( ) . endswith ( self . VIDEO_TYPES ) :
media_type = " video "
else :
media_type = None
self . log . info ( f " Unable to determine media type of { message . url . lower ( ) } " )
media_mime , encoding = guess_type ( message . url . lower ( ) )
media_type = str ( re . match ( r " .*/ " , media_mime ) . group ( ) ) . replace ( " / " , " " )
# download file
data = urllib . request . urlopen ( message . url ) . read ( )
@ -96,82 +61,102 @@ class Logbot(Bot):
with open ( file_path , " wb " ) as media_file :
media_file . write ( data )
# define media_post
media_path = os . path . join ( media_type , filename )
if message . url . lower ( ) . endswith ( self . IMAGE_TYPES ) :
media_post = f ' <img src= " { media_path } " loading= " lazy " > '
elif message . url . lower ( ) . endswith ( self . FILE_TYPES ) :
media_post = f ' <iframe src= " { media_path } " loading= " lazy " ></iframe> '
elif message . url . lower ( ) . endswith ( self . AUDIO_TYPES ) :
media_post = f ' <audio controls src= " { media_path } " ></audio> '
elif message . url . lower ( ) . endswith ( self . VIDEO_TYPES ) :
media_post = f ' <video controls src= " { media_path } " ></video> '
else :
media_post = None
# define media_post
media_path = os . path . join ( media_type , filename )
if media_type == " image " :
media_post = f ' <img src= " { media_path } " loading= " lazy " > '
elif media_type == " application " :
media_post = f ' <iframe src= " { media_path } " loading= " lazy " ></iframe> '
elif media_type == " audio " :
media_post = f ' <audio controls src= " { media_path } " ></audio> '
elif media_type == " video " :
media_post = f ' <video controls src= " { media_path } " ></video> '
else :
media_post = None
# get the size of the file
media_size = os . path . getsize ( os . path . join ( self . output , folder_name , media_path ) )
return media_post , media_type
return media_post , media_type , media_mime , media_path , media_size
def _href_wrap ( self , post ) :
""" Wrap links in <a> tags. """
for url in re . findall ( r " http \ S+ " , post ) :
url_with_href = f " <a href= ' { url } ' > { url } </a> "
post = post . replace ( url , url_with_href )
return post
def _write_log ( self , message ) :
""" Write new log to the file system. """
jinja_env = jinja2 . Environment ( )
jinja_env . filters [ " href_wrap " ] = _href_wrap
template = jinja_env . from_string ( open ( " template.html " ) . read ( ) )
""" Generate a new log webpage. """
folder_name = self . db [ message . room ] [ " folder " ]
if " @ " in folder_name : # hacky
folder_name = self . _parse_room_name ( folder_name )
log_path = os . path . join ( self . output , folder_name , " index.html " )
template = jinja2 . Template ( open ( " template.html " ) . read ( ) ) # it would be useful to use self.template here
with open ( log_path , " w " ) as out :
html = template . render (
title = self . db [ message . room ] [ " title " ] ,
db = self . db [ message . room ] [ " messages " ] ,
sorted_keys = [ str ( num ) for num in sorted ( [ int ( num ) for num in self . db [ message . room ] [ " messages " ] . keys ( ) ] ) ]
sorted_numbering = [ str ( num ) for num in sorted ( [ int ( num ) for num in self . db [ message . room ] [ " messages " ] . keys ( ) ] ) ]
)
out . write ( html )
self . log . info ( f " writing to: { log_path } " )
def _generate_feed ( self , message ) :
template = jinja2 . Template ( open ( " template.rss " ) . read ( ) )
""" Generate a RSS feed. """
folder_name = self . db [ message . room ] [ " folder " ]
if " @ " in folder_name : # hacky
if " @ " in folder_name : # hacky
folder_name = self . _parse_room_name ( folder_name )
feed_path = os . path . join ( self . output , folder_name , " feed.rss " )
date = datetime . now ( )
template = jinja2 . Template ( open ( " template.rss " ) . read ( ) ) # self.feedtemplate would be useful to have in the conf
with open ( feed_path , " w " ) as out :
feed = template . render (
log_path = os . path . join (
" https://vvvvvvaria.org/logs/ " , folder_name , " index.html "
) , # hard-coding the URL for now
) , # hardcoding the url now, self.baseurl would be helpful to have in the conf
title = self . db [ message . room ] [ " title " ] ,
db = self . db [ message . room ] ,
date = date . strftime ( " %a , %d % b % Y % H: % M: % S +0100 " )
date = date . strftime ( " %a , %d % b % Y % H: % M: % S +0100 " ) # timezone is hardcoded now
)
out . write ( feed )
self . log . info ( f " writing to: { feed_path } " )
def _add_to_db ( self , message , media_post = None ) :
def _add_to_db ( self , message , media_post = None , media_type = None , media_url = None , media_size = None ) :
""" Save new entry to database. """
keys = [ x for x in self . db [ message . room ] [ " messages " ] . keys ( ) ]
keys . sort ( key = int )
date = datetime . now ( ) . strftime ( " %a , %d % b % Y % H: % M: % S +0100 " ) # timezone is hardcoded now
if not keys :
new_key = " 0 "
else :
new_key = str ( int ( keys [ - 1 ] ) + 1 )
if media_post :
self . db [ message . room ] [ " messages " ] [ new_key ] = media_post
self . db [ message . room ] [ " messages " ] [ new_key ] = { }
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' post ' ] = ' '
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] = { }
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] [ ' post ' ] = media_post
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] [ ' type ' ] = media_type
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] [ ' url ' ] = media_url
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' media ' ] [ ' size ' ] = media_size
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' date ' ] = date
else :
post = message . content . replace ( " @add " , " " )
self . db [ message . room ] [ " messages " ] [ new_key ] = post
post = self . _href_wrap ( post )
self . db [ message . room ] [ " messages " ] [ new_key ] = { }
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' post ' ] = post
self . db [ message . room ] [ " messages " ] [ new_key ] [ ' date ' ] = date
self . db . _dumps ( )
def _parse_room_name ( self , room ) :
""" Parse room name from entire address string. """
""" Parse room name from full MUC address string. """
return str ( re . match ( r " .*@ " , room ) . group ( ) ) . replace ( " @ " , " " )
def _setup_room ( self , room ) :
""" Create directories and database entries for a new room. """
room_name = self . _parse_room_name ( room )
room_path = os . path . join ( self . output , room_name )
self . log . info ( f " Processing setup logic for: { room_path } " )
self . log . info ( f " Processing setup logic for: { room_path } " )
if room not in self . db :
self . db [ room ] = { }
@ -221,10 +206,13 @@ class Logbot(Bot):
# Response to files: image / PDF / audio / video
if message . url :
media_post , media_type = self . _download ( message )
media_post , media_type , media_mime , media_path , media_size = self . _download ( message )
# TODO: Insert a list of accepted file types here.
if media_post :
self . _add_to_db ( message , media_post = media_post )
media_type = media_type . replace ( " images " , " image " ) # linguistic hack!
self . _add_to_db ( message , media_post = media_post , media_type = media_mime , media_url = media_path , media_size = media_size )
media_type = media_type . replace ( " images " , " image " ) # linguistic hack!
if ' pdf ' in message . url :
media_type = ' PDF ' # linguistic hack!
reply = f " Thanks for that { media_type } ! "
else :
reply = " Sorry, can ' t process that :( (unknown media type?) "