view get_agent.py @ 13:3014db0a5500 default tip

mv board to proj/micro, rename this repo with dashes
author drewp@bigasterisk.com
date Fri, 28 Jun 2024 17:08:09 -0700
parents 6e0d47f9e56d
children
line wrap: on
line source

import datetime
import json
import logging
from typing import Any, Dict

import aiohttp
from attr import dataclass
from jwskate import JwkSet, Jwt
from rdflib import URIRef
from starlette.requests import Request

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# The local timezone, captured once when this module is imported. It is used
# in this file to produce timezone-aware datetimes.
# NOTE(review): this looks like a fixed-offset tzinfo taken at import time, so
# a DST change while the process keeps running would presumably not be
# reflected - confirm whether that matters here.
tzlocal = datetime.datetime.now().astimezone().tzinfo


@dataclass
class Agent:
    """Identity details for one logged-in user, plus the token's time window."""
    # Email address the user authenticated with.
    email: str
    # URI identifying the user.
    agent: URIRef
    # Display name.
    name: str
    # When the login token was issued, and when it stops being valid.
    issuedAt: datetime.datetime
    expiration: datetime.datetime

    def asDict(self):
        """Return this agent as a dict of plain strings and numbers.

        Besides the stored fields, the result reports how many seconds ago the
        token was issued ('secAgo'), how many seconds remain until it expires
        ('inSec'), and the current time ('now') those figures are relative to.
        """
        rightNow = datetime.datetime.now(tzlocal)
        age = rightNow - self.issuedAt
        remaining = self.expiration - rightNow

        issued = {
            't': self.issuedAt.isoformat(),
            'secAgo': age.total_seconds(),
        }
        expires = {
            't': self.expiration.isoformat(),
            'inSec': remaining.total_seconds(),
        }

        result = {}
        result['email'] = self.email
        result['agent'] = str(self.agent)
        result['name'] = self.name
        result['issuedAt'] = issued
        result['expiration'] = expires
        result['now'] = rightNow.isoformat()
        return result


def foafAgentFromAuthEmail(email: str) -> URIRef:
    """Translate a login email address into the matching agent URI.

    Only the addresses in the table below are recognised; any other email
    raises KeyError.
    """
    knownAgents = {
        'drewpca@gmail.com': 'http://bigasterisk.com/foaf.rdf#drewp',
        'kelsimp@gmail.com': 'http://bigasterisk.com/kelsi/foaf.rdf#kelsi',
    }
    agentUri = knownAgents[email]
    return URIRef(agentUri)


# Process-wide cache of the key set fetched by loadJwkset(); stays None until
# the first successful fetch.
_jwkset: JwkSet | None = None


async def loadJwkset() -> JwkSet:
    """Return the JWK set used to check login tokens.

    The first call downloads jwks.json and stores the parsed result in the
    module-level cache; later calls return the cached object without any
    network traffic.

    Raises:
        ValueError: the server answered with a status other than 200.
    """
    global _jwkset
    if _jwkset is not None:
        # Already fetched earlier in this process.
        return _jwkset

    log.info("loading jwks.json")
    jwksUrl = 'https://authenticate2.bigasterisk.com/.well-known/pomerium/jwks.json'
    async with aiohttp.ClientSession() as session, session.get(jwksUrl) as response:
        if response.status != 200:
            raise ValueError(f'{response.status=}')

        body = await response.text()
        _jwkset = JwkSet(json.loads(body))
    return _jwkset


def _pickVerificationKey(jwkset, jwt):
    """Choose which key from jwkset should verify jwt's signature.

    Prefers the key whose 'kid' equals the 'kid' in the token header, so the
    right key is used when the set holds more than one. If the token carries
    no 'kid', or no key matches, the first key in the set is returned; a wrong
    key then simply makes signature validation fail.
    """
    keys = jwkset['keys']
    kid = jwt.headers.get('kid')
    if kid is not None:
        for key in keys:
            if key.get('kid') == kid:
                return key
    return keys[0]


async def getAgent(req: Request) -> Agent | None:
    """Build an Agent from the X-Pomerium-Jwt-Assertion header of req.

    The token is validated (ES256 only, issuer and audience 'bigasterisk.com')
    against the cached JWK set before any claim is trusted.

    Returns:
        The Agent described by the token's claims, or None when the token has
        no (or an empty) 'email' claim.

    Raises:
        ValueError: the request has no X-Pomerium-Jwt-Assertion header.
        KeyError: the email is not one foafAgentFromAuthEmail knows, or an
            'exp', 'iat' or 'name' claim is missing.
        Validation errors from jwt.validate propagate unchanged.
    """
    jwkset = await loadJwkset()
    pomAssertion = req.headers.get('X-Pomerium-Jwt-Assertion', None)
    if pomAssertion is None:
        raise ValueError("missing X-Pomerium-Jwt-Assertion")
    jwt = Jwt(pomAssertion)
    jwt.validate(
        _pickVerificationKey(jwkset, jwt),
        algs=['ES256'],
        issuer='bigasterisk.com',
        audience='bigasterisk.com')
    claims: Dict[str, Any] = jwt.claims
    log.debug('claims=%r', claims)

    # A missing claim is treated the same as an empty one: no agent.
    email = claims.get('email')
    if not email:
        return None

    return Agent(
        email=email,
        agent=foafAgentFromAuthEmail(email),
        # 'exp' and 'iat' are epoch seconds; convert straight to aware local times.
        expiration=datetime.datetime.fromtimestamp(claims['exp'], tzlocal),
        issuedAt=datetime.datetime.fromtimestamp(claims['iat'], tzlocal),
        name=claims['name'],
    )


async def getFoafAgent(req) -> URIRef | None:
    """Work out which agent URI a request is acting as, if any.

    Two sources are supported: the normal Pomerium login token, and an
    x-foaf-agent header sent by fingerprint-unlocker. The Pomerium header wins
    whenever it is present; x-foaf-agent is consulted only when it is absent.
    Returns None when neither source yields an agent.
    """
    headers = req.headers

    if 'X-Pomerium-Jwt-Assertion' in headers:
        agent = await getAgent(req)
        return agent.agent if agent else None

    if 'x-foaf-agent' in headers:
        # fingerprint-unlocker is trusted to supply an x-foaf-agent that it
        # derived from the fingerprint.
        # NOTE(review): this accepts the header from any caller that omits the
        # Pomerium header - presumably only fingerprint-unlocker can reach
        # this path; confirm.
        return URIRef(headers['x-foaf-agent'])

    return None