Mercurial > code > home > repos > collector
comparison patchsink.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | 032e59be8fe9 |
children |
comparison
equal
deleted
inserted
replaced
12:032e59be8fe9 | 13:bfd95926be6e |
---|---|
1 """sends patches out an SSE response to a collector client who wants a collection of graphs""" | |
1 import time | 2 import time |
2 from typing import Dict | 3 from typing import Dict, Any, Optional |
4 import asyncio | |
5 from starlette.requests import Request | |
6 from sse_starlette.sse import EventSourceResponse | |
7 import logging | |
8 import queue | |
9 log = logging.getLogger('writer') | |
3 | 10 |
4 import cyclone.sse | 11 async def iq(q): |
5 import cyclone.web | 12 while True: |
13 elem = await q.get() | |
14 yield elem | |
6 | 15 |
7 class PatchSink(cyclone.sse.SSEHandler): | 16 class PatchSinkResponse(EventSourceResponse): |
8 _handlerSerial = 0 | |
9 | 17 |
10 def __init__(self, application: cyclone.web.Application, request): | 18 def __init__( |
11 cyclone.sse.SSEHandler.__init__(self, application, request) | 19 self, |
12 self.bound = False | 20 |
21 status_code: int = 200, | |
22 headers: Optional[Dict] = None, | |
23 ping: Optional[int] = None, | |
24 user_agent="", | |
25 stream_id: Optional[str] = None, | |
26 ) -> None: | |
27 self.q = asyncio.Queue() | |
28 EventSourceResponse.__init__(self, iq(self.q), status_code, headers, ping=ping) | |
13 self.created = time.time() | 29 self.created = time.time() |
14 self.graphClients = self.settings.graphClients | 30 self.user_agent = user_agent |
15 | 31 self.lastPatchSentTime = 0.0 |
16 self._serial = PatchSink._handlerSerial | 32 self.streamId = stream_id |
17 PatchSink._handlerSerial += 1 | 33 |
18 self.lastPatchSentTime: float = 0.0 | 34 def sendEvent(self, message: str, event: str): |
19 | 35 self.q.put_nowait(dict(data=message, event=event)) |
20 def __repr__(self) -> str: | |
21 return '<Handler #%s>' % self._serial | |
22 | 36 |
23 def state(self) -> Dict: | 37 def state(self) -> Dict: |
24 return { | 38 return { |
25 'created': round(self.created, 2), | 39 'created': round(self.created, 2), |
26 'ageHours': round((time.time() - self.created) / 3600, 2), | 40 'ageHours': round((time.time() - self.created) / 3600, 2), |
27 'streamId': self.streamId, | 41 'streamId': self.streamId, |
28 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing | 42 # 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing |
29 'foafAgent': self.request.headers.get('X-Foaf-Agent'), | 43 # 'foafAgent': self.request.headers.get('X-Foaf-Agent'), |
30 'userAgent': self.request.headers.get('user-agent'), | 44 # 'userAgent': self.request.headers.get('user-agent'), |
31 } | 45 } |
32 | 46 |
33 def bind(self, *args, **kwargs): | |
34 self.streamId = args[0] | |
35 | 47 |
36 self.graphClients.addSseHandler(self) | 48 async def PatchSink(request: Request) -> PatchSinkResponse: |
37 # If something goes wrong with addSseHandler, I don't want to | 49 log.debug(f"PatchSink for {request.path_params['stream_id']=}") |
38 # try removeSseHandler. | |
39 self.bound = True | |
40 | 50 |
41 def unbind(self) -> None: | 51 ret= PatchSinkResponse( ping=30, user_agent=request.headers['user-agent'], stream_id=request.path_params['stream_id']) |
42 if self.bound: | 52 request.app.state.graphClients.addSseHandler(ret) |
43 self.graphClients.removeSseHandler(self) | 53 return ret |
54 | |
55 | |
56 # class PatchSink_old: | |
57 # _handlerSerial = 0 | |
58 | |
59 # def __init__(self, application: cyclone.web.Application, request): | |
60 # cyclone.sse.SSEHandler.__init__(self, application, request) | |
61 # self.bound = False | |
62 # self.created = time.time() | |
63 # self.graphClients = self.settings.graphClients | |
64 | |
65 # self._serial = PatchSink._handlerSerial | |
66 # PatchSink._handlerSerial += 1 | |
67 # self.lastPatchSentTime: float = 0.0 | |
68 | |
69 # def __repr__(self) -> str: | |
70 # return '<Handler #%s>' % self._serial | |
71 | |
72 # def bind(self, *args, **kwargs): | |
73 # self.streamId = args[0] | |
74 | |
75 # self.graphClients.addSseHandler(self) | |
76 # # If something goes wrong with addSseHandler, I don't want to | |
77 # # try removeSseHandler. | |
78 # self.bound = True | |
79 | |
80 # def unbind(self) -> None: | |
81 # if self.bound: | |
82 # self.graphClients.removeSseHandler(self) |