Add async support for pull.py #8

Merged
decentral1se merged 6 commits from :asyncpads into master 5 years ago
  1. 8
      README.md
  2. 33
      etherpump/commands/common.py
  3. 562
      etherpump/commands/pull.py
  4. 7
      setup.py

8
README.md

@ -30,6 +30,14 @@ After installing etherpump on the Varia server, we collectively decided to not w
Change log / notes Change log / notes
================== ==================
** January 2020**
Added experimental [Trio](trio.readthedocs.io) support for the `pull` command
which enables pads to be processed concurrently. The default `--connection`
option is set to 20 which may overpower the target server. If in doubt, set
this to a lower number (like 5). This functionality is experimental, be
cautious and please report bugs!
**October 2019** **October 2019**
Improve `etherpump --help` handling to make it easier for new users. Improve `etherpump --help` handling to make it easier for new users.

33
etherpump/commands/common.py

@ -7,6 +7,8 @@ from time import sleep
from urllib.parse import quote_plus, unquote_plus from urllib.parse import quote_plus, unquote_plus
from urllib.request import HTTPError, urlopen from urllib.request import HTTPError, urlopen
import trio
groupnamepat = re.compile(r"^g\.(\w+)\$") groupnamepat = re.compile(r"^g\.(\w+)\$")
@ -73,6 +75,27 @@ def getjson(url, max_retry=3, retry_sleep_time=3):
return ret return ret
async def agetjson(session, url):
"""The asynchronous version of getjson."""
RETRY = 20
TIMEOUT = 10
ret = {}
ret["_retries"] = 0
try:
response = await session.get(url, timeout=TIMEOUT, retries=RETRY)
rurl = response.url
ret.update(response.json())
ret["_code"] = response.status_code
if rurl != url:
ret["_url"] = rurl
return ret
except Exception as e:
print('Failed to download {}, saw {}'.format(url, str(e)))
return
def loadpadinfo(p): def loadpadinfo(p):
with open(p) as f: with open(p) as f:
info = json.load(f) info = json.load(f)
@ -81,7 +104,10 @@ def loadpadinfo(p):
return info return info
# Python developer Fredrik Lundh (author of elementtree, among other things) has such a function on his website, which works with decimal, hex and named entities: # Python developer Fredrik Lundh (author of elementtree, among other things)
# has such a function on his website, which works with decimal, hex and named
# entities:
## ##
# Removes HTML or XML character references and entities from a text string. # Removes HTML or XML character references and entities from a text string.
# #
@ -112,3 +138,8 @@ def unescape(text):
def istty(): def istty():
return sys.stdout.isatty() and os.environ.get('TERM') != 'dumb' return sys.stdout.isatty() and os.environ.get('TERM') != 'dumb'
def chunks(lst, n):
for i in range(0, len(lst), n):
yield lst[i : i + n]

562
etherpump/commands/pull.py

@ -1,18 +1,20 @@
"""Check for pads that have changed since last sync (according to .meta.json)""" """Check for pads that have changed since last sync (according to .meta.json)"""
import json import json
import math
import os import os
import re import re
import sys import sys
from argparse import ArgumentParser from argparse import ArgumentParser
from datetime import datetime from datetime import datetime
from fnmatch import fnmatch from fnmatch import fnmatch
from time import sleep
from urllib.parse import quote, urlencode from urllib.parse import quote, urlencode
from urllib.request import HTTPError from urllib.request import HTTPError
from xml.etree import ElementTree as ET from xml.etree import ElementTree as ET
import asks
import html5lib import html5lib
import trio
from tqdm import tqdm from tqdm import tqdm
from etherpump.commands.common import * # noqa from etherpump.commands.common import * # noqa
@ -36,410 +38,362 @@ def try_deleting(files):
pass pass
def main(args): def build_argument_parser(args):
p = ArgumentParser( parser = ArgumentParser(
"Check for pads that have changed since last sync (according to .meta.json)" "Check for pads that have changed since last sync (according to .meta.json)"
) )
parser.add_argument("padid", nargs="*", default=[])
p.add_argument("padid", nargs="*", default=[]) parser.add_argument(
p.add_argument(
"--glob", default=False, help="download pads matching a glob pattern" "--glob", default=False, help="download pads matching a glob pattern"
) )
parser.add_argument(
p.add_argument(
"--padinfo", "--padinfo",
default=".etherpump/settings.json", default=".etherpump/settings.json",
help="settings, default: .etherpump/settings.json", help="settings, default: .etherpump/settings.json",
) )
p.add_argument( parser.add_argument(
"--zerorevs", "--zerorevs",
default=False, default=False,
action="store_true", action="store_true",
help="include pads with zero revisions, default: False (i.e. pads with no revisions are skipped)", help="include pads with zero revisions, default: False (i.e. pads with no revisions are skipped)",
) )
p.add_argument( parser.add_argument(
"--pub", "--pub",
default="p", default="p",
help="folder to store files for public pads, default: p", help="folder to store files for public pads, default: p",
) )
p.add_argument( parser.add_argument(
"--group", "--group",
default="g", default="g",
help="folder to store files for group pads, default: g", help="folder to store files for group pads, default: g",
) )
p.add_argument( parser.add_argument(
"--skip", "--skip",
default=None, default=None,
type=int, type=int,
help="skip this many items, default: None", help="skip this many items, default: None",
) )
p.add_argument( parser.add_argument(
"--connection",
default=5,
type=int,
help="number of connections to run concurrently",
)
parser.add_argument(
"--meta", "--meta",
default=False, default=False,
action="store_true", action="store_true",
help="download meta to PADID.meta.json, default: False", help="download meta to PADID.meta.json, default: False",
) )
p.add_argument( parser.add_argument(
"--text", "--text",
default=False, default=False,
action="store_true", action="store_true",
help="download text to PADID.txt, default: False", help="download text to PADID.txt, default: False",
) )
p.add_argument( parser.add_argument(
"--html", "--html",
default=False, default=False,
action="store_true", action="store_true",
help="download html to PADID.html, default: False", help="download html to PADID.html, default: False",
) )
p.add_argument( parser.add_argument(
"--dhtml", "--dhtml",
default=False, default=False,
action="store_true", action="store_true",
help="download dhtml to PADID.diff.html, default: False", help="download dhtml to PADID.diff.html, default: False",
) )
p.add_argument( parser.add_argument(
"--all", "--all",
default=False, default=False,
action="store_true", action="store_true",
help="download all files (meta, text, html, dhtml), default: False", help="download all files (meta, text, html, dhtml), default: False",
) )
p.add_argument( parser.add_argument(
"--folder", "--folder",
default=False, default=False,
action="store_true", action="store_true",
help="dump files in a folder named PADID (meta, text, html, dhtml), default: False", help="dump files in a folder named PADID (meta, text, html, dhtml), default: False",
) )
p.add_argument( parser.add_argument(
"--output", "--output",
default=False, default=False,
action="store_true", action="store_true",
help="output changed padids on stdout", help="output changed padids on stdout",
) )
p.add_argument( parser.add_argument(
"--force", "--force",
default=False, default=False,
action="store_true", action="store_true",
help="reload, even if revisions count matches previous", help="reload, even if revisions count matches previous",
) )
p.add_argument( parser.add_argument(
"--no-raw-ext", "--no-raw-ext",
default=False, default=False,
action="store_true", action="store_true",
help="save plain text as padname with no (additional) extension", help="save plain text as padname with no (additional) extension",
) )
p.add_argument( parser.add_argument(
"--fix-names", "--fix-names",
default=False, default=False,
action="store_true", action="store_true",
help="normalize padid's (no spaces, special control chars) for use in file names", help="normalize padid's (no spaces, special control chars) for use in file names",
) )
parser.add_argument(
p.add_argument(
"--filter-ext", default=None, help="filter pads by extension" "--filter-ext", default=None, help="filter pads by extension"
) )
parser.add_argument(
p.add_argument(
"--css", "--css",
default="/styles.css", default="/styles.css",
help="add css url to output pages, default: /styles.css", help="add css url to output pages, default: /styles.css",
) )
p.add_argument( parser.add_argument(
"--script", "--script",
default="/versions.js", default="/versions.js",
help="add script url to output pages, default: /versions.js", help="add script url to output pages, default: /versions.js",
) )
parser.add_argument(
p.add_argument(
"--nopublish", "--nopublish",
default="__NOPUBLISH__", default="__NOPUBLISH__",
help="no publish magic word, default: __NOPUBLISH__", help="no publish magic word, default: __NOPUBLISH__",
) )
p.add_argument( parser.add_argument(
"--publish", "--publish",
default="__PUBLISH__", default="__PUBLISH__",
help="the publish magic word, default: __PUBLISH__", help="the publish magic word, default: __PUBLISH__",
) )
p.add_argument( parser.add_argument(
"--publish-opt-in", "--publish-opt-in",
default=False, default=False,
action="store_true", action="store_true",
help="ensure `--publish` is honoured instead of `--nopublish`", help="ensure `--publish` is honoured instead of `--nopublish`",
) )
return parser
args = p.parse_args(args)
raw_ext = ".raw.txt"
if args.no_raw_ext:
raw_ext = ""
info = loadpadinfo(args.padinfo)
data = {}
data['apikey'] = info['apikey']
async def get_padids(args, info, data, session):
if args.padid: if args.padid:
padids = args.padid padids = args.padid
elif args.glob: elif args.glob:
padids = getjson( url = info['localapiurl'] + 'listAllPads?' + urlencode(data)
info['localapiurl'] + 'listAllPads?' + urlencode(data) padids = await agetjson(session, url)
)['data']['padIDs'] padids = padids['data']['padIDs']
padids = [x for x in padids if fnmatch(x, args.glob)] padids = [x for x in padids if fnmatch(x, args.glob)]
else: else:
padids = getjson( url = info['localapiurl'] + 'listAllPads?' + urlencode(data)
info['localapiurl'] + 'listAllPads?' + urlencode(data) padids = await agetjson(session, url)
)['data']['padIDs'] padids = padids['data']['padIDs']
padids.sort() padids.sort()
numpads = len(padids) return padids
# maxmsglen = 0
count = 0
progress_kwargs = {}
if not istty():
progress_kwargs.update(dict(disable=True))
progress_pads = tqdm(iterable=padids, total=len(padids), **progress_kwargs)
for i, padid in enumerate(progress_pads): async def handle_pad(args, padid, data, info, session):
if args.skip != None and i < args.skip: raw_ext = ".raw.txt"
continue if args.no_raw_ext:
raw_ext = ""
data['padID'] = padid data['padID'] = padid
p = padpath(padid, args.pub, args.group, args.fix_names) p = padpath(padid, args.pub, args.group, args.fix_names)
if args.folder: if args.folder:
p = os.path.join(p, padid) p = os.path.join(p, padid)
metapath = p + ".meta.json" metapath = p + ".meta.json"
revisions = None revisions = None
tries = 1 tries = 1
skip = False skip = False
padurlbase = re.sub(r"api/1.2.9/$", "p/", info["apiurl"]) padurlbase = re.sub(r"api/1.2.9/$", "p/", info["apiurl"])
meta = {} meta = {}
while True: while True:
try: try:
if os.path.exists(metapath): if os.path.exists(metapath):
with open(metapath) as f: with open(metapath) as f:
meta.update(json.load(f)) meta.update(json.load(f))
revisions = getjson( url = (
info['localapiurl'] info['localapiurl'] + 'getRevisionsCount?' + urlencode(data)
+ 'getRevisionsCount?'
+ urlencode(data)
)['data']['revisions']
if meta['revisions'] == revisions and not args.force:
skip = True
break
meta['padid'] = padid
versions = meta["versions"] = []
versions.append(
{
"url": padurlbase + quote(padid),
"type": "pad",
"code": 200,
}
) )
response = await agetjson(session, url)
if revisions == None: revisions = response['data']['revisions']
meta['revisions'] = getjson( if meta['revisions'] == revisions and not args.force:
info['localapiurl']
+ 'getRevisionsCount?'
+ urlencode(data)
)['data']['revisions']
else:
meta['revisions'] = revisions
if (meta['revisions'] == 0) and (not args.zerorevs):
skip = True skip = True
break break
# todo: load more metadata! meta['padid'] = padid
meta['group'], meta['pad'] = splitpadname(padid) versions = meta["versions"] = []
meta['pathbase'] = p versions.append(
meta['lastedited_raw'] = int( {"url": padurlbase + quote(padid), "type": "pad", "code": 200,}
getjson( )
info['localapiurl'] + 'getLastEdited?' + urlencode(data)
)['data']['lastEdited'] if revisions is None:
url = (
info['localapiurl'] + 'getRevisionsCount?' + urlencode(data)
) )
meta['lastedited_iso'] = datetime.fromtimestamp( response = await agetjson(session, url)
int(meta['lastedited_raw']) / 1000 meta['revisions'] = response['data']['revisions']
).isoformat() else:
meta['author_ids'] = getjson( meta['revisions'] = revisions
info['localapiurl'] + 'listAuthorsOfPad?' + urlencode(data)
)['data']['authorIDs'] if (meta['revisions'] == 0) and (not args.zerorevs):
skip = True
break break
except HTTPError as e:
tries += 1 # todo: load more metadata!
if tries > 3: meta['group'], meta['pad'] = splitpadname(padid)
print( meta['pathbase'] = p
"Too many failures ({0}), skipping".format(padid),
file=sys.stderr, url = info['localapiurl'] + 'getLastEdited?' + urlencode(data)
) response = await agetjson(session, url)
skip = True meta['lastedited_raw'] = int(response['data']['lastEdited'])
break
else: meta['lastedited_iso'] = datetime.fromtimestamp(
sleep(3) int(meta['lastedited_raw']) / 1000
except TypeError as e: ).isoformat()
url = info['localapiurl'] + 'listAuthorsOfPad?' + urlencode(data)
response = await agetjson(session, url)
meta['author_ids'] = response['data']['authorIDs']
break
except HTTPError as e:
tries += 1
if tries > 3:
print( print(
"Type Error loading pad {0} (phantom pad?), skipping".format( "Too many failures ({0}), skipping".format(padid),
padid
),
file=sys.stderr, file=sys.stderr,
) )
skip = True skip = True
break break
else:
await trio.sleep(3)
except TypeError as e:
print(
"Type Error loading pad {0} (phantom pad?), skipping".format(
padid
),
file=sys.stderr,
)
skip = True
break
if skip: if skip:
continue return
count += 1 if args.output:
print(padid)
if args.output: if args.all or (args.meta or args.text or args.html or args.dhtml):
print(padid) try:
os.makedirs(os.path.split(metapath)[0])
except OSError:
pass
if args.all or (args.meta or args.text or args.html or args.dhtml): if args.all or args.text:
try: url = info['localapiurl'] + 'getText?' + urlencode(data)
os.makedirs(os.path.split(metapath)[0]) text = await agetjson(session, url)
except OSError: ver = {"type": "text"}
pass versions.append(ver)
ver["code"] = text["_code"]
if args.all or args.text: if text["_code"] == 200:
text = getjson(info['localapiurl'] + 'getText?' + urlencode(data)) text = text['data']['text']
ver = {"type": "text"}
versions.append(ver) ##########################################
ver["code"] = text["_code"] ## ENFORCE __NOPUBLISH__ MAGIC WORD
if text["_code"] == 200: ##########################################
text = text['data']['text'] if args.nopublish and args.nopublish in text:
# NEED TO PURGE ANY EXISTING DOCS
########################################## try_deleting(
## ENFORCE __NOPUBLISH__ MAGIC WORD (
########################################## p + raw_ext,
if args.nopublish and args.nopublish in text: p + ".raw.html",
# NEED TO PURGE ANY EXISTING DOCS p + ".diff.html",
try_deleting( p + ".meta.json",
(
p + raw_ext,
p + ".raw.html",
p + ".diff.html",
p + ".meta.json",
)
) )
continue )
return
##########################################
## ENFORCE __PUBLISH__ MAGIC WORD ##########################################
########################################## ## ENFORCE __PUBLISH__ MAGIC WORD
if args.publish_opt_in and args.publish not in text: ##########################################
try_deleting( if args.publish_opt_in and args.publish not in text:
( try_deleting(
p + raw_ext, (
p + ".raw.html", p + raw_ext,
p + ".diff.html", p + ".raw.html",
p + ".meta.json", p + ".diff.html",
) p + ".meta.json",
) )
continue )
return
ver["path"] = p + raw_ext
ver["url"] = quote(ver["path"]) ver["path"] = p + raw_ext
with open(ver["path"], "w") as f: ver["url"] = quote(ver["path"])
f.write(text) with open(ver["path"], "w") as f:
# once the content is settled, compute a hash f.write(text)
# and link it in the metadata! # once the content is settled, compute a hash
# and link it in the metadata!
links = []
if args.css: links = []
links.append({"href": args.css, "rel": "stylesheet"}) if args.css:
# todo, make this process reflect which files actually were made links.append({"href": args.css, "rel": "stylesheet"})
versionbaseurl = quote(padid) # todo, make this process reflect which files actually were made
versionbaseurl = quote(padid)
links.append(
{
"href": versions[0]["url"],
"rel": "alternate",
"type": "text/html",
"title": "Etherpad",
}
)
if args.all or args.text:
links.append(
{
"href": versionbaseurl + raw_ext,
"rel": "alternate",
"type": "text/plain",
"title": "Plain text",
}
)
if args.all or args.html:
links.append( links.append(
{ {
"href": versions[0]["url"], "href": versionbaseurl + ".raw.html",
"rel": "alternate", "rel": "alternate",
"type": "text/html", "type": "text/html",
"title": "Etherpad", "title": "HTML",
}
)
if args.all or args.dhtml:
links.append(
{
"href": versionbaseurl + ".diff.html",
"rel": "alternate",
"type": "text/html",
"title": "HTML with author colors",
}
)
if args.all or args.meta:
links.append(
{
"href": versionbaseurl + ".meta.json",
"rel": "alternate",
"type": "application/json",
"title": "Meta data",
} }
) )
if args.all or args.text:
links.append(
{
"href": versionbaseurl + raw_ext,
"rel": "alternate",
"type": "text/plain",
"title": "Plain text",
}
)
if args.all or args.html:
links.append(
{
"href": versionbaseurl + ".raw.html",
"rel": "alternate",
"type": "text/html",
"title": "HTML",
}
)
if args.all or args.dhtml:
links.append(
{
"href": versionbaseurl + ".diff.html",
"rel": "alternate",
"type": "text/html",
"title": "HTML with author colors",
}
)
if args.all or args.meta:
links.append(
{
"href": versionbaseurl + ".meta.json",
"rel": "alternate",
"type": "application/json",
"title": "Meta data",
}
)
# links.append({"href":"/", "rel":"search", "type":"text/html", "title":"Index"})
if args.all or args.dhtml: if args.all or args.dhtml:
data['startRev'] = "0" data['startRev'] = "0"
html = getjson( url = info['localapiurl'] + 'createDiffHTML?' + urlencode(data)
info['localapiurl'] + 'createDiffHTML?' + urlencode(data) html = await agetjson(session, url)
) ver = {"type": "diffhtml"}
ver = {"type": "diffhtml"} versions.append(ver)
versions.append(ver) ver["code"] = html["_code"]
ver["code"] = html["_code"] if html["_code"] == 200:
if html["_code"] == 200: try:
try:
html = html['data']['html']
ver["path"] = p + ".diff.html"
ver["url"] = quote(ver["path"])
# doc = html5lib.parse(html, treebuilder="etree", override_encoding="utf-8", namespaceHTMLElements=False)
doc = html5lib.parse(
html, treebuilder="etree", namespaceHTMLElements=False
)
html5tidy(
doc,
indent=True,
title=padid,
scripts=args.script,
links=links,
)
with open(ver["path"], "w") as f:
print(
ET.tostring(doc, method="html", encoding="unicode"),
file=f,
)
except TypeError:
# Malformed / incomplete response, record the message (such as "internal error") in the metadata and write NO file!
ver["message"] = html["message"]
# with open(ver["path"], "w") as f:
# print ("""<pre>{0}</pre>""".format(json.dumps(html, indent=2)), file=f)
# Process text, html, dhtml, all options
if args.all or args.html:
html = getjson(info['localapiurl'] + 'getHTML?' + urlencode(data))
ver = {"type": "html"}
versions.append(ver)
ver["code"] = html["_code"]
if html["_code"] == 200:
html = html['data']['html'] html = html['data']['html']
ver["path"] = p + ".raw.html" ver["path"] = p + ".diff.html"
ver["url"] = quote(ver["path"]) ver["url"] = quote(ver["path"])
doc = html5lib.parse( doc = html5lib.parse(
html, treebuilder="etree", namespaceHTMLElements=False html, treebuilder="etree", namespaceHTMLElements=False
@ -456,12 +410,68 @@ def main(args):
ET.tostring(doc, method="html", encoding="unicode"), ET.tostring(doc, method="html", encoding="unicode"),
file=f, file=f,
) )
except TypeError:
ver["message"] = html["message"]
# Process text, html, dhtml, all options
if args.all or args.html:
url = info['localapiurl'] + 'getHTML?' + urlencode(data)
html = await agetjson(session, url)
ver = {"type": "html"}
versions.append(ver)
ver["code"] = html["_code"]
if html["_code"] == 200:
html = html['data']['html']
ver["path"] = p + ".raw.html"
ver["url"] = quote(ver["path"])
doc = html5lib.parse(
html, treebuilder="etree", namespaceHTMLElements=False
)
html5tidy(
doc, indent=True, title=padid, scripts=args.script, links=links,
)
with open(ver["path"], "w") as f:
print(
ET.tostring(doc, method="html", encoding="unicode"), file=f,
)
# output meta
if args.all or args.meta:
ver = {"type": "meta"}
versions.append(ver)
ver["path"] = metapath
ver["url"] = quote(metapath)
with open(metapath, "w") as f:
json.dump(meta, f, indent=2)
# output meta
if args.all or args.meta: async def handle_pad_chunk(args, padids, data, info, session):
ver = {"type": "meta"} progress_kwargs = {}
versions.append(ver) if not istty():
ver["path"] = metapath progress_kwargs.update(dict(disable=True))
ver["url"] = quote(metapath)
with open(metapath, "w") as f: padids = tqdm(iterable=padids, total=len(padids), **progress_kwargs,)
json.dump(meta, f, indent=2) for padid in padids:
await handle_pad(args, padid, data, info, session)
async def handle_pads(args):
session = asks.Session(connections=args.connection)
info = loadpadinfo(args.padinfo)
data = {'apikey': info['apikey']}
padids = await get_padids(args, info, data, session)
if args.skip:
padids = padids[args.skip : len(padids)]
CHUNK_SIZE = math.ceil(len(padids) / 3)
async with trio.open_nursery() as nursery:
for padids in chunks(padids, CHUNK_SIZE):
_args = (args, padids, data, info, session)
nursery.start_soon(handle_pad_chunk, *_args)
def main(args):
p = build_argument_parser(args)
args = p.parse_args(args)
trio.run(handle_pads, args)

7
setup.py

@ -43,13 +43,16 @@ setup(
long_description=long_description, long_description=long_description,
long_description_content_type='text/markdown', long_description_content_type='text/markdown',
install_requires=[ install_requires=[
"asks",
"html5lib", "html5lib",
"jinja2", "jinja2",
"python-dateutil",
"pypandoc", "pypandoc",
"tqdm", "python-dateutil",
"requests", "requests",
"tqdm",
"trio",
], ],
python_requires=">=3.5",
classifiers=[ classifiers=[
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',
'Environment :: Console', 'Environment :: Console',

Loading…
Cancel
Save