diff get_agent.py @ 0:4365c72c59f6

start
author drewp@bigasterisk.com
date Sun, 27 Aug 2023 11:12:20 -0700
parents
children 76cec592435c
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/get_agent.py	Sun Aug 27 11:12:20 2023 -0700
@@ -0,0 +1,87 @@
+import datetime
+import json
+import logging
+from typing import Any, Dict
+
+import aiohttp
+from attr import dataclass
+from jwskate import JwkSet, Jwt
+from rdflib import URIRef
+from starlette.requests import Request
+
+log = logging.getLogger(__name__)
+tzlocal = datetime.datetime.now().astimezone().tzinfo
+
+@dataclass
+class Agent:
+    email: str
+    agent: URIRef
+    name: str
+    issuedAt: datetime.datetime
+    expiration: datetime.datetime
+
+    def asDict(self):
+        now = datetime.datetime.now(tzlocal)
+        return {
+            'email': self.email,
+            'agent': str(self.agent),
+            'name': self.name,
+            'issuedAt': {
+                't': self.issuedAt.isoformat(),
+                'secAgo': (now - self.issuedAt).total_seconds(),
+            },
+            'expiration': {
+                't': self.expiration.isoformat(),
+                'inSec': (self.expiration - now).total_seconds(),
+            },
+            'now': now.isoformat(),
+        }
+
+
+def foafAgentFromAuthEmail(email: str) -> URIRef:
+    return URIRef({
+        'drewpca@gmail.com': 'http://bigasterisk.com/foaf.rdf#drewp',
+        'kelsimp@gmail.com': 'http://bigasterisk.com/kelsi/foaf.rdf#kelsi',
+    }[email])
+
+
+_jwkset: JwkSet | None = None
+
+
+async def loadJwkset() -> JwkSet:
+    global _jwkset
+    if _jwkset is None:
+        log.info("loading jwks.json")
+        async with aiohttp.ClientSession() as session:
+            async with session.get('https://authenticate2.bigasterisk.com/.well-known/pomerium/jwks.json') as response:
+                if response.status != 200:
+                    raise ValueError(f'{response.status=}')
+
+                ct = json.loads(await response.text())
+                _jwkset = JwkSet(ct)
+    return _jwkset
+
+
+async def getAgent(req: Request) -> Agent | None:
+    jwkset = await loadJwkset()
+    pomAssertion = req.headers.get('X-Pomerium-Jwt-Assertion', None)
+    if pomAssertion is None:
+        raise ValueError("missing X-Pomerium-Jwt-Assertion")
+    jwt = Jwt(pomAssertion)
+    jwt.validate(
+        jwkset['keys'][0],  #??
+        algs=['ES256'],
+        issuer='authenticate2.bigasterisk.com',
+        audience='bigasterisk.com')
+    claims: Dict[str, Any] = jwt.claims
+    log.debug('claims=%r', claims)
+    if not claims['email']:
+        return None
+
+    return Agent(
+        email=claims['email'],
+        agent=foafAgentFromAuthEmail(claims['email']),
+        expiration=datetime.datetime.fromtimestamp(claims['exp']).astimezone(tzlocal),
+        issuedAt=datetime.datetime.fromtimestamp(claims['iat']).astimezone(tzlocal),
+        name=claims['name'],
+    )