diff patchsink.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents 032e59be8fe9
children
line wrap: on
line diff
--- a/patchsink.py	Fri Nov 25 20:58:08 2022 -0800
+++ b/patchsink.py	Sat Nov 26 14:13:51 2022 -0800
@@ -1,43 +1,82 @@
+"""sends patches out an SSE response to a collector client who wants a collection of graphs"""
 import time
-from typing import Dict
+from typing import Dict, Any, Optional
+import asyncio
+from starlette.requests import Request
+from sse_starlette.sse import EventSourceResponse
+import logging
+import queue
+log = logging.getLogger('writer')
 
-import cyclone.sse
-import cyclone.web
+async def iq(q):
+    while True:
+        elem = await q.get()
+        yield elem
 
-class PatchSink(cyclone.sse.SSEHandler):
-    _handlerSerial = 0
+class PatchSinkResponse(EventSourceResponse):
 
-    def __init__(self, application: cyclone.web.Application, request):
-        cyclone.sse.SSEHandler.__init__(self, application, request)
-        self.bound = False
+    def __init__(
+        self,
+        
+        status_code: int = 200,
+        headers: Optional[Dict] = None,
+        ping: Optional[int] = None,
+        user_agent="",
+        stream_id: Optional[str] = None,
+    ) -> None:
+        self.q = asyncio.Queue()
+        EventSourceResponse.__init__(self, iq(self.q), status_code, headers, ping=ping)
         self.created = time.time()
-        self.graphClients = self.settings.graphClients
-
-        self._serial = PatchSink._handlerSerial
-        PatchSink._handlerSerial += 1
-        self.lastPatchSentTime: float = 0.0
-
-    def __repr__(self) -> str:
-        return '<Handler #%s>' % self._serial
+        self.user_agent = user_agent
+        self.lastPatchSentTime = 0.0
+        self.streamId = stream_id
+        
+    def sendEvent(self, message: str, event: str):
+        self.q.put_nowait(dict(data=message, event=event))
 
     def state(self) -> Dict:
         return {
             'created': round(self.created, 2),
             'ageHours': round((time.time() - self.created) / 3600, 2),
             'streamId': self.streamId,
-            'remoteIp': self.request.remote_ip,  # wrong, need some forwarded-for thing
-            'foafAgent': self.request.headers.get('X-Foaf-Agent'),
-            'userAgent': self.request.headers.get('user-agent'),
+            # 'remoteIp': self.request.remote_ip,  # wrong, need some forwarded-for thing
+            # 'foafAgent': self.request.headers.get('X-Foaf-Agent'),
+            # 'userAgent': self.request.headers.get('user-agent'),
         }
 
-    def bind(self, *args, **kwargs):
-        self.streamId = args[0]
+
+async def PatchSink(request: Request) -> PatchSinkResponse:
+    log.debug(f"PatchSink for {request.path_params['stream_id']=}")
+
+    ret= PatchSinkResponse( ping=30, user_agent=request.headers['user-agent'], stream_id=request.path_params['stream_id'])
+    request.app.state.graphClients.addSseHandler(ret)
+    return ret
+
+
+# class PatchSink_old:
+#     _handlerSerial = 0
+
+#     def __init__(self, application: cyclone.web.Application, request):
+#         cyclone.sse.SSEHandler.__init__(self, application, request)
+#         self.bound = False
+#         self.created = time.time()
+#         self.graphClients = self.settings.graphClients
 
-        self.graphClients.addSseHandler(self)
-        # If something goes wrong with addSseHandler, I don't want to
-        # try removeSseHandler.
-        self.bound = True
+#         self._serial = PatchSink._handlerSerial
+#         PatchSink._handlerSerial += 1
+#         self.lastPatchSentTime: float = 0.0
+
+#     def __repr__(self) -> str:
+#         return '<Handler #%s>' % self._serial
 
-    def unbind(self) -> None:
-        if self.bound:
-            self.graphClients.removeSseHandler(self)
+#     def bind(self, *args, **kwargs):
+#         self.streamId = args[0]
+
+#         self.graphClients.addSseHandler(self)
+#         # If something goes wrong with addSseHandler, I don't want to
+#         # try removeSseHandler.
+#         self.bound = True
+
+#     def unbind(self) -> None:
+#         if self.bound:
+#             self.graphClients.removeSseHandler(self)