view get_agent.py @ 1:3b82ee3b9d79

cleanup
author drewp@bigasterisk.com
date Sun, 27 Aug 2023 11:20:30 -0700
parents 4365c72c59f6
children 76cec592435c
line wrap: on
line source

import datetime
import json
import logging
from typing import Any, Dict

import aiohttp
from attr import dataclass
from jwskate import JwkSet, Jwt
from rdflib import URIRef
from starlette.requests import Request

log = logging.getLogger(__name__)
tzlocal = datetime.datetime.now().astimezone().tzinfo

@dataclass
class Agent:
    email: str
    agent: URIRef
    name: str
    issuedAt: datetime.datetime
    expiration: datetime.datetime

    def asDict(self):
        now = datetime.datetime.now(tzlocal)
        return {
            'email': self.email,
            'agent': str(self.agent),
            'name': self.name,
            'issuedAt': {
                't': self.issuedAt.isoformat(),
                'secAgo': (now - self.issuedAt).total_seconds(),
            },
            'expiration': {
                't': self.expiration.isoformat(),
                'inSec': (self.expiration - now).total_seconds(),
            },
            'now': now.isoformat(),
        }


def foafAgentFromAuthEmail(email: str) -> URIRef:
    return URIRef({
        'drewpca@gmail.com': 'http://bigasterisk.com/foaf.rdf#drewp',
        'kelsimp@gmail.com': 'http://bigasterisk.com/kelsi/foaf.rdf#kelsi',
    }[email])


_jwkset: JwkSet | None = None


async def loadJwkset() -> JwkSet:
    global _jwkset
    if _jwkset is None:
        log.info("loading jwks.json")
        async with aiohttp.ClientSession() as session:
            async with session.get('https://authenticate2.bigasterisk.com/.well-known/pomerium/jwks.json') as response:
                if response.status != 200:
                    raise ValueError(f'{response.status=}')

                ct = json.loads(await response.text())
                _jwkset = JwkSet(ct)
    return _jwkset


async def getAgent(req: Request) -> Agent | None:
    jwkset = await loadJwkset()
    pomAssertion = req.headers.get('X-Pomerium-Jwt-Assertion', None)
    if pomAssertion is None:
        raise ValueError("missing X-Pomerium-Jwt-Assertion")
    jwt = Jwt(pomAssertion)
    jwt.validate(
        jwkset['keys'][0],  #??
        algs=['ES256'],
        issuer='authenticate2.bigasterisk.com',
        audience='bigasterisk.com')
    claims: Dict[str, Any] = jwt.claims
    log.debug('claims=%r', claims)
    if not claims['email']:
        return None

    return Agent(
        email=claims['email'],
        agent=foafAgentFromAuthEmail(claims['email']),
        expiration=datetime.datetime.fromtimestamp(claims['exp']).astimezone(tzlocal),
        issuedAt=datetime.datetime.fromtimestamp(claims['iat']).astimezone(tzlocal),
        name=claims['name'],
    )