view patchsink.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents 032e59be8fe9
children
line wrap: on
line source

"""sends patches out an SSE response to a collector client who wants a collection of graphs"""
import time
from typing import Dict, Any, Optional
import asyncio
from starlette.requests import Request
from sse_starlette.sse import EventSourceResponse
import logging
import queue
log = logging.getLogger('writer')

async def iq(q):
    """Async generator that yields items taken from `q`, indefinitely.

    Each step awaits `q.get()` and yields whatever it returns. The loop has
    no exit condition of its own; it ends only when the consumer stops
    iterating or the generator is closed/cancelled.
    """
    while True:
        yield await q.get()

class PatchSinkResponse(EventSourceResponse):
    """SSE response whose event stream is fed from an internal asyncio.Queue.

    Events handed to sendEvent() are placed on `self.q`; the response body is
    the module-level `iq` generator reading from that same queue.
    """

    def __init__(
        self,
        status_code: int = 200,
        headers: Optional[Dict] = None,
        ping: Optional[int] = None,
        user_agent="",
        stream_id: Optional[str] = None,
    ) -> None:
        """Create the queue-backed response and record bookkeeping fields.

        status_code, headers and ping are forwarded to EventSourceResponse;
        user_agent and stream_id are only stored on the instance.
        """
        # The queue is created first because the content generator passed to
        # the parent initializer is built from it.
        self.q = asyncio.Queue()
        super().__init__(iq(self.q), status_code, headers, ping=ping)
        self.created = time.time()
        self.streamId = stream_id
        self.user_agent = user_agent
        # Initialized here only; nothing else in this class updates it.
        self.lastPatchSentTime = 0.0

    def sendEvent(self, message: str, event: str):
        """Enqueue one event (data + event name) without blocking."""
        payload = {'data': message, 'event': event}
        self.q.put_nowait(payload)

    def state(self) -> Dict:
        """Return a small status dict: creation time, age in hours, stream id."""
        born = self.created
        age_hours = (time.time() - born) / 3600
        return {
            'created': round(born, 2),
            'ageHours': round(age_hours, 2),
            'streamId': self.streamId,
            # 'remoteIp': self.request.remote_ip,  # wrong, need some forwarded-for thing
            # 'foafAgent': self.request.headers.get('X-Foaf-Agent'),
            # 'userAgent': self.request.headers.get('user-agent'),
        }


async def PatchSink(request: Request) -> PatchSinkResponse:
    """Build the SSE response for one stream and register it with graphClients.

    Reads `stream_id` from the request's path params, creates a
    PatchSinkResponse (ping=30) tagged with the caller's User-Agent, passes
    it to `request.app.state.graphClients.addSseHandler`, and returns it.

    The User-Agent header is treated as optional: a client that omits it
    gets an empty string instead of triggering a KeyError.

    Raises:
        KeyError: if the route did not supply a `stream_id` path parameter.
    """
    log.debug(f"PatchSink for {request.path_params['stream_id']=}")

    ret = PatchSinkResponse(
        ping=30,
        # Not every client sends User-Agent; don't fail the request over it.
        user_agent=request.headers.get('user-agent', ''),
        stream_id=request.path_params['stream_id'],
    )
    # NOTE(review): nothing visible here unregisters `ret` from graphClients
    # when the client disconnects -- confirm cleanup is handled elsewhere.
    request.app.state.graphClients.addSseHandler(ret)
    return ret


# class PatchSink_old:
#     _handlerSerial = 0

#     def __init__(self, application: cyclone.web.Application, request):
#         cyclone.sse.SSEHandler.__init__(self, application, request)
#         self.bound = False
#         self.created = time.time()
#         self.graphClients = self.settings.graphClients

#         self._serial = PatchSink._handlerSerial
#         PatchSink._handlerSerial += 1
#         self.lastPatchSentTime: float = 0.0

#     def __repr__(self) -> str:
#         return '<Handler #%s>' % self._serial

#     def bind(self, *args, **kwargs):
#         self.streamId = args[0]

#         self.graphClients.addSseHandler(self)
#         # If something goes wrong with addSseHandler, I don't want to
#         # try removeSseHandler.
#         self.bound = True

#     def unbind(self) -> None:
#         if self.bound:
#             self.graphClients.removeSseHandler(self)