From 3f78d8ccb23d0c7aebd0fb4e87b561be4fe52d46 Mon Sep 17 00:00:00 2001
From: rra
Date: Fri, 5 Feb 2021 10:09:04 +0100
Subject: [PATCH] init commit of multiprocess log parser

---
 logparser_multiprocess.py | 82 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 82 insertions(+)
 create mode 100644 logparser_multiprocess.py

diff --git a/logparser_multiprocess.py b/logparser_multiprocess.py
new file mode 100644
index 0000000..705f646
--- /dev/null
+++ b/logparser_multiprocess.py
@@ -0,0 +1,82 @@
+import apachelogs, gzip, os, sqlite3
+import multiprocessing, time
+
+
+parser = apachelogs.LogParser(apachelogs.COMBINED)
+
+#entry = parser.parse('207.237.221.51 - - [22/Jan/2021:00:14:58 +0100] "GET /feeds/all.rss.xml HTTP/2.0" 304 0 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:84.0) Gecko/20100101 Firefox/84.0"')
+
+def create_db():
+    db_name = 'traffic_stats2.db'
+    if not os.path.exists(db_name):
+        conn = sqlite3.connect(db_name)
+        c = conn.cursor()
+        c.execute("""CREATE TABLE traffic
+                     (datetimes timestamp,
+                      bytes integer);""")
+        conn.commit()
+        c.close()
+        conn.close()
+
+def populate_db(pairlist):
+    # the timeout is so we can avoid database locks
+    # if multiple processes try to write to it shortly after one another
+    conn = sqlite3.connect('traffic_stats2.db', timeout=30)
+    c = conn.cursor()
+    c.execute('''PRAGMA synchronous = OFF''')
+    c.execute('''PRAGMA journal_mode = OFF''')
+    sqliteparam = """INSERT INTO 'traffic'
+                     ('datetimes', 'bytes') VALUES
+                     (?,?);"""
+
+    c.executemany(sqliteparam, pairlist)
+    conn.commit()
+    c.close()
+    conn.close()
+
+
+def parse_log(gzip_file):
+    pairs = []
+
+    try:
+        with gzip.open(gzip_file, 'r') as logfile:
+            log = logfile.readlines()
+        for line in log:
+            if line:
+                # hacky way to remove null bytes? https://stackoverflow.com/a/56904541
+                line = line.decode('ascii').strip().strip('\x00')
+                try:
+                    entry = parser.parse(str(line))
+                    pairs.append((entry.request_time, entry.bytes_sent))
+                except Exception as e:
+                    # print(e)
+                    # this catches malformed entries, but those are less than 0.1% of the total set
+                    pass
+        populate_db(pairs)
+        print(gzip_file, '\n')
+    except Exception as e:
+        print(e, gzip_file)
+
+log_dir = '/home/r/Programming/ltm-2020-stats/traffic/solar_logs'
+files_to_process = []
+
+for g in os.listdir(log_dir):
+    path = os.path.join(log_dir, g)
+    # skip the 2019 and 2021 logs so only the 2020 files remain
+    if os.path.isfile(path) and g.endswith('.gzip') \
+            and not g.startswith('2019') and not g.startswith('2021'):
+        files_to_process.append(path)
+
+files_to_process.sort()
+
+if __name__ == '__main__':
+    starttime = time.time()
+    create_db()
+
+    pool = multiprocessing.Pool()
+    pool.map(parse_log, files_to_process)
+    pool.close()
+    pool.join()
+    print()
+    print('saving data')
+    print('Time taken = {} seconds'.format(time.time() - starttime))