diff collector.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents 032e59be8fe9
children
line wrap: on
line diff
--- a/collector.py	Fri Nov 25 20:58:08 2022 -0800
+++ b/collector.py	Sat Nov 26 14:13:51 2022 -0800
@@ -7,48 +7,29 @@
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
+import asyncio
 import json
 import logging
 import time
 from typing import Dict, List, Optional, Set, Union
 
-import cyclone.sse
-import cyclone.web
-from docopt import docopt
 from patchablegraph.patchablegraph import jsonFromPatch
-from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
 from prometheus_client import Summary
-from prometheus_client.exposition import generate_latest
-from prometheus_client.registry import REGISTRY
 from rdfdb.patch import Patch
 from rdflib import Namespace, URIRef
-from standardservice.logsetup import enableTwistedLog, log
-from twisted.internet import defer, reactor
+
+from starlette.applications import Starlette
+from starlette.requests import Request
+from starlette.responses import JSONResponse
+from starlette.routing import Route
+from starlette_exporter import PrometheusMiddleware, handle_metrics
 
 from collector_config import config
 from merge import SourceUri, ActiveStatements, LocalStatements
-from patchsink import PatchSink
-
-import cyclone.sse
-def py3_sendEvent(self, message, event=None, eid=None, retry=None):
-
-    if isinstance(message, dict):
-        message = cyclone.sse.escape.json_encode(message)
-    if isinstance(message, str):
-        message = message.encode("utf-8")
-    assert isinstance(message, bytes)
-    if eid:
-        self.transport.write(b"id: %s\n" % eid)
-    if event:
-        self.transport.write(b"event: %s\n" % event)
-    if retry:
-        self.transport.write(b"retry: %s\n" % retry)
-    self.transport.write(b"data: %s\n\n" % message)
-
-
-cyclone.sse.SSEHandler.sendEvent = py3_sendEvent
-
-
+from patchsink import PatchSink, PatchSinkResponse
+from patchsource import PatchSource
+logging.basicConfig(level=logging.DEBUG)  # root logger configured at DEBUG when this module is imported
+log=logging.getLogger()  # no name given, so `log` is the root logger
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
 
@@ -57,14 +38,6 @@
 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')
 
 
-class Metrics(cyclone.web.RequestHandler):
-
-    def get(self):
-        self.add_header('content-type', 'text/plain')
-        self.write(generate_latest(REGISTRY))
-
-
-
 class GraphClients(object):
     """
     All the active PatchSources and SSEHandlers
@@ -76,20 +49,20 @@
     """
 
     def __init__(self):
-        self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {}  # (COLLECTOR is not listed)
-        self.handlers: Set[PatchSink] = set()
+        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
+        self.handlers: Set[PatchSinkResponse] = set()
         self.statements: ActiveStatements = ActiveStatements()
 
         self._localStatements = LocalStatements(self._onPatch)
 
     def state(self) -> Dict:
         return {
-            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']),
+            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['url']),
             'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
             'statements': self.statements.state(),
         }
 
-    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
+    def _sourcesForHandler(self, handler: PatchSinkResponse) -> List[SourceUri]:
         streamId = handler.streamId
         matches = [s for s in config['streams'] if s['id'] == streamId]
         if len(matches) != 1:
@@ -107,14 +80,14 @@
 
         self._sendUpdatePatch()
 
-        if log.isEnabledFor(logging.DEBUG):
+        if 0 and log.isEnabledFor(logging.DEBUG):
             self.statements.pprintTable()
 
         if source != COLLECTOR:
             self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])
 
     @SEND_UPDATE_PATCH_CALLS.time()
-    def _sendUpdatePatch(self, handler: Optional[PatchSink] = None):
+    def _sendUpdatePatch(self, handler: Optional[PatchSinkResponse] = None):
         """
         send a patch event out this handler to bring it up to date with
         self.statements
@@ -129,7 +102,7 @@
         # reduce loops here- prepare all patches at once
         for h in selected:
             period = .9
-            if 'Raspbian' in h.request.headers.get('user-agent', ''):
+            if 'Raspbian' in h.user_agent:
                 period = 5
             if h.lastPatchSentTime > now - period:
                 continue
@@ -142,12 +115,12 @@
                 # it up into multiple sends, although there's no
                 # guarantee at all since any single stmt could be any
                 # length.
-                h.sendEvent(message=jsonFromPatch(p), event=b'patch')
+                h.sendEvent(message=jsonFromPatch(p), event='patch')
                 h.lastPatchSentTime = now
             else:
                 log.debug('nothing to send to %s', h)
 
-    def addSseHandler(self, handler: PatchSink):
+    def addSseHandler(self, handler: PatchSinkResponse):
         log.info('addSseHandler %r %r', handler, handler.streamId)
 
         # fail early if id doesn't match
@@ -159,14 +132,14 @@
             if source not in self.clients and source != COLLECTOR:
                 log.debug('connect to patch source %s', source)
                 self._localStatements.setSourceState(source, ROOM['connect'])
-                self.clients[source] = ReconnectingPatchSource(source,
-                                                               listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
-                                                               reconnectSecs=10)
+                self.clients[source] = PatchSource(source,
+                                                   listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
+                                                   reconnectSecs=10)
         log.debug('bring new client up to date')
 
         self._sendUpdatePatch(handler)
 
-    def removeSseHandler(self, handler: PatchSink):
+    def removeSseHandler(self, handler: PatchSinkResponse):
         log.info('removeSseHandler %r', handler)
         self.statements.discardHandler(handler)
         for source in self._sourcesForHandler(handler):
@@ -204,50 +177,32 @@
                     garbage.add(stmt)
 
 
-class State(cyclone.web.RequestHandler):
-
-    @GET_STATE_CALLS.time()
-    def get(self) -> None:
-        try:
-            state = self.settings.graphClients.state()
-            msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>')
-            log.info(msg)
-            self.write(msg)
-        except Exception:
-            import traceback
-            traceback.print_exc()
-            raise
-
-
-class GraphList(cyclone.web.RequestHandler):
-
-    def get(self) -> None:
-        self.write(json.dumps(config['streams']))
+@GET_STATE_CALLS.time()
+def State(request: Request) -> JSONResponse:
+    """Log and return the GraphClients state as JSON; non-serializable values appear as '<unserializable>'."""
+    msg = json.dumps({'graphClients': request.app.state.graphClients.state()}, indent=2, default=lambda obj: '<unserializable>')
+    log.info(msg)
+    return JSONResponse(json.loads(msg))  # round-trip keeps the fallback; JSONResponse alone would raise on such values
 
 
-if __name__ == '__main__':
-    arg = docopt("""
-    Usage: sse_collector.py [options]
+def GraphList(request: Request) -> JSONResponse:
+    return JSONResponse(config['streams'])  # the 'streams' entry of collector_config.config, served as JSON
 
-    -v   Verbose
-    -i  Info level only
-    """)
-
-    if True:
-        enableTwistedLog()
-        log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
-        defer.setDebugging(True)
-
+def main():
     graphClients = GraphClients()
 
-    reactor.listenTCP(
-        9072,
-        cyclone.web.Application(  #
-            handlers=[
-                (r'/state', State),
-                (r'/graph/', GraphList),
-                (r'/graph/(.+)', PatchSink),
-                (r'/metrics', Metrics),
-            ], graphClients=graphClients),
-        interface='::')
-    reactor.run()
+    app = Starlette(
+        debug=True,
+        routes=[
+            Route('/state', State),
+            Route('/graph/', GraphList),
+            Route('/graph/{stream_id:str}', PatchSink),
+        ])
+    app.state.graphClients = graphClients
+
+    app.add_middleware(PrometheusMiddleware, app_name='collector')
+    app.add_route("/metrics", handle_metrics)
+    return app
+
+
+app = main()
\ No newline at end of file