0
|
1 import datetime
|
|
2 import json
|
|
3 import logging
|
|
4 from typing import Any, Dict
|
|
5
|
|
6 import aiohttp
|
|
7 from attr import dataclass
|
|
8 from jwskate import JwkSet, Jwt
|
|
9 from rdflib import URIRef
|
|
10 from starlette.requests import Request
|
|
11
|
|
12 log = logging.getLogger(__name__)
|
|
13 tzlocal = datetime.datetime.now().astimezone().tzinfo
|
|
14
|
|
15 @dataclass
|
|
16 class Agent:
|
|
17 email: str
|
|
18 agent: URIRef
|
|
19 name: str
|
|
20 issuedAt: datetime.datetime
|
|
21 expiration: datetime.datetime
|
|
22
|
|
23 def asDict(self):
|
|
24 now = datetime.datetime.now(tzlocal)
|
|
25 return {
|
|
26 'email': self.email,
|
|
27 'agent': str(self.agent),
|
|
28 'name': self.name,
|
|
29 'issuedAt': {
|
|
30 't': self.issuedAt.isoformat(),
|
|
31 'secAgo': (now - self.issuedAt).total_seconds(),
|
|
32 },
|
|
33 'expiration': {
|
|
34 't': self.expiration.isoformat(),
|
|
35 'inSec': (self.expiration - now).total_seconds(),
|
|
36 },
|
|
37 'now': now.isoformat(),
|
|
38 }
|
|
39
|
|
40
|
|
41 def foafAgentFromAuthEmail(email: str) -> URIRef:
|
|
42 return URIRef({
|
|
43 'drewpca@gmail.com': 'http://bigasterisk.com/foaf.rdf#drewp',
|
|
44 'kelsimp@gmail.com': 'http://bigasterisk.com/kelsi/foaf.rdf#kelsi',
|
|
45 }[email])
|
|
46
|
|
47
|
|
48 _jwkset: JwkSet | None = None
|
|
49
|
|
50
|
|
51 async def loadJwkset() -> JwkSet:
|
|
52 global _jwkset
|
|
53 if _jwkset is None:
|
|
54 log.info("loading jwks.json")
|
|
55 async with aiohttp.ClientSession() as session:
|
|
56 async with session.get('https://authenticate2.bigasterisk.com/.well-known/pomerium/jwks.json') as response:
|
|
57 if response.status != 200:
|
|
58 raise ValueError(f'{response.status=}')
|
|
59
|
|
60 ct = json.loads(await response.text())
|
|
61 _jwkset = JwkSet(ct)
|
|
62 return _jwkset
|
|
63
|
|
64
|
|
65 async def getAgent(req: Request) -> Agent | None:
|
|
66 jwkset = await loadJwkset()
|
|
67 pomAssertion = req.headers.get('X-Pomerium-Jwt-Assertion', None)
|
|
68 if pomAssertion is None:
|
|
69 raise ValueError("missing X-Pomerium-Jwt-Assertion")
|
|
70 jwt = Jwt(pomAssertion)
|
|
71 jwt.validate(
|
|
72 jwkset['keys'][0], #??
|
|
73 algs=['ES256'],
|
|
74 issuer='authenticate2.bigasterisk.com',
|
|
75 audience='bigasterisk.com')
|
|
76 claims: Dict[str, Any] = jwt.claims
|
|
77 log.debug('claims=%r', claims)
|
|
78 if not claims['email']:
|
|
79 return None
|
|
80
|
|
81 return Agent(
|
|
82 email=claims['email'],
|
|
83 agent=foafAgentFromAuthEmail(claims['email']),
|
|
84 expiration=datetime.datetime.fromtimestamp(claims['exp']).astimezone(tzlocal),
|
|
85 issuedAt=datetime.datetime.fromtimestamp(claims['iat']).astimezone(tzlocal),
|
|
86 name=claims['name'],
|
|
87 )
|