From 142d1d4f5a7e5ebe850fe38116c476d9c202920e Mon Sep 17 00:00:00 2001 From: Luke Murphy Date: Mon, 20 Jan 2020 16:03:49 +0100 Subject: [PATCH] An initial stab at a async ready pull --- etherpump/commands/common.py | 28 +++++++++ etherpump/commands/pull.py | 118 +++++++++++++++++++++-------------- 2 files changed, 100 insertions(+), 46 deletions(-) diff --git a/etherpump/commands/common.py b/etherpump/commands/common.py index f5b6de9..506900e 100644 --- a/etherpump/commands/common.py +++ b/etherpump/commands/common.py @@ -7,6 +7,8 @@ from time import sleep from urllib.parse import quote_plus, unquote_plus from urllib.request import HTTPError, urlopen +import trio + groupnamepat = re.compile(r"^g\.(\w+)\$") @@ -73,6 +75,27 @@ def getjson(url, max_retry=3, retry_sleep_time=3): return ret +async def agetjson(session, url): + """The asynchronous version of getjson.""" + RETRY = 20 + TIMEOUT = 10 + + ret = {} + ret["_retries"] = 0 + + try: + response = await session.get(url, timeout=TIMEOUT, retries=RETRY) + rurl = response.url + ret.update(response.json()) + ret["_code"] = response.status_code + if rurl != url: + ret["_url"] = rurl + return ret + except Exception as e: + print('Failed to download {}, saw {}'.format(url, str(e))) + return + + def loadpadinfo(p): with open(p) as f: info = json.load(f) @@ -112,3 +135,8 @@ def unescape(text): def istty(): return sys.stdout.isatty() and os.environ.get('TERM') != 'dumb' + + +def chunks(lst, n): + for i in range(0, len(lst), n): + yield lst[i : i + n] diff --git a/etherpump/commands/pull.py b/etherpump/commands/pull.py index 2ba0281..72dcdd6 100644 --- a/etherpump/commands/pull.py +++ b/etherpump/commands/pull.py @@ -1,18 +1,20 @@ """Check for pads that have changed since last sync (according to .meta.json)""" import json +import math import os import re import sys from argparse import ArgumentParser from datetime import datetime from fnmatch import fnmatch -from time import sleep from urllib.parse import quote, urlencode from urllib.request import HTTPError from xml.etree import ElementTree as ET +import asks import html5lib +import trio from tqdm import tqdm from etherpump.commands.common import * # noqa @@ -71,6 +73,12 @@ def build_argument_parser(args): type=int, help="skip this many items, default: None", ) + parser.add_argument( + "--connection", + default=5, + type=int, + help="number of connections to run concurrently", + ) parser.add_argument( "--meta", default=False, @@ -163,25 +171,27 @@ def build_argument_parser(args): return parser -def get_padids(args, info, data): +async def get_padids(args, info, data, session): if args.padid: padids = args.padid elif args.glob: - padids = getjson( - info['localapiurl'] + 'listAllPads?' + urlencode(data) - )['data']['padIDs'] + url = info['localapiurl'] + 'listAllPads?' + urlencode(data) + padids = await agetjson(session, url) + padids = padids['data']['padIDs'] padids = [x for x in padids if fnmatch(x, args.glob)] else: - padids = getjson( - info['localapiurl'] + 'listAllPads?' + urlencode(data) - )['data']['padIDs'] + url = info['localapiurl'] + 'listAllPads?' + urlencode(data) + padids = await agetjson(session, url) + padids = padids['data']['padIDs'] + padids.sort() return padids -def handle_pad(args, index, padid, data, info, raw_ext): - if args.skip != None and index < args.skip: - return +async def handle_pad(args, padid, data, info, session): + raw_ext = ".raw.txt" + if args.no_raw_ext: + raw_ext = "" data['padID'] = padid p = padpath(padid, args.pub, args.group, args.fix_names) @@ -200,9 +210,11 @@ def handle_pad(args, index, padid, data, info, raw_ext): if os.path.exists(metapath): with open(metapath) as f: meta.update(json.load(f)) - revisions = getjson( + url = ( info['localapiurl'] + 'getRevisionsCount?' + urlencode(data) - )['data']['revisions'] + ) + response = await agetjson(session, url) + revisions = response['data']['revisions'] if meta['revisions'] == revisions and not args.force: skip = True break @@ -214,9 +226,11 @@ def handle_pad(args, index, padid, data, info, raw_ext): ) if revisions is None: - meta['revisions'] = getjson( + url = ( info['localapiurl'] + 'getRevisionsCount?' + urlencode(data) - )['data']['revisions'] + ) + response = await agetjson(session, url) + meta['revisions'] = response['data']['revisions'] else: meta['revisions'] = revisions @@ -227,17 +241,19 @@ def handle_pad(args, index, padid, data, info, raw_ext): # todo: load more metadata! meta['group'], meta['pad'] = splitpadname(padid) meta['pathbase'] = p - meta['lastedited_raw'] = int( - getjson( - info['localapiurl'] + 'getLastEdited?' + urlencode(data) - )['data']['lastEdited'] - ) + + url = info['localapiurl'] + 'getLastEdited?' + urlencode(data) + response = await agetjson(session, url) + meta['lastedited_raw'] = int(response['data']['lastEdited']) + meta['lastedited_iso'] = datetime.fromtimestamp( int(meta['lastedited_raw']) / 1000 ).isoformat() - meta['author_ids'] = getjson( - info['localapiurl'] + 'listAuthorsOfPad?' + urlencode(data) - )['data']['authorIDs'] + + url = info['localapiurl'] + 'listAuthorsOfPad?' + urlencode(data) + response = await agetjson(session, url) + meta['author_ids'] = response['data']['authorIDs'] + break except HTTPError as e: tries += 1 @@ -249,7 +265,7 @@ def handle_pad(args, index, padid, data, info, raw_ext): skip = True break else: - sleep(3) + await trio.sleep(3) except TypeError as e: print( "Type Error loading pad {0} (phantom pad?), skipping".format( @@ -273,7 +289,8 @@ def handle_pad(args, index, padid, data, info, raw_ext): pass if args.all or args.text: - text = getjson(info['localapiurl'] + 'getText?' + urlencode(data)) + url = info['localapiurl'] + 'getText?' + urlencode(data) + text = await agetjson(session, url) ver = {"type": "text"} versions.append(ver) ver["code"] = text["_code"] @@ -368,9 +385,8 @@ def handle_pad(args, index, padid, data, info, raw_ext): if args.all or args.dhtml: data['startRev'] = "0" - html = getjson( - info['localapiurl'] + 'createDiffHTML?' + urlencode(data) - ) + url = info['localapiurl'] + 'createDiffHTML?' + urlencode(data) + html = await agetjson(session, url) ver = {"type": "diffhtml"} versions.append(ver) ver["code"] = html["_code"] @@ -399,7 +415,8 @@ def handle_pad(args, index, padid, data, info, raw_ext): # Process text, html, dhtml, all options if args.all or args.html: - html = getjson(info['localapiurl'] + 'getHTML?' + urlencode(data)) + url = info['localapiurl'] + 'getHTML?' + urlencode(data) + html = await agetjson(session, url) ver = {"type": "html"} versions.append(ver) ver["code"] = html["_code"] @@ -428,24 +445,33 @@ def handle_pad(args, index, padid, data, info, raw_ext): json.dump(meta, f, indent=2) -def main(args): - p = build_argument_parser(args) - args = p.parse_args(args) - - raw_ext = ".raw.txt" - if args.no_raw_ext: - raw_ext = "" - - info = loadpadinfo(args.padinfo) - data = {} - data['apikey'] = info['apikey'] - - padids = get_padids(args, info, data) - +async def handle_pad_chunk(args, padids, data, info, session): progress_kwargs = {} if not istty(): progress_kwargs.update(dict(disable=True)) - progress_pads = tqdm(iterable=padids, total=len(padids), **progress_kwargs) - for index, padid in enumerate(progress_pads): - handle_pad(args, index, padid, data, info, raw_ext) + padids = tqdm(iterable=padids, total=len(padids), **progress_kwargs,) + for padid in padids: + await handle_pad(args, padid, data, info, session) + + +async def handle_pads(args): + session = asks.Session(connections=args.connection) + info = loadpadinfo(args.padinfo) + data = {'apikey': info['apikey']} + + padids = await get_padids(args, info, data, session) + if args.skip: + padids = padids[args.skip : len(padids)] + CHUNK_SIZE = math.ceil(len(padids) / 3) + + async with trio.open_nursery() as nursery: + for padids in chunks(padids, CHUNK_SIZE): + _args = (args, padids, data, info, session) + nursery.start_soon(handle_pad_chunk, *_args) + + +def main(args): + p = build_argument_parser(args) + args = p.parse_args(args) + trio.run(handle_pads, args)