From 22c6985ae2f91c5da692daa951f826e6b95a0236 Mon Sep 17 00:00:00 2001 From: rra Date: Wed, 24 Jun 2020 19:18:47 +0200 Subject: [PATCH] now also use RSA keys of pubgate --- basic_ap.py | 89 +++++++++-------------------------------------------- httpsig.py | 4 +-- key.py | 74 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 77 deletions(-) create mode 100644 key.py diff --git a/basic_ap.py b/basic_ap.py index f4deaf6..a1f9630 100644 --- a/basic_ap.py +++ b/basic_ap.py @@ -31,6 +31,7 @@ from Crypto.Signature import pkcs1_15 from Crypto.PublicKey import RSA from httpsig import HTTPSigAuth +from key import Key, get_key # import logging @@ -61,73 +62,7 @@ def public_key(): public_key = public_key.replace('\n','\\n') #public key shouldn't contain verbatim linebreaks in json return public_key -public_key() #generate public_key on first launch - -# def build_signing_string(headers, header_values): -# return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), header_values)) - -# def messageContentDigest(messageBodyJsonStr: str) -> str: -# msg = messageBodyJsonStr.encode('utf-8') -# digestStr = SHA256.new(msg).digest() -# return base64.b64encode(digestStr).decode('utf-8') - - - -# def sign_header(private_key, keyID, host, messageBodyJsonStr): -# """ -# Sign HTTP headers -# """ -# date = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) #RFC1123 Time format - -# #Based on https://code.freedombone.net/bashrc/epicyon/src/main/httpsig.py -# keyfile = open(private_key,'rb').read() -# secret = RSA.import_key(keyfile) - -# bodyDigest = messageContentDigest(messageBodyJsonStr) -# contentLength = len(messageBodyJsonStr) -# print(contentLength) - -# path = '/inbox' - -# headers = { -# '(request-target)': f'post {path}', -# 'host': host, -# 'date': date, -# 'digest': f'SHA-256={bodyDigest}', -# 'content-type': 'application/json', -# 'content-length': str(contentLength) -# } - - -# signedHeaderKeys = headers.keys() -# signedHeaderText = '' - -# for headerKey in signedHeaderKeys: -# signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' - -# signedHeaderText = signedHeaderText.strip() -# headerDigest = SHA256.new(signedHeaderText.encode('ascii')) - -# # Sign the digest -# rawSignature = pkcs1_15.new(secret).sign(headerDigest) -# signature = base64.b64encode(rawSignature).decode('ascii') - -# # Put it into a valid HTTP signature format -# signatureDict = { -# 'keyId': keyID, -# 'algorithm': 'rsa-sha256', -# 'headers': ' '.join(signedHeaderKeys), -# 'signature': signature -# } -# signatureHeader = ', '.join( -# [f'{k}="{v}"' for k, v in signatureDict.items()]) - - -# headers['signature'] = signatureHeader - -# return headers - - +key = get_key(f'{DOMAIN}/users/{USERNAME}') #generate public_key on first launch #Flask @@ -159,34 +94,38 @@ def profile(actor): Return an Actor object see templates/actor.json """ - json = flask.render_template('actor.json', preferred_username=USERNAME, actor=actor, domain=DOMAIN, public_key=public_key()) # render our ActivityPub answer + key = get_key(f'{DOMAIN}/users/{actor}') + publicKeyPem = key.to_dict()['publicKeyPem'] + publicKeyPem = publicKeyPem.replace('\n','\\n') #public key shouldn't contain verbatim linebreaks in json + + json = flask.render_template('actor.json', preferred_username=USERNAME, actor=actor, domain=DOMAIN, public_key=publicKeyPem) # render our ActivityPub answer + return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object @app.route('/post/', methods=['POST','GET']) def post(): - keyfile = open('private.pem','rb').read() - key = {'privkey': RSA.import_key(keyfile), 'keyId': DOMAIN+'/users/'+USERNAME+'#main-key'} + #key = get_key(f'{DOMAIN}/users/{USERNAME}') headers = {"content-type": "application/activity+json", - "user-agent": f"test v:1"} + "user-agent": f"Basic AP v: 0.0"} - json_message=json.loads(flask.render_template('create.json', - domain=DOMAIN,public_key=public_key(),actor=USERNAME,host='DOMAIN')) + activity=json.loads(flask.render_template('create.json', + domain=DOMAIN, actor=USERNAME)) http_sig = HTTPSigAuth(key, headers) url = 'https://post.lurk.org/inbox' - body = json.dumps(json_message) + body = json.dumps(activity) headers = http_sig.sign(url,body) #signed_headers = sign_header('private.pem', DOMAIN+'/users/'+USERNAME+'#main-key','https://post.lurk.org', json.dumps(json_message)) - r = requests.post('https://post.lurk.org/inbox', json=body, headers=headers) + r = requests.post(url, json=body, headers=headers) html = f'Status code
{r.status_code}

Response Headers:
{r.headers}


json message:
{r.request.body}


Request headers:
{r.request.headers}
' html_headers = f'{html}' diff --git a/httpsig.py b/httpsig.py index 69a8164..aecf57f 100644 --- a/httpsig.py +++ b/httpsig.py @@ -103,13 +103,13 @@ class HTTPSigAuth: sigheaders = headers.keys() sigstring = build_signing_string(headers, sigheaders) - signer = PKCS1_v1_5.new(self.key['privkey']) + signer = PKCS1_v1_5.new(self.key.privkey) digest = SHA256.new() digest.update(sigstring.encode("ascii")) sigdata = base64.b64encode(signer.sign(digest)) sig = { - 'keyId': self.key['keyId'], + 'keyId': self.key.key_id(), 'algorithm': 'rsa-sha256', 'headers': ' '.join(sigheaders), 'signature': sigdata.decode('ascii') diff --git a/key.py b/key.py new file mode 100644 index 0000000..4f9534c --- /dev/null +++ b/key.py @@ -0,0 +1,74 @@ +import os +import re +import base64 +from Crypto.PublicKey import RSA +from Crypto.Util import number +from typing import Any +from typing import Dict +from typing import Optional + +KEY_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.') + +class Key(object): + DEFAULT_KEY_SIZE = 2048 + + def __init__(self, owner: str) -> None: + self.owner = owner + self.privkey_pem: Optional[str] = None + self.pubkey_pem: Optional[str] = None + self.privkey: Optional[Any] = None + self.pubkey: Optional[Any] = None + + def load_pub(self, pubkey_pem: str) -> None: + self.pubkey_pem = pubkey_pem + self.pubkey = RSA.importKey(pubkey_pem) + + def load(self, privkey_pem: str) -> None: + self.privkey_pem = privkey_pem + self.privkey = RSA.importKey(self.privkey_pem) + self.pubkey_pem = self.privkey.publickey().exportKey("PEM").decode("utf-8") + + def new(self) -> None: + k = RSA.generate(self.DEFAULT_KEY_SIZE) + self.privkey_pem = k.exportKey("PEM").decode("utf-8") + self.pubkey_pem = k.publickey().exportKey("PEM").decode("utf-8") + self.privkey = k + + def key_id(self) -> str: + return f"{self.owner}#main-key" + + def to_dict(self) -> Dict[str, Any]: + return { + "id": self.key_id(), + "owner": self.owner, + "publicKeyPem": self.pubkey_pem, + } + + def to_magic_key(self) -> str: + mod = base64.urlsafe_b64encode( + number.long_to_bytes(self.privkey.n) # type: ignore + ).decode("utf-8") + pubexp = base64.urlsafe_b64encode( + number.long_to_bytes(self.privkey.e) # type: ignore + ).decode("utf-8") + return f"data:application/magic-public-key,RSA.{mod}.{pubexp}" + + +def get_key(owner: str) -> Key: + """"Loads or generates an RSA key.""" + k = Key(owner) + user = re.sub('[^\w\d]', "_", owner) + key_path = os.path.join(KEY_DIR, f"key_{user}.pem") + if os.path.isfile(key_path): + with open(key_path) as f: + privkey_pem = f.read() + k.load(privkey_pem) + else: + k.new() + with open(key_path, "w") as f: + f.write(k.privkey_pem) + + return k + + +