Mercurial > code > home > repos > collector
view patchsink.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | 032e59be8fe9 |
children |
line wrap: on
line source
"""sends patches out an SSE response to a collector client who wants a collection of graphs""" import time from typing import Dict, Any, Optional import asyncio from starlette.requests import Request from sse_starlette.sse import EventSourceResponse import logging import queue log = logging.getLogger('writer') async def iq(q): while True: elem = await q.get() yield elem class PatchSinkResponse(EventSourceResponse): def __init__( self, status_code: int = 200, headers: Optional[Dict] = None, ping: Optional[int] = None, user_agent="", stream_id: Optional[str] = None, ) -> None: self.q = asyncio.Queue() EventSourceResponse.__init__(self, iq(self.q), status_code, headers, ping=ping) self.created = time.time() self.user_agent = user_agent self.lastPatchSentTime = 0.0 self.streamId = stream_id def sendEvent(self, message: str, event: str): self.q.put_nowait(dict(data=message, event=event)) def state(self) -> Dict: return { 'created': round(self.created, 2), 'ageHours': round((time.time() - self.created) / 3600, 2), 'streamId': self.streamId, # 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing # 'foafAgent': self.request.headers.get('X-Foaf-Agent'), # 'userAgent': self.request.headers.get('user-agent'), } async def PatchSink(request: Request) -> PatchSinkResponse: log.debug(f"PatchSink for {request.path_params['stream_id']=}") ret= PatchSinkResponse( ping=30, user_agent=request.headers['user-agent'], stream_id=request.path_params['stream_id']) request.app.state.graphClients.addSseHandler(ret) return ret # class PatchSink_old: # _handlerSerial = 0 # def __init__(self, application: cyclone.web.Application, request): # cyclone.sse.SSEHandler.__init__(self, application, request) # self.bound = False # self.created = time.time() # self.graphClients = self.settings.graphClients # self._serial = PatchSink._handlerSerial # PatchSink._handlerSerial += 1 # self.lastPatchSentTime: float = 0.0 # def __repr__(self) -> str: # return '<Handler #%s>' % self._serial # def bind(self, *args, **kwargs): # self.streamId = args[0] # self.graphClients.addSseHandler(self) # # If something goes wrong with addSseHandler, I don't want to # # try removeSseHandler. # self.bound = True # def unbind(self) -> None: # if self.bound: # self.graphClients.removeSseHandler(self)