0
|
1 import datetime
|
|
2 import json
|
|
3 import logging
|
|
4 from typing import Any, Dict
|
|
5
|
|
6 import aiohttp
|
|
7 from attr import dataclass
|
|
8 from jwskate import JwkSet, Jwt
|
|
9 from rdflib import URIRef
|
|
10 from starlette.requests import Request
|
|
11
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# The host's local timezone, captured once at import time. It is used to
# build timezone-aware datetimes elsewhere in this module.
tzlocal = datetime.datetime.now().astimezone().tzinfo
|
|
14
|
2
|
15
|
0
|
@dataclass
class Agent:
    """An authenticated user together with the validity window of their token.

    `issuedAt` and `expiration` are subtracted from a timezone-aware "now" in
    asDict(), so they must be timezone-aware datetimes themselves.
    """
    email: str
    agent: URIRef  # URI identifying this user (see foafAgentFromAuthEmail)
    name: str
    issuedAt: datetime.datetime
    expiration: datetime.datetime

    def asDict(self):
        """Return a plain-dict summary of this agent.

        Timestamps are rendered as ISO-8601 strings. 'issuedAt' also reports
        how many seconds ago that was ('secAgo'), 'expiration' reports how
        many seconds remain ('inSec'), and 'now' records the reference time
        used for both.
        """
        now = datetime.datetime.now(tzlocal)
        sinceIssued = now - self.issuedAt
        untilExpiry = self.expiration - now

        issued = {
            't': self.issuedAt.isoformat(),
            'secAgo': sinceIssued.total_seconds(),
        }
        expires = {
            't': self.expiration.isoformat(),
            'inSec': untilExpiry.total_seconds(),
        }
        return {
            'email': self.email,
            'agent': str(self.agent),
            'name': self.name,
            'issuedAt': issued,
            'expiration': expires,
            'now': now.isoformat(),
        }
|
|
40
|
|
41
|
|
def foafAgentFromAuthEmail(email: str) -> URIRef:
    """Map an authenticated email address to that person's FOAF agent URI.

    Raises:
        KeyError: if `email` is not one of the known addresses.
    """
    knownAgents = {
        'drewpca@gmail.com': 'http://bigasterisk.com/foaf.rdf#drewp',
        'kelsimp@gmail.com': 'http://bigasterisk.com/kelsi/foaf.rdf#kelsi',
    }
    return URIRef(knownAgents[email])
|
|
47
|
|
48
|
|
# Process-wide cache of the key set; filled in by the first loadJwkset() call.
_jwkset: JwkSet | None = None


async def loadJwkset() -> JwkSet:
    """Return the Pomerium JWK set, downloading it on first use.

    The key set is fetched once over HTTPS and then kept in the module-level
    `_jwkset`, so later calls return the cached object without any network
    traffic.

    Raises:
        ValueError: if the jwks.json endpoint answers with a non-200 status.
    """
    global _jwkset
    if _jwkset is not None:
        return _jwkset

    log.info("loading jwks.json")
    jwksUrl = 'https://authenticate2.bigasterisk.com/.well-known/pomerium/jwks.json'
    async with aiohttp.ClientSession() as session, session.get(jwksUrl) as response:
        if response.status != 200:
            raise ValueError(f'{response.status=}')
        body = await response.text()

    _jwkset = JwkSet(json.loads(body))
    return _jwkset
|
|
64
|
|
65
|
|
async def getAgent(req: Request) -> Agent | None:
    """Build an Agent from the request's Pomerium JWT assertion.

    The token in the X-Pomerium-Jwt-Assertion header is validated (ES256
    signature, issuer and audience both 'bigasterisk.com') against the key
    set returned by loadJwkset(), and its claims are turned into an Agent.

    Returns:
        The Agent described by the token, or None when the token has no
        'email' claim or the claim is empty.

    Raises:
        ValueError: if the X-Pomerium-Jwt-Assertion header is missing.
        KeyError: if the email has no known FOAF agent, or the 'exp', 'iat'
            or 'name' claim is absent.
        Whatever jwt.validate() raises when the token does not validate.
    """
    jwkset = await loadJwkset()
    pomAssertion = req.headers.get('X-Pomerium-Jwt-Assertion')
    if pomAssertion is None:
        raise ValueError("missing X-Pomerium-Jwt-Assertion")

    jwt = Jwt(pomAssertion)
    jwt.validate(
        # NOTE(review): this always validates against the first key in the
        # set. If the set ever holds more than one key, the key should
        # presumably be selected by the token's key id. Confirm.
        jwkset['keys'][0],
        algs=['ES256'],
        issuer='bigasterisk.com',
        audience='bigasterisk.com')
    claims: Dict[str, Any] = jwt.claims
    log.debug('claims=%r', claims)

    # A token without an 'email' claim is treated the same as one with an
    # empty email: there is no agent, rather than a KeyError.
    email = claims.get('email')
    if not email:
        return None

    return Agent(
        email=email,
        agent=foafAgentFromAuthEmail(email),
        # Build aware datetimes directly in the local zone instead of going
        # through a naive intermediate value.
        expiration=datetime.datetime.fromtimestamp(claims['exp'], tz=tzlocal),
        issuedAt=datetime.datetime.fromtimestamp(claims['iat'], tz=tzlocal),
        name=claims['name'],
    )
|
2
|
89
|
|
90
|
|
async def getFoafAgent(req) -> URIRef | None:
    """Resolve the FOAF agent URI for a request, or None if there is none.

    This is special because fingerprint needs to be able to send
    x-foaf-agent AND we need to get agents the normal way from pomerium.

    A Pomerium assertion header, when present, is the only source consulted;
    the x-foaf-agent header is used only when no assertion header was sent.
    """
    headers = req.headers

    if 'X-Pomerium-Jwt-Assertion' in headers:
        agent = await getAgent(req)
        return agent.agent if agent else None

    if 'x-foaf-agent' in headers:
        # we can trust fingerprint-unlocker to give us a x-foaf-agent derived from the fingerprint
        return URIRef(headers['x-foaf-agent'])

    return None
|