From d578e42c96e5c9eb22eb497511765464421514b5 Mon Sep 17 00:00:00 2001 From: rra Date: Wed, 17 Jun 2020 10:59:15 +0200 Subject: [PATCH] signing headers still do not work --- basic_ap.py | 145 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 99 insertions(+), 46 deletions(-) diff --git a/basic_ap.py b/basic_ap.py index b1db9f0..a2fabdb 100644 --- a/basic_ap.py +++ b/basic_ap.py @@ -15,44 +15,91 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import flask, os +import flask +import os +import requests +import base64 +import json + from flask import request from flask import Response from time import strftime, gmtime -import httpsig, requests + + +from Crypto.Hash import SHA256 +from Crypto.Signature import PKCS1_v1_5 +from Crypto.PublicKey import RSA + +# +import logging +import http.client as http_client +http_client.HTTPConnection.debuglevel = 1 + +logging.basicConfig() +logging.getLogger().setLevel(logging.DEBUG) +requests_log = logging.getLogger("requests.packages.urllib3") +requests_log.setLevel(logging.DEBUG) +requests_log.propagate = True + +# #Config DOMAIN = 'https://my-example.com' USERNAME = 'alice' def public_key(): - """ - Use commandline openssl to generate a public and private key - """ - if not os.path.exists('public.pem'): - os.system('openssl genrsa -out private.pem 2048') - os.system('openssl rsa -in private.pem -outform PEM -pubout -out public.pem') - else: - public_key = open('public.pem').read() - public_key = public_key.replace('\n','\\n') #public key shouldn't contain verbatim linebreaks in json - return public_key + """ + Use commandline openssl to generate a public and private key + """ + if not os.path.exists('public.pem'): + os.system('openssl genrsa -out private.pem 2048') + os.system('openssl rsa -in private.pem -outform PEM -pubout -out public.pem') + else: + public_key = open('public.pem').read() + public_key = public_key.replace('\n','\\n') #public key shouldn't contain verbatim linebreaks in json + return public_key public_key() #generate public_key on first launch +def build_signing_string(headers, header_values): + return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), header_values)) + def sign_header(private_key, key_id, host): - date= strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime()) + """ + Sign HTTP headers + """ + date = strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime()) #RFC1123 Time format - secret= open(private_key,'rb').read() - hs = httpsig.HeaderSigner(key_id, secret, algorithm="rsa-sha256", headers=['(request-target)', 'host', 'date']) - auth = hs.sign({"Date": date, "Host": host}, method='POST',path='/inbox') + #Based on https://github.com/autogestion/pubgate/ + keyfile = open(private_key,'rb').read() + secret = RSA.import_key(keyfile) + signer = PKCS1_v1_5.new(secret) - # thanks to https://github.com/rowanlupton/pylodon/blob/master/pylodon/utilities.py for the inspiration - # this is necessary because httpsig.HeaderSigner returns an Authorization header instead of Signature - auth['Signature'] = auth.pop('authorization') - assert auth['Signature'].startswith('Signature ') - auth['Signature'] = auth['Signature'][len('Signature '):] - return auth + headers = { 'request-target': 'post /inbox', + 'host': host, #this is the destination host + 'date': date, + } + sigheaders = headers.keys() + + sigstring = build_signing_string(headers, sigheaders) + print(sigstring + ) + digest = SHA256.new() + digest.update(sigstring.encode('ascii')) + sigdata = base64.b64encode(signer.sign(digest)) + + sig = { + 'keyId': key_id, + 'algorithm': 'rsa-sha256', + 'headers': ' '.join(sigheaders), + 'signature': sigdata.decode('ascii') + } + + headers["signature"] = ','.join(['{}="{}"'.format(k, v) for k, v in sig.items()]) + + return headers + #Flask @@ -60,43 +107,49 @@ app = flask.Flask(__name__) @app.route('/') def index(): - return 'It works! Now try to look for a user@my-example.com via the mastodon interface' + return 'It works! Now try to look for a user@my-example.com via the mastodon interface' @app.route('/.well-known/webfinger') def finger(): - """ - Respond to webfinger queries (GET /.well-known/webfinger?resource=acct:alice@my-example.com) with a json object pointing to the actor - see templates/webfinger.json + """ + Respond to webfinger queries (GET /.well-known/webfinger?resource=acct:alice@my-example.com) with a json object pointing to the actor + see templates/webfinger.json - """ - if request.args.get('resource'): - query = request.args.get('resource') + """ + if request.args.get('resource'): + query = request.args.get('resource') - actor = query.split(':')[1].split('@')[0] # from 'acct:alice@my-example.com' to 'alice' + actor = query.split(':')[1].split('@')[0] # from 'acct:alice@my-example.com' to 'alice' - json = flask.render_template('webfinger.json', query=query, actor=actor, domain=DOMAIN) # render our ActivityPub answer + json = flask.render_template('webfinger.json', query=query, actor=actor, domain=DOMAIN) # render our ActivityPub answer - return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object - + return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object + @app.route('/users/') def profile(actor): - """ - Return an Actor object - see templates/actor.json - """ - json = flask.render_template('actor.json', preferred_username=USERNAME, actor=actor, domain=DOMAIN, public_key=public_key()) # render our ActivityPub answer - return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object + """ + Return an Actor object + see templates/actor.json + """ + json = flask.render_template('actor.json', preferred_username=USERNAME, actor=actor, domain=DOMAIN, public_key=public_key()) # render our ActivityPub answer + return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object @app.route('/post/', methods=['POST','GET']) def post(): - json=flask.render_template('create.json', - domain=DOMAIN,public_key=public_key(),actor='test',host='https://post.lurk.org') - r = requests.post(url='http://post.lurk.org/users/rra/inbox', json=json, headers=sign_header('private.pem', 'https://t.homebrewserver.club/users/test#main-key','https://post.lurk.org')) - a = (r.status_code, r.reason, r.text) - return 'test' + date = strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime()) + activity=json.loads(flask.render_template('create.json', + domain=DOMAIN,public_key=public_key(),actor=USERNAME,host='DOMAIN', date=date)) + signed_headers = sign_header('private.pem', DOMAIN+'/users/'+USERNAME+'#main-key','https://post.lurk.org') + + r = requests.post('https://post.lurk.org/inbox', json=activity, headers=signed_headers) + + a = (r.status_code, r.text, r.headers) + a = str(a) + + return a if __name__ == '__main__': - app.debug =True - app.run() + app.debug =True + app.run()