# This is a simple python flask implementation of 'How To Implement A Basic ActivityPub Server' # https://blog.joinmastodon.org/2018/06/how-to-implement-a-basic-activitypub-server/ # © 2018 homebrewserver.club contributors # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . import flask import os import requests import base64 import json from flask import request from flask import Response from time import strftime, gmtime from Crypto.Hash import SHA256 from Crypto.Signature import pkcs1_15 from Crypto.PublicKey import RSA # import logging import http.client as http_client http_client.HTTPConnection.debuglevel = 1 logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) requests_log = logging.getLogger("requests.packages.urllib3") requests_log.setLevel(logging.DEBUG) requests_log.propagate = True # #Config DOMAIN = 'https://my-example.com' USERNAME = 'alice' def public_key(): """ Use commandline openssl to generate a public and private key """ if not os.path.exists('public.pem'): os.system('openssl genrsa -out private.pem 2048') os.system('openssl rsa -in private.pem -outform PEM -pubout -out public.pem') else: public_key = open('public.pem').read() public_key = public_key.replace('\n','\\n') #public key shouldn't contain verbatim linebreaks in json return public_key public_key() #generate public_key on first launch def build_signing_string(headers, header_values): return '\n'.join(map(lambda x: ': '.join([x.lower(), headers[x]]), header_values)) def messageContentDigest(messageBodyJsonStr: str) -> str: msg = messageBodyJsonStr.encode('utf-8') digestStr = SHA256.new(msg).digest() return base64.b64encode(digestStr).decode('utf-8') def sign_header(private_key, keyID, host, messageBodyJsonStr): """ Sign HTTP headers """ date = strftime("%a, %d %b %Y %H:%M:%S %Z", gmtime()) #RFC1123 Time format #Based on https://code.freedombone.net/bashrc/epicyon/src/main/httpsig.py keyfile = open(private_key,'rb').read() secret = RSA.import_key(keyfile) bodyDigest = messageContentDigest(messageBodyJsonStr) contentLength = len(messageBodyJsonStr) print(contentLength) path = '/inbox' headers = { '(request-target)': f'post {path}', 'host': host, 'date': date, 'digest': f'SHA-256={bodyDigest}', 'content-type': 'application/json', 'content-length': str(contentLength) } signedHeaderKeys = headers.keys() signedHeaderText = '' for headerKey in signedHeaderKeys: signedHeaderText += f'{headerKey}: {headers[headerKey]}\n' signedHeaderText = signedHeaderText.strip() headerDigest = SHA256.new(signedHeaderText.encode('ascii')) # Sign the digest rawSignature = pkcs1_15.new(secret).sign(headerDigest) signature = base64.b64encode(rawSignature).decode('ascii') # Put it into a valid HTTP signature format signatureDict = { 'keyId': keyID, 'algorithm': 'rsa-sha256', 'headers': ' '.join(signedHeaderKeys), 'signature': signature } signatureHeader = ','.join( [f'{k}="{v}"' for k, v in signatureDict.items()]) headers['signature'] = signatureHeader return headers #Flask app = flask.Flask(__name__) @app.route('/') def index(): return 'It works! Now try to look for a user@my-example.com via the mastodon interface' @app.route('/.well-known/webfinger') def finger(): """ Respond to webfinger queries (GET /.well-known/webfinger?resource=acct:alice@my-example.com) with a json object pointing to the actor see templates/webfinger.json """ if request.args.get('resource'): query = request.args.get('resource') actor = query.split(':')[1].split('@')[0] # from 'acct:alice@my-example.com' to 'alice' json = flask.render_template('webfinger.json', query=query, actor=actor, domain=DOMAIN) # render our ActivityPub answer return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object @app.route('/users/') def profile(actor): """ Return an Actor object see templates/actor.json """ json = flask.render_template('actor.json', preferred_username=USERNAME, actor=actor, domain=DOMAIN, public_key=public_key()) # render our ActivityPub answer return Response(response=json, status=200, mimetype="application/json") # return that answer as a json object @app.route('/post/', methods=['POST','GET']) def post(): date = strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime()) json_message=json.loads(flask.render_template('create.json', domain=DOMAIN,public_key=public_key(),actor=USERNAME,host='DOMAIN', date=date)) signed_headers = sign_header('private.pem', DOMAIN+'/users/'+USERNAME+'#main-key','https://post.lurk.org', json.dumps(json_message)) r = requests.post('https://post.lurk.org/inbox', json=json_message, headers=signed_headers) html = f'Status code
{r.status_code}

Response Headers:
{r.headers}


json message:
{json.dumps(json_message)}


Request headers:
{signed_headers}
' html_headers = f'{html}' return html_headers if __name__ == '__main__': app.debug =True app.run()