Mercurial > code > home > repos > front-door-lock
diff get_agent.py @ 0:4365c72c59f6
start
author | drewp@bigasterisk.com |
---|---|
date | Sun, 27 Aug 2023 11:12:20 -0700 |
parents | |
children | 76cec592435c |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/get_agent.py Sun Aug 27 11:12:20 2023 -0700 @@ -0,0 +1,87 @@ +import datetime +import json +import logging +from typing import Any, Dict + +import aiohttp +from attr import dataclass +from jwskate import JwkSet, Jwt +from rdflib import URIRef +from starlette.requests import Request + +log = logging.getLogger(__name__) +tzlocal = datetime.datetime.now().astimezone().tzinfo + +@dataclass +class Agent: + email: str + agent: URIRef + name: str + issuedAt: datetime.datetime + expiration: datetime.datetime + + def asDict(self): + now = datetime.datetime.now(tzlocal) + return { + 'email': self.email, + 'agent': str(self.agent), + 'name': self.name, + 'issuedAt': { + 't': self.issuedAt.isoformat(), + 'secAgo': (now - self.issuedAt).total_seconds(), + }, + 'expiration': { + 't': self.expiration.isoformat(), + 'inSec': (self.expiration - now).total_seconds(), + }, + 'now': now.isoformat(), + } + + +def foafAgentFromAuthEmail(email: str) -> URIRef: + return URIRef({ + 'drewpca@gmail.com': 'http://bigasterisk.com/foaf.rdf#drewp', + 'kelsimp@gmail.com': 'http://bigasterisk.com/kelsi/foaf.rdf#kelsi', + }[email]) + + +_jwkset: JwkSet | None = None + + +async def loadJwkset() -> JwkSet: + global _jwkset + if _jwkset is None: + log.info("loading jwks.json") + async with aiohttp.ClientSession() as session: + async with session.get('https://authenticate2.bigasterisk.com/.well-known/pomerium/jwks.json') as response: + if response.status != 200: + raise ValueError(f'{response.status=}') + + ct = json.loads(await response.text()) + _jwkset = JwkSet(ct) + return _jwkset + + +async def getAgent(req: Request) -> Agent | None: + jwkset = await loadJwkset() + pomAssertion = req.headers.get('X-Pomerium-Jwt-Assertion', None) + if pomAssertion is None: + raise ValueError("missing X-Pomerium-Jwt-Assertion") + jwt = Jwt(pomAssertion) + jwt.validate( + jwkset['keys'][0], #?? + algs=['ES256'], + issuer='authenticate2.bigasterisk.com', + audience='bigasterisk.com') + claims: Dict[str, Any] = jwt.claims + log.debug('claims=%r', claims) + if not claims['email']: + return None + + return Agent( + email=claims['email'], + agent=foafAgentFromAuthEmail(claims['email']), + expiration=datetime.datetime.fromtimestamp(claims['exp']).astimezone(tzlocal), + issuedAt=datetime.datetime.fromtimestamp(claims['iat']).astimezone(tzlocal), + name=claims['name'], + )