Browse Source

signing headers still do not work

master
rra 5 years ago
parent
commit
d578e42c96
  1. 145
      basic_ap.py

145
basic_ap.py

@ -15,44 +15,91 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import flask, os
import flask
import os
import requests
import base64
import json
from flask import request
from flask import Response
from time import strftime, gmtime
import httpsig, requests
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.PublicKey import RSA
#
import logging
import http.client as http_client
http_client.HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
#
#Config
DOMAIN = 'https://my-example.com'
USERNAME = 'alice'
def public_key():
"""
Use commandline openssl to generate a public and private key
"""
if not os.path.exists('public.pem'):
os.system('openssl genrsa -out private.pem 2048')
os.system('openssl rsa -in private.pem -outform PEM -pubout -out public.pem')
else:
public_key = open('public.pem').read()
public_key = public_key.replace('\n','\\n') #public key shouldn't contain verbatim linebreaks in json
return public_key
"""
Use commandline openssl to generate a public and private key
"""
if not os.path.exists('public.pem'):
os.system('openssl genrsa -out private.pem 2048')
os.system('openssl rsa -in private.pem -outform PEM -pubout -out public.pem')
else:
public_key = open('public.pem').read()
public_key = public_key.replace('\n','\\n') #public key shouldn't contain verbatim linebreaks in json
return public_key
public_key() #generate public_key on first launch
def build_signing_string(headers, header_values):
return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), header_values))
def sign_header(private_key, key_id, host):
date= strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime())
"""
Sign HTTP headers
"""
date = strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime()) #RFC1123 Time format
secret= open(private_key,'rb').read()
hs = httpsig.HeaderSigner(key_id, secret, algorithm="rsa-sha256", headers=['(request-target)', 'host', 'date'])
auth = hs.sign({"Date": date, "Host": host}, method='POST',path='/inbox')
#Based on https://github.com/autogestion/pubgate/
keyfile = open(private_key,'rb').read()
secret = RSA.import_key(keyfile)
signer = PKCS1_v1_5.new(secret)
# thanks to https://github.com/rowanlupton/pylodon/blob/master/pylodon/utilities.py for the inspiration
# this is necessary because httpsig.HeaderSigner returns an Authorization header instead of Signature
auth['Signature'] = auth.pop('authorization')
assert auth['Signature'].startswith('Signature ')
auth['Signature'] = auth['Signature'][len('Signature '):]
return auth
headers = { 'request-target': 'post /inbox',
'host': host, #this is the destination host
'date': date,
}
sigheaders = headers.keys()
sigstring = build_signing_string(headers, sigheaders)
print(sigstring
)
digest = SHA256.new()
digest.update(sigstring.encode('ascii'))
sigdata = base64.b64encode(signer.sign(digest))
sig = {
'keyId': key_id,
'algorithm': 'rsa-sha256',
'headers': ' '.join(sigheaders),
'signature': sigdata.decode('ascii')
}
headers["signature"] = ','.join(['{}="{}"'.format(k, v) for k, v in sig.items()])
return headers
#Flask
@ -60,43 +107,49 @@ app = flask.Flask(__name__)
@app.route('/')
def index():
return 'It works! Now try to look for a user@my-example.com via the mastodon interface'
return 'It works! Now try to look for a user@my-example.com via the mastodon interface'
@app.route('/.well-known/webfinger')
def finger():
"""
Respond to webfinger queries (GET /.well-known/webfinger?resource=acct:alice@my-example.com) with a json object pointing to the actor
see templates/webfinger.json
"""
Respond to webfinger queries (GET /.well-known/webfinger?resource=acct:alice@my-example.com) with a json object pointing to the actor
see templates/webfinger.json
"""
if request.args.get('resource'):
query = request.args.get('resource')
"""
if request.args.get('resource'):
query = request.args.get('resource')
actor = query.split(':')[1].split('@')[0] # from 'acct:alice@my-example.com' to 'alice'
actor = query.split(':')[1].split('@')[0] # from 'acct:alice@my-example.com' to 'alice'
json = flask.render_template('webfinger.json', query=query, actor=actor, domain=DOMAIN) # render our ActivityPub answer
json = flask.render_template('webfinger.json', query=query, actor=actor, domain=DOMAIN) # render our ActivityPub answer
return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object
return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object
@app.route('/users/<actor>')
def profile(actor):
"""
Return an Actor object
see templates/actor.json
"""
json = flask.render_template('actor.json', preferred_username=USERNAME, actor=actor, domain=DOMAIN, public_key=public_key()) # render our ActivityPub answer
return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object
"""
Return an Actor object
see templates/actor.json
"""
json = flask.render_template('actor.json', preferred_username=USERNAME, actor=actor, domain=DOMAIN, public_key=public_key()) # render our ActivityPub answer
return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object
@app.route('/post/', methods=['POST','GET'])
def post():
json=flask.render_template('create.json',
domain=DOMAIN,public_key=public_key(),actor='test',host='https://post.lurk.org')
r = requests.post(url='http://post.lurk.org/users/rra/inbox', json=json, headers=sign_header('private.pem', 'https://t.homebrewserver.club/users/test#main-key','https://post.lurk.org'))
a = (r.status_code, r.reason, r.text)
return 'test'
date = strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime())
activity=json.loads(flask.render_template('create.json',
domain=DOMAIN,public_key=public_key(),actor=USERNAME,host='DOMAIN', date=date))
signed_headers = sign_header('private.pem', DOMAIN+'/users/'+USERNAME+'#main-key','https://post.lurk.org')
r = requests.post('https://post.lurk.org/inbox', json=activity, headers=signed_headers)
a = (r.status_code, r.text, r.headers)
a = str(a)
return a
if __name__ == '__main__':
app.debug =True
app.run()
app.debug =True
app.run()

Loading…
Cancel
Save