comparison patchsink.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents 032e59be8fe9
children
comparison
equal deleted inserted replaced
12:032e59be8fe9 13:bfd95926be6e
1 """sends patches out an SSE response to a collector client who wants a collection of graphs"""
1 import time 2 import time
2 from typing import Dict 3 from typing import Dict, Any, Optional
4 import asyncio
5 from starlette.requests import Request
6 from sse_starlette.sse import EventSourceResponse
7 import logging
8 import queue
9 log = logging.getLogger('writer')
3 10
4 import cyclone.sse 11 async def iq(q):
5 import cyclone.web 12 while True:
13 elem = await q.get()
14 yield elem
6 15
7 class PatchSink(cyclone.sse.SSEHandler): 16 class PatchSinkResponse(EventSourceResponse):
8 _handlerSerial = 0
9 17
10 def __init__(self, application: cyclone.web.Application, request): 18 def __init__(
11 cyclone.sse.SSEHandler.__init__(self, application, request) 19 self,
12 self.bound = False 20
21 status_code: int = 200,
22 headers: Optional[Dict] = None,
23 ping: Optional[int] = None,
24 user_agent="",
25 stream_id: Optional[str] = None,
26 ) -> None:
27 self.q = asyncio.Queue()
28 EventSourceResponse.__init__(self, iq(self.q), status_code, headers, ping=ping)
13 self.created = time.time() 29 self.created = time.time()
14 self.graphClients = self.settings.graphClients 30 self.user_agent = user_agent
15 31 self.lastPatchSentTime = 0.0
16 self._serial = PatchSink._handlerSerial 32 self.streamId = stream_id
17 PatchSink._handlerSerial += 1 33
18 self.lastPatchSentTime: float = 0.0 34 def sendEvent(self, message: str, event: str):
19 35 self.q.put_nowait(dict(data=message, event=event))
20 def __repr__(self) -> str:
21 return '<Handler #%s>' % self._serial
22 36
23 def state(self) -> Dict: 37 def state(self) -> Dict:
24 return { 38 return {
25 'created': round(self.created, 2), 39 'created': round(self.created, 2),
26 'ageHours': round((time.time() - self.created) / 3600, 2), 40 'ageHours': round((time.time() - self.created) / 3600, 2),
27 'streamId': self.streamId, 41 'streamId': self.streamId,
28 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing 42 # 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing
29 'foafAgent': self.request.headers.get('X-Foaf-Agent'), 43 # 'foafAgent': self.request.headers.get('X-Foaf-Agent'),
30 'userAgent': self.request.headers.get('user-agent'), 44 # 'userAgent': self.request.headers.get('user-agent'),
31 } 45 }
32 46
33 def bind(self, *args, **kwargs):
34 self.streamId = args[0]
35 47
36 self.graphClients.addSseHandler(self) 48 async def PatchSink(request: Request) -> PatchSinkResponse:
37 # If something goes wrong with addSseHandler, I don't want to 49 log.debug(f"PatchSink for {request.path_params['stream_id']=}")
38 # try removeSseHandler.
39 self.bound = True
40 50
41 def unbind(self) -> None: 51 ret= PatchSinkResponse( ping=30, user_agent=request.headers['user-agent'], stream_id=request.path_params['stream_id'])
42 if self.bound: 52 request.app.state.graphClients.addSseHandler(ret)
43 self.graphClients.removeSseHandler(self) 53 return ret
54
55
56 # class PatchSink_old:
57 # _handlerSerial = 0
58
59 # def __init__(self, application: cyclone.web.Application, request):
60 # cyclone.sse.SSEHandler.__init__(self, application, request)
61 # self.bound = False
62 # self.created = time.time()
63 # self.graphClients = self.settings.graphClients
64
65 # self._serial = PatchSink._handlerSerial
66 # PatchSink._handlerSerial += 1
67 # self.lastPatchSentTime: float = 0.0
68
69 # def __repr__(self) -> str:
70 # return '<Handler #%s>' % self._serial
71
72 # def bind(self, *args, **kwargs):
73 # self.streamId = args[0]
74
75 # self.graphClients.addSseHandler(self)
76 # # If something goes wrong with addSseHandler, I don't want to
77 # # try removeSseHandler.
78 # self.bound = True
79
80 # def unbind(self) -> None:
81 # if self.bound:
82 # self.graphClients.removeSseHandler(self)