An initial stab at an async-ready pull
parent 2c38e5a267
commit 142d1d4f5a
etherpump/commands/common.py

@@ -7,6 +7,8 @@ from time import sleep
 from urllib.parse import quote_plus, unquote_plus
 from urllib.request import HTTPError, urlopen

+import trio
+
 groupnamepat = re.compile(r"^g\.(\w+)\$")


@@ -73,6 +75,27 @@ def getjson(url, max_retry=3, retry_sleep_time=3):
     return ret


+async def agetjson(session, url):
+    """The asynchronous version of getjson."""
+    RETRY = 20
+    TIMEOUT = 10
+
+    ret = {}
+    ret["_retries"] = 0
+
+    try:
+        response = await session.get(url, timeout=TIMEOUT, retries=RETRY)
+        rurl = response.url
+        ret.update(response.json())
+        ret["_code"] = response.status_code
+        if rurl != url:
+            ret["_url"] = rurl
+        return ret
+    except Exception as e:
+        print('Failed to download {}, saw {}'.format(url, str(e)))
+        return
+
+
 def loadpadinfo(p):
     with open(p) as f:
         info = json.load(f)
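For orientation, here is a minimal sketch of how `agetjson` is driven from trio via an `asks` session (the URL and connection count below are made-up examples, not part of this commit). Note that `agetjson` returns `None` when the request fails, so callers need to guard before indexing into the result:

```python
import asks
import trio

async def demo():
    session = asks.Session(connections=5)
    # hypothetical Etherpad API endpoint
    url = "http://localhost:9001/api/1/listAllPads?apikey=SECRET"
    result = await agetjson(session, url)
    if result is not None:  # None signals a failed download
        print(result["_code"], result["_retries"])

trio.run(demo)
```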
@@ -112,3 +135,8 @@ def unescape(text):

 def istty():
     return sys.stdout.isatty() and os.environ.get('TERM') != 'dumb'
+
+
+def chunks(lst, n):
+    for i in range(0, len(lst), n):
+        yield lst[i : i + n]
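`chunks` is a small batching generator that the pull command below uses to split the pad list into concurrent workloads; for example:

```python
>>> list(chunks([1, 2, 3, 4, 5], 2))
[[1, 2], [3, 4], [5]]
```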
etherpump/commands/pull.py

@@ -1,18 +1,20 @@
 """Check for pads that have changed since last sync (according to .meta.json)"""

 import json
+import math
 import os
 import re
 import sys
 from argparse import ArgumentParser
 from datetime import datetime
 from fnmatch import fnmatch
-from time import sleep
 from urllib.parse import quote, urlencode
 from urllib.request import HTTPError
 from xml.etree import ElementTree as ET

+import asks
 import html5lib
+import trio
 from tqdm import tqdm

 from etherpump.commands.common import *  # noqa
@@ -71,6 +73,12 @@ def build_argument_parser(args):
         type=int,
         help="skip this many items, default: None",
     )
+    parser.add_argument(
+        "--connection",
+        default=5,
+        type=int,
+        help="number of connections to run concurrently",
+    )
     parser.add_argument(
         "--meta",
         default=False,
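A sketch of how the new flag lands in the parsed arguments, e.g. from an `etherpump pull --connection 10` invocation (the empty argument list passed to `build_argument_parser` here is an assumption for illustration):

```python
p = build_argument_parser([])
args = p.parse_args(["--connection", "10"])
assert args.connection == 10  # feeds asks.Session(connections=...) below
```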
@@ -163,25 +171,27 @@ def build_argument_parser(args):
     return parser


-def get_padids(args, info, data):
+async def get_padids(args, info, data, session):
     if args.padid:
         padids = args.padid
     elif args.glob:
-        padids = getjson(
-            info['localapiurl'] + 'listAllPads?' + urlencode(data)
-        )['data']['padIDs']
+        url = info['localapiurl'] + 'listAllPads?' + urlencode(data)
+        padids = await agetjson(session, url)
+        padids = padids['data']['padIDs']
         padids = [x for x in padids if fnmatch(x, args.glob)]
     else:
-        padids = getjson(
-            info['localapiurl'] + 'listAllPads?' + urlencode(data)
-        )['data']['padIDs']
+        url = info['localapiurl'] + 'listAllPads?' + urlencode(data)
+        padids = await agetjson(session, url)
+        padids = padids['data']['padIDs']

     padids.sort()
     return padids


-def handle_pad(args, index, padid, data, info, raw_ext):
-    if args.skip != None and index < args.skip:
-        return
+async def handle_pad(args, padid, data, info, session):
+    raw_ext = ".raw.txt"
+    if args.no_raw_ext:
+        raw_ext = ""

     data['padID'] = padid
     p = padpath(padid, args.pub, args.group, args.fix_names)
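For reference, a sketch of the URL that `get_padids` now hands to `agetjson` (the `localapiurl` value and API key are invented examples):

```python
from urllib.parse import urlencode

info = {'localapiurl': 'http://localhost:9001/api/1.2.9/'}  # assumed value
data = {'apikey': 'SECRET'}
url = info['localapiurl'] + 'listAllPads?' + urlencode(data)
# -> http://localhost:9001/api/1.2.9/listAllPads?apikey=SECRET
```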
@@ -200,9 +210,11 @@ def handle_pad(args, index, padid, data, info, raw_ext):
             if os.path.exists(metapath):
                 with open(metapath) as f:
                     meta.update(json.load(f))
-                revisions = getjson(
+                url = (
                     info['localapiurl'] + 'getRevisionsCount?' + urlencode(data)
-                )['data']['revisions']
+                )
+                response = await agetjson(session, url)
+                revisions = response['data']['revisions']
                 if meta['revisions'] == revisions and not args.force:
                     skip = True
                     break
@@ -214,9 +226,11 @@ def handle_pad(args, index, padid, data, info, raw_ext):
             )

             if revisions is None:
-                meta['revisions'] = getjson(
+                url = (
                     info['localapiurl'] + 'getRevisionsCount?' + urlencode(data)
-                )['data']['revisions']
+                )
+                response = await agetjson(session, url)
+                meta['revisions'] = response['data']['revisions']
             else:
                 meta['revisions'] = revisions

@@ -227,17 +241,19 @@ def handle_pad(args, index, padid, data, info, raw_ext):
             # todo: load more metadata!
             meta['group'], meta['pad'] = splitpadname(padid)
             meta['pathbase'] = p
-            meta['lastedited_raw'] = int(
-                getjson(
-                    info['localapiurl'] + 'getLastEdited?' + urlencode(data)
-                )['data']['lastEdited']
-            )
+
+            url = info['localapiurl'] + 'getLastEdited?' + urlencode(data)
+            response = await agetjson(session, url)
+            meta['lastedited_raw'] = int(response['data']['lastEdited'])
+
             meta['lastedited_iso'] = datetime.fromtimestamp(
                 int(meta['lastedited_raw']) / 1000
             ).isoformat()
-            meta['author_ids'] = getjson(
-                info['localapiurl'] + 'listAuthorsOfPad?' + urlencode(data)
-            )['data']['authorIDs']
+
+            url = info['localapiurl'] + 'listAuthorsOfPad?' + urlencode(data)
+            response = await agetjson(session, url)
+            meta['author_ids'] = response['data']['authorIDs']
+
             break
         except HTTPError as e:
             tries += 1
@@ -249,7 +265,7 @@ def handle_pad(args, index, padid, data, info, raw_ext):
                 skip = True
                 break
             else:
-                sleep(3)
+                await trio.sleep(3)
         except TypeError as e:
             print(
                 "Type Error loading pad {0} (phantom pad?), skipping".format(
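The `sleep(3)` to `await trio.sleep(3)` swap matters because `time.sleep` would block the whole trio event loop, stalling every other pad download, while `trio.sleep` suspends only the current task. A minimal, self-contained illustration (hypothetical task names, not from this commit):

```python
import trio

async def ticker():
    for _ in range(3):
        await trio.sleep(1)
        print("tick")  # keeps running while the sibling task waits

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(ticker)
        await trio.sleep(3)  # time.sleep(3) here would freeze the ticker too
        print("retry now")

trio.run(main)
```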
@@ -273,7 +289,8 @@ def handle_pad(args, index, padid, data, info, raw_ext):
         pass

     if args.all or args.text:
-        text = getjson(info['localapiurl'] + 'getText?' + urlencode(data))
+        url = info['localapiurl'] + 'getText?' + urlencode(data)
+        text = await agetjson(session, url)
         ver = {"type": "text"}
         versions.append(ver)
         ver["code"] = text["_code"]
@@ -368,9 +385,8 @@ def handle_pad(args, index, padid, data, info, raw_ext):

     if args.all or args.dhtml:
         data['startRev'] = "0"
-        html = getjson(
-            info['localapiurl'] + 'createDiffHTML?' + urlencode(data)
-        )
+        url = info['localapiurl'] + 'createDiffHTML?' + urlencode(data)
+        html = await agetjson(session, url)
         ver = {"type": "diffhtml"}
         versions.append(ver)
         ver["code"] = html["_code"]
@@ -399,7 +415,8 @@ def handle_pad(args, index, padid, data, info, raw_ext):

     # Process text, html, dhtml, all options
     if args.all or args.html:
-        html = getjson(info['localapiurl'] + 'getHTML?' + urlencode(data))
+        url = info['localapiurl'] + 'getHTML?' + urlencode(data)
+        html = await agetjson(session, url)
         ver = {"type": "html"}
         versions.append(ver)
         ver["code"] = html["_code"]
@@ -428,24 +445,33 @@ def handle_pad(args, index, padid, data, info, raw_ext):
         json.dump(meta, f, indent=2)


-def main(args):
-    p = build_argument_parser(args)
-    args = p.parse_args(args)
-
-    raw_ext = ".raw.txt"
-    if args.no_raw_ext:
-        raw_ext = ""
-
-    info = loadpadinfo(args.padinfo)
-    data = {}
-    data['apikey'] = info['apikey']
-
-    padids = get_padids(args, info, data)
-
+async def handle_pad_chunk(args, padids, data, info, session):
     progress_kwargs = {}
     if not istty():
         progress_kwargs.update(dict(disable=True))
-    progress_pads = tqdm(iterable=padids, total=len(padids), **progress_kwargs)
-
-    for index, padid in enumerate(progress_pads):
-        handle_pad(args, index, padid, data, info, raw_ext)
+    padids = tqdm(iterable=padids, total=len(padids), **progress_kwargs,)
+
+    for padid in padids:
+        await handle_pad(args, padid, data, info, session)
+
+
+async def handle_pads(args):
+    session = asks.Session(connections=args.connection)
+    info = loadpadinfo(args.padinfo)
+    data = {'apikey': info['apikey']}
+
+    padids = await get_padids(args, info, data, session)
+    if args.skip:
+        padids = padids[args.skip : len(padids)]
+    CHUNK_SIZE = math.ceil(len(padids) / 3)
+
+    async with trio.open_nursery() as nursery:
+        for padids in chunks(padids, CHUNK_SIZE):
+            _args = (args, padids, data, info, session)
+            nursery.start_soon(handle_pad_chunk, *_args)
+
+
+def main(args):
+    p = build_argument_parser(args)
+    args = p.parse_args(args)
+    trio.run(handle_pads, args)
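A note on the batching in `handle_pads`: `math.ceil(len(padids) / 3)` sizes the chunks so that `chunks` (star-imported from `etherpump.commands.common`) yields at most three batches, each handled by its own trio task over the shared `asks` session. For example:

```python
import math

padids = list(range(100))                  # pretend we have 100 pads
CHUNK_SIZE = math.ceil(len(padids) / 3)    # 34
print([len(c) for c in chunks(padids, CHUNK_SIZE)])
# [34, 34, 32] -> three concurrent handle_pad_chunk tasks
```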