Mercurial > code > home > repos > collector
diff patchsink.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | 032e59be8fe9 |
children |
line wrap: on
line diff
--- a/patchsink.py Fri Nov 25 20:58:08 2022 -0800 +++ b/patchsink.py Sat Nov 26 14:13:51 2022 -0800 @@ -1,43 +1,82 @@ +"""sends patches out an SSE response to a collector client who wants a collection of graphs""" import time -from typing import Dict +from typing import Dict, Any, Optional +import asyncio +from starlette.requests import Request +from sse_starlette.sse import EventSourceResponse +import logging +import queue +log = logging.getLogger('writer') -import cyclone.sse -import cyclone.web +async def iq(q): + while True: + elem = await q.get() + yield elem -class PatchSink(cyclone.sse.SSEHandler): - _handlerSerial = 0 +class PatchSinkResponse(EventSourceResponse): - def __init__(self, application: cyclone.web.Application, request): - cyclone.sse.SSEHandler.__init__(self, application, request) - self.bound = False + def __init__( + self, + + status_code: int = 200, + headers: Optional[Dict] = None, + ping: Optional[int] = None, + user_agent="", + stream_id: Optional[str] = None, + ) -> None: + self.q = asyncio.Queue() + EventSourceResponse.__init__(self, iq(self.q), status_code, headers, ping=ping) self.created = time.time() - self.graphClients = self.settings.graphClients - - self._serial = PatchSink._handlerSerial - PatchSink._handlerSerial += 1 - self.lastPatchSentTime: float = 0.0 - - def __repr__(self) -> str: - return '<Handler #%s>' % self._serial + self.user_agent = user_agent + self.lastPatchSentTime = 0.0 + self.streamId = stream_id + + def sendEvent(self, message: str, event: str): + self.q.put_nowait(dict(data=message, event=event)) def state(self) -> Dict: return { 'created': round(self.created, 2), 'ageHours': round((time.time() - self.created) / 3600, 2), 'streamId': self.streamId, - 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing - 'foafAgent': self.request.headers.get('X-Foaf-Agent'), - 'userAgent': self.request.headers.get('user-agent'), + # 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing + # 'foafAgent': self.request.headers.get('X-Foaf-Agent'), + # 'userAgent': self.request.headers.get('user-agent'), } - def bind(self, *args, **kwargs): - self.streamId = args[0] + +async def PatchSink(request: Request) -> PatchSinkResponse: + log.debug(f"PatchSink for {request.path_params['stream_id']=}") + + ret= PatchSinkResponse( ping=30, user_agent=request.headers['user-agent'], stream_id=request.path_params['stream_id']) + request.app.state.graphClients.addSseHandler(ret) + return ret + + +# class PatchSink_old: +# _handlerSerial = 0 + +# def __init__(self, application: cyclone.web.Application, request): +# cyclone.sse.SSEHandler.__init__(self, application, request) +# self.bound = False +# self.created = time.time() +# self.graphClients = self.settings.graphClients - self.graphClients.addSseHandler(self) - # If something goes wrong with addSseHandler, I don't want to - # try removeSseHandler. - self.bound = True +# self._serial = PatchSink._handlerSerial +# PatchSink._handlerSerial += 1 +# self.lastPatchSentTime: float = 0.0 + +# def __repr__(self) -> str: +# return '<Handler #%s>' % self._serial - def unbind(self) -> None: - if self.bound: - self.graphClients.removeSseHandler(self) +# def bind(self, *args, **kwargs): +# self.streamId = args[0] + +# self.graphClients.addSseHandler(self) +# # If something goes wrong with addSseHandler, I don't want to +# # try removeSseHandler. +# self.bound = True + +# def unbind(self) -> None: +# if self.bound: +# self.graphClients.removeSseHandler(self)