view get_agent.py @ 8:caea36c8289f

don't try to reconnect mqtt (was broken); just fail a k8s health check
author drewp@bigasterisk.com
date Thu, 30 Nov 2023 22:43:58 -0800
parents 76cec592435c
children 6e0d47f9e56d
line wrap: on
line source

import datetime
import json
import logging
from typing import Any, Dict

import aiohttp
from attr import dataclass
from jwskate import JwkSet, Jwt
from rdflib import URIRef
from starlette.requests import Request

log = logging.getLogger(__name__)
tzlocal = datetime.datetime.now().astimezone().tzinfo


@dataclass
class Agent:
    email: str
    agent: URIRef
    name: str
    issuedAt: datetime.datetime
    expiration: datetime.datetime

    def asDict(self):
        now = datetime.datetime.now(tzlocal)
        return {
            'email': self.email,
            'agent': str(self.agent),
            'name': self.name,
            'issuedAt': {
                't': self.issuedAt.isoformat(),
                'secAgo': (now - self.issuedAt).total_seconds(),
            },
            'expiration': {
                't': self.expiration.isoformat(),
                'inSec': (self.expiration - now).total_seconds(),
            },
            'now': now.isoformat(),
        }


def foafAgentFromAuthEmail(email: str) -> URIRef:
    return URIRef({
        'drewpca@gmail.com': 'http://bigasterisk.com/foaf.rdf#drewp',
        'kelsimp@gmail.com': 'http://bigasterisk.com/kelsi/foaf.rdf#kelsi',
    }[email])


_jwkset: JwkSet | None = None


async def loadJwkset() -> JwkSet:
    global _jwkset
    if _jwkset is None:
        log.info("loading jwks.json")
        async with aiohttp.ClientSession() as session:
            async with session.get('https://authenticate2.bigasterisk.com/.well-known/pomerium/jwks.json') as response:
                if response.status != 200:
                    raise ValueError(f'{response.status=}')

                ct = json.loads(await response.text())
                _jwkset = JwkSet(ct)
    return _jwkset


async def getAgent(req: Request) -> Agent | None:
    jwkset = await loadJwkset()
    pomAssertion = req.headers.get('X-Pomerium-Jwt-Assertion', None)
    if pomAssertion is None:
        raise ValueError("missing X-Pomerium-Jwt-Assertion")
    jwt = Jwt(pomAssertion)
    jwt.validate(
        jwkset['keys'][0],  #??
        algs=['ES256'],
        issuer='authenticate2.bigasterisk.com',
        audience='bigasterisk.com')
    claims: Dict[str, Any] = jwt.claims
    log.debug('claims=%r', claims)
    if not claims['email']:
        return None

    return Agent(
        email=claims['email'],
        agent=foafAgentFromAuthEmail(claims['email']),
        expiration=datetime.datetime.fromtimestamp(claims['exp']).astimezone(tzlocal),
        issuedAt=datetime.datetime.fromtimestamp(claims['iat']).astimezone(tzlocal),
        name=claims['name'],
    )


async def getFoafAgent(req) -> URIRef | None:
    """this is special because fingerprint needs to be able to send 
    x-foaf-agent AND we need to get agents the normal way from pomerium"""

    if 'X-Pomerium-Jwt-Assertion' in req.headers:
        agent = await getAgent(req)
        if agent:
            return agent.agent
    else:
        if 'x-foaf-agent' in req.headers:
            # we can trust fingerprint-unlocker to give us a x-foaf-agent derived from the fingerprint
            return URIRef(req.headers['x-foaf-agent'])

    return None