Mercurial > code > home > repos > front-door-lock
view get_agent.py @ 1:3b82ee3b9d79
cleanup
author | drewp@bigasterisk.com |
---|---|
date | Sun, 27 Aug 2023 11:20:30 -0700 |
parents | 4365c72c59f6 |
children | 76cec592435c |
line wrap: on
line source
"""Resolve the authenticated agent for a request from its Pomerium JWT header."""
import datetime
import json
import logging
from typing import Any, Dict

import aiohttp
from attr import dataclass
from jwskate import JwkSet, Jwt
from rdflib import URIRef
from starlette.requests import Request

log = logging.getLogger(__name__)

# Local timezone, captured once at import time. All datetimes produced by this
# module are made timezone-aware with this tzinfo.
tzlocal = datetime.datetime.now().astimezone().tzinfo


@dataclass
class Agent:
    """An authenticated user, built from the claims of a validated JWT."""
    email: str
    agent: URIRef  # FOAF URI looked up from the email address
    name: str
    issuedAt: datetime.datetime  # from the token's 'iat' claim
    expiration: datetime.datetime  # from the token's 'exp' claim

    def asDict(self):
        """Return a plain dict describing this agent.

        Besides the raw fields, the result carries the current time ('now')
        and two values relative to it: seconds elapsed since the token was
        issued ('secAgo') and seconds remaining until it expires ('inSec').
        """
        now = datetime.datetime.now(tzlocal)
        return {
            'email': self.email,
            'agent': str(self.agent),
            'name': self.name,
            'issuedAt': {
                't': self.issuedAt.isoformat(),
                'secAgo': (now - self.issuedAt).total_seconds(),
            },
            'expiration': {
                't': self.expiration.isoformat(),
                'inSec': (self.expiration - now).total_seconds(),
            },
            'now': now.isoformat(),
        }


def foafAgentFromAuthEmail(email: str) -> URIRef:
    """Map a login email address to that person's FOAF agent URI.

    Raises:
        KeyError: if `email` is not one of the known addresses.
    """
    return URIRef({
        'drewpca@gmail.com': 'http://bigasterisk.com/foaf.rdf#drewp',
        'kelsimp@gmail.com': 'http://bigasterisk.com/kelsi/foaf.rdf#kelsi',
    }[email])


# Process-wide cache of the key set; filled by the first loadJwkset() call.
_jwkset: JwkSet | None = None


async def loadJwkset() -> JwkSet:
    """Return the JWK set used to check tokens, fetching it on first use.

    The set is downloaded once and then kept in the module-level `_jwkset`
    for the life of the process.

    Raises:
        ValueError: if the jwks.json endpoint answers with a non-200 status.
    """
    global _jwkset
    if _jwkset is None:
        log.info("loading jwks.json")
        async with aiohttp.ClientSession() as session:
            async with session.get('https://authenticate2.bigasterisk.com/.well-known/pomerium/jwks.json') as response:
                if response.status != 200:
                    raise ValueError(f'{response.status=}')
                ct = json.loads(await response.text())
        _jwkset = JwkSet(ct)
    return _jwkset


async def getAgent(req: Request) -> Agent | None:
    """Build the Agent for `req` from its X-Pomerium-Jwt-Assertion header.

    Returns:
        The Agent described by the token's claims, or None when the token
        has no (or an empty) 'email' claim.

    Raises:
        ValueError: if the X-Pomerium-Jwt-Assertion header is absent.
        KeyError: if the email is not known to foafAgentFromAuthEmail, or
            if the 'exp', 'iat' or 'name' claim is missing.

    Whatever `Jwt(...)` and `jwt.validate(...)` raise for a malformed or
    invalid token propagates to the caller unchanged.
    """
    jwkset = await loadJwkset()
    pomAssertion = req.headers.get('X-Pomerium-Jwt-Assertion', None)
    if pomAssertion is None:
        raise ValueError("missing X-Pomerium-Jwt-Assertion")

    jwt = Jwt(pomAssertion)
    jwt.validate(
        # NOTE(review): the original author flagged this line with '#??'.
        # It always uses the first key of the set; if the issuer ever
        # publishes several keys, selecting by the token's key id would
        # presumably be needed -- confirm against the jwskate API.
        jwkset['keys'][0],
        algs=['ES256'],
        issuer='authenticate2.bigasterisk.com',
        audience='bigasterisk.com')
    claims: Dict[str, Any] = jwt.claims
    log.debug('claims=%r', claims)

    # .get() so a token without an 'email' claim yields None (as the return
    # annotation promises) rather than a KeyError.
    email = claims.get('email')
    if not email:
        return None

    return Agent(
        email=email,
        agent=foafAgentFromAuthEmail(email),
        # Build aware datetimes directly instead of going through a naive
        # local datetime and converting afterwards.
        expiration=datetime.datetime.fromtimestamp(claims['exp'], tz=tzlocal),
        issuedAt=datetime.datetime.fromtimestamp(claims['iat'], tz=tzlocal),
        name=claims['name'],
    )