annotate patchsink.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents 032e59be8fe9
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
13
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
1 """sends patches out an SSE response to a collector client who wants a collection of graphs"""
12
032e59be8fe9 refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff changeset
2 import time
13
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
3 from typing import Dict, Any, Optional
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
4 import asyncio
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
5 from starlette.requests import Request
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
6 from sse_starlette.sse import EventSourceResponse
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
7 import logging
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
8 import queue
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
9 log = logging.getLogger('writer')
12
032e59be8fe9 refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff changeset
10
13
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
async def iq(q: asyncio.Queue):
    """Yield items from `q` one at a time, forever.

    Adapts an asyncio queue into an async iterator. Each step awaits
    `q.get()`, so iteration suspends while the queue is empty. The loop
    has no exit condition of its own; it ends only when the consumer
    closes or cancels the generator.
    """
    while True:
        yield await q.get()
12
032e59be8fe9 refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff changeset
15
13
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
class PatchSinkResponse(EventSourceResponse):
    """SSE response whose events are pushed in by callers via `sendEvent`.

    Events are buffered in an asyncio.Queue (default size, so unbounded)
    and handed to EventSourceResponse through the `iq` async generator.
    """

    def __init__(
        self,
        status_code: int = 200,
        headers: Optional[Dict] = None,
        ping: Optional[int] = None,
        user_agent: str = "",
        stream_id: Optional[str] = None,
    ) -> None:
        """Set up the event queue and the underlying SSE response.

        status_code, headers, ping: forwarded unchanged to
            EventSourceResponse.__init__.
        user_agent: stored on the instance and reported by `state()`.
        stream_id: stored as `self.streamId` and reported by `state()`.
        """
        # The queue must exist before the base class is initialised, because
        # the generator reading from it is the response's content.
        self.q = asyncio.Queue()
        super().__init__(iq(self.q), status_code, headers, ping=ping)
        self.created = time.time()
        self.user_agent = user_agent
        # Initialised to 0.0 here; nothing in this class updates it.
        self.lastPatchSentTime = 0.0
        self.streamId = stream_id

    def sendEvent(self, message: str, event: str):
        """Queue one SSE event, with `message` as its data and `event` as its type."""
        self.q.put_nowait({'data': message, 'event': event})

    def state(self) -> Dict:
        """Return creation time, age in hours, stream id and user agent as a dict."""
        return {
            'created': round(self.created, 2),
            'ageHours': round((time.time() - self.created) / 3600, 2),
            'streamId': self.streamId,
            'userAgent': self.user_agent,
            # TODO: these need the request, which this response object does not hold.
            # 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing
            # 'foafAgent': self.request.headers.get('X-Foaf-Agent'),
        }
032e59be8fe9 refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff changeset
46
13
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
47
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
async def PatchSink(request: Request) -> PatchSinkResponse:
    """Open an SSE stream for the `stream_id` path parameter.

    Builds a PatchSinkResponse with ping=30, registers it through
    `request.app.state.graphClients.addSseHandler`, and returns it as the
    response for this request.

    TODO: nothing in this function unregisters the handler when the client
    disconnects.
    """
    log.debug(f"PatchSink for {request.path_params['stream_id']=}")

    ret = PatchSinkResponse(
        ping=30,
        # Not every client sends a User-Agent header; a missing one must not
        # fail the request with KeyError.
        user_agent=request.headers.get('user-agent', ''),
        stream_id=request.path_params['stream_id'],
    )
    request.app.state.graphClients.addSseHandler(ret)
    return ret
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
54
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
55
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
56 # class PatchSink_old:
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
57 # _handlerSerial = 0
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
58
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
59 # def __init__(self, application: cyclone.web.Application, request):
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
60 # cyclone.sse.SSEHandler.__init__(self, application, request)
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
61 # self.bound = False
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
62 # self.created = time.time()
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
63 # self.graphClients = self.settings.graphClients
12
032e59be8fe9 refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff changeset
64
13
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
65 # self._serial = PatchSink._handlerSerial
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
66 # PatchSink._handlerSerial += 1
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
67 # self.lastPatchSentTime: float = 0.0
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
68
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
69 # def __repr__(self) -> str:
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
70 # return '<Handler #%s>' % self._serial
12
032e59be8fe9 refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff changeset
71
13
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
72 # def bind(self, *args, **kwargs):
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
73 # self.streamId = args[0]
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
74
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
75 # self.graphClients.addSseHandler(self)
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
76 # # If something goes wrong with addSseHandler, I don't want to
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
77 # # try removeSseHandler.
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
78 # self.bound = True
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
79
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
80 # def unbind(self) -> None:
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
81 # if self.bound:
bfd95926be6e initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents: 12
diff changeset
82 # self.graphClients.removeSseHandler(self)