Mercurial > code > home > repos > front-door-lock
view get_agent.py @ 9:6e0d47f9e56d
pom issuer change
author | drewp@bigasterisk.com |
---|---|
date | Mon, 04 Dec 2023 13:51:30 -0800 |
parents | 76cec592435c |
children |
line wrap: on
line source
import datetime
import json
import logging
from typing import Any, Dict

import aiohttp
from attr import dataclass
from jwskate import JwkSet, Jwt
from rdflib import URIRef
from starlette.requests import Request

log = logging.getLogger(__name__)

# Local timezone, captured once at import time and used for every timestamp below.
tzlocal = datetime.datetime.now().astimezone().tzinfo


@dataclass
class Agent:
    """An authenticated user, built from the claims of a Pomerium JWT assertion."""
    email: str
    agent: URIRef  # FOAF URI looked up from `email`
    name: str
    issuedAt: datetime.datetime  # from the 'iat' claim
    expiration: datetime.datetime  # from the 'exp' claim

    def asDict(self):
        """Return a dict of plain strings/numbers describing this agent.

        Timestamps are given both as ISO-8601 strings and as second offsets
        relative to the moment of the call (`now` is included in the result).
        """
        now = datetime.datetime.now(tzlocal)
        return {
            'email': self.email,
            'agent': str(self.agent),
            'name': self.name,
            'issuedAt': {
                't': self.issuedAt.isoformat(),
                'secAgo': (now - self.issuedAt).total_seconds(),
            },
            'expiration': {
                't': self.expiration.isoformat(),
                'inSec': (self.expiration - now).total_seconds(),
            },
            'now': now.isoformat(),
        }


def foafAgentFromAuthEmail(email: str) -> URIRef:
    """Map an authenticated email address to that person's FOAF URI.

    Raises:
        KeyError: if `email` is not one of the known addresses.
    """
    return URIRef({
        'drewpca@gmail.com': 'http://bigasterisk.com/foaf.rdf#drewp',
        'kelsimp@gmail.com': 'http://bigasterisk.com/kelsi/foaf.rdf#kelsi',
    }[email])


# Process-wide cache of the signing keys; filled on first use by loadJwkset().
_jwkset: JwkSet | None = None


async def loadJwkset() -> JwkSet:
    """Return the Pomerium JWK set, downloading it on the first call only.

    The result is cached in the module-level `_jwkset` and never refreshed
    by this function.

    Raises:
        ValueError: if the jwks.json endpoint answers with a non-200 status.
    """
    global _jwkset
    if _jwkset is None:
        log.info("loading jwks.json")
        async with aiohttp.ClientSession() as session:
            async with session.get('https://authenticate2.bigasterisk.com/.well-known/pomerium/jwks.json') as response:
                if response.status != 200:
                    raise ValueError(f'{response.status=}')
                ct = json.loads(await response.text())
                _jwkset = JwkSet(ct)
    return _jwkset


def _fromEpoch(secs: float) -> datetime.datetime:
    """Convert a POSIX timestamp into a timezone-aware datetime in `tzlocal`."""
    return datetime.datetime.fromtimestamp(secs, tz=tzlocal)


async def getAgent(req: Request) -> Agent | None:
    """Build an Agent from the request's X-Pomerium-Jwt-Assertion header.

    The assertion is validated against the first key of the cached JWK set,
    restricted to ES256, with issuer and audience 'bigasterisk.com'.

    Returns:
        The Agent, or None when the token has no (or an empty) 'email' claim.

    Raises:
        ValueError: if the header is missing.
        KeyError: if the email is unknown to foafAgentFromAuthEmail, or an
            'exp' / 'iat' / 'name' claim is absent.
        Whatever `Jwt.validate` raises for a token that fails validation.
    """
    jwkset = await loadJwkset()
    pomAssertion = req.headers.get('X-Pomerium-Jwt-Assertion', None)
    if pomAssertion is None:
        raise ValueError("missing X-Pomerium-Jwt-Assertion")

    jwt = Jwt(pomAssertion)
    jwt.validate(
        # NOTE(review): always uses the first published key (original author
        # marked this "#??"). If the key set ever holds more than one key,
        # this probably needs to pick the key matching the token's 'kid'.
        jwkset['keys'][0],
        algs=['ES256'],
        issuer='bigasterisk.com',
        audience='bigasterisk.com')
    claims: Dict[str, Any] = jwt.claims
    log.debug('claims=%r', claims)

    # .get() so a token without an 'email' claim yields None instead of KeyError.
    email = claims.get('email')
    if not email:
        return None

    return Agent(
        email=email,
        agent=foafAgentFromAuthEmail(email),
        expiration=_fromEpoch(claims['exp']),
        issuedAt=_fromEpoch(claims['iat']),
        name=claims['name'],
    )


async def getFoafAgent(req: Request) -> URIRef | None:
    """Return the FOAF agent URI for a request, or None if there is none.

    This is special because fingerprint needs to be able to send x-foaf-agent
    AND we need to get agents the normal way from pomerium. When the Pomerium
    assertion header is present it is the only source consulted; the
    x-foaf-agent header is used only when that assertion header is absent.
    """
    if 'X-Pomerium-Jwt-Assertion' in req.headers:
        agent = await getAgent(req)
        return agent.agent if agent else None

    if 'x-foaf-agent' in req.headers:
        # we can trust fingerprint-unlocker to give us a x-foaf-agent derived from the fingerprint
        return URIRef(req.headers['x-foaf-agent'])

    return None