changeset 1726:7d3797ed6681

rough port to starlette and reactivex
author drewp@bigasterisk.com
date Tue, 20 Jun 2023 23:14:28 -0700
parents 55ace62af7f9
children 23e6154e6c11
files service/mqtt_to_rdf/button_events.py service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/rdf_debug.py
diffstat 3 files changed, 96 insertions(+), 156 deletions(-) [+]
line wrap: on
line diff
--- a/service/mqtt_to_rdf/button_events.py	Tue Jun 20 23:13:26 2023 -0700
+++ b/service/mqtt_to_rdf/button_events.py	Tue Jun 20 23:14:28 2023 -0700
@@ -1,23 +1,27 @@
 """given repeated RF codes for a button press, make interesting button events"""
 
+import asyncio
 import time
 from typing import Any, Callable, List
-from rx.core.typing import Disposable
-from rx.core import Observable, typing
-from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable
-from rx.scheduler import TimeoutScheduler
+
 from rdflib import Namespace
+from reactivex import Observable
+from reactivex.abc.disposable import DisposableBase
+from reactivex.disposable import (CompositeDisposable, SerialDisposable, SingleAssignmentDisposable)
+from reactivex.scheduler.eventloop import AsyncIOScheduler
+
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 
-def button_events(min_hold_sec: float,
-                  release_after_sec: float,
-                  scheduler=typing.Scheduler) -> Callable[[Observable], Observable]:
+def button_events(
+    min_hold_sec: float,
+    release_after_sec: float,
+) -> Callable[[Observable], Observable]:
 
     def op(source: Observable) -> Observable:
 
-        def subscribe(observer, scheduler_=None) -> Disposable:
-            _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
+        def subscribe(observer, scheduler_=None) -> DisposableBase:
+            _scheduler = AsyncIOScheduler(asyncio.get_event_loop())
             cancelable = SerialDisposable()
             button_state = [None]
 
@@ -58,7 +62,7 @@
             def on_completed() -> None:
                 raise NotImplementedError
 
-            subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_)
+            subscription = source.subscribe(on_next, on_error, on_completed, scheduler=scheduler_)
             return CompositeDisposable(subscription, cancelable)
 
         return Observable(subscribe)
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Tue Jun 20 23:13:26 2023 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Tue Jun 20 23:14:28 2023 -0700
@@ -1,52 +1,39 @@
 """
 Subscribe to mqtt topics; generate RDF statements.
 """
-import glob
+import asyncio
 import json
 import logging
-import os
-
-from rdfdb.patch import Patch
-
-from mqtt_message import graphFromMessage
-import os
 import time
 from dataclasses import dataclass
-from pathlib import Path
-from typing import Callable, Set, Tuple, Union, cast
+from typing import Callable, List, Set, Tuple, Union, cast
+from mqttrx import MqttClient
+from reactivex import Observable, empty, operators
+import reactivex
+from reactivex.scheduler.eventloop.asyncioscheduler import AsyncIOScheduler
 
-import cyclone.sse
-import cyclone.web
-import export_to_influxdb
-import prometheus_client
-import rx
-import rx.operators
-import rx.scheduler.eventloop
-from docopt import docopt
-from export_to_influxdb import InfluxExporter
-from mqtt_client import MqttClient
-from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
-from prometheus_client import Counter, Gauge, Histogram, Summary
-from prometheus_client.exposition import generate_latest
-from prometheus_client.registry import REGISTRY
+from patchablegraph import PatchableGraph
+from patchablegraph.handler import GraphEvents, StaticGraph
+from prometheus_client import Counter
 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
 from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node
-from rx.core.observable.observable import Observable
-from rx.core.typing import Mapper
-from standardservice.logsetup import log, verboseLogging
-from twisted.internet import reactor, task
+from starlette.applications import Starlette
+from starlette.routing import Route
+from starlette.staticfiles import StaticFiles
+from reactivex.typing import Mapper
+from starlette_exporter import handle_metrics
+from starlette_exporter.middleware import PrometheusMiddleware
+
+from button_events import button_events
 from inference import Inference
-from button_events import button_events
-from patch_cyclone_sse import patchCycloneSse
+from mqtt_message import graphFromMessage
 
+log = logging.getLogger()
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
 collectors = {}
 
-patchCycloneSse()
-
-
 def logGraph(debug: Callable, label: str, graph: Graph):
     n3 = cast(bytes, graph.serialize(format="n3"))
     debug(label + ':\n' + n3.decode('utf8'))
@@ -83,7 +70,7 @@
                 msg = json.loads(jsonBytes.decode('utf8'))
                 return msg == required
 
-            outStream = rx.operators.filter(eq)(inStream)
+            outStream = operators.filter(eq)(inStream)
         else:
             outStream = inStream
         return outStream
@@ -98,7 +85,7 @@
     def getParser(self) -> Callable[[Observable], Observable]:
         parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
         func = self.getParserFunc(parserType)
-        return rx.operators.map(cast(Mapper, func))
+        return operators.map(cast(Mapper, func))
 
     def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
         if parserType == XSD.double:
@@ -160,13 +147,12 @@
         g = self.config
         if conv == ROOM['celsiusToFarenheit']:
 
-            return rx.operators.map(cast(Mapper, self.c2f))
+            return operators.map(cast(Mapper, self.c2f))
         elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
             threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython()
-            return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
+            return operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
         elif conv == ROOM['buttonPress']:
-            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
-            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
+            return button_events(min_hold_sec=1.0, release_after_sec=1.0)
         else:
             raise NotImplementedError(conv)
 
@@ -180,11 +166,11 @@
         plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
         log.debug(f'{self.uri=} has {len(plans)=}')
         if not plans:
-            return rx.empty()
-        outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
+            return empty()
+        outputQuadsSets = reactivex.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
         return outputQuadsSets
 
-    def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
+    def makeQuads(self, inStream: Observable, plan: Node) -> Observable:
 
         def quadsFromValue(valueNode):
             return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
@@ -192,16 +178,16 @@
         def emptyQuads(element) -> Set[Tuple]:
             return set([])
 
-        quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)
+        quads = operators.map(cast(Mapper, quadsFromValue))(inStream)
 
         dur = self.config.value(plan, ROOM['statementLifetime'])
         if dur is not None:
             sec = parseDurationLiteral(dur)
-            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
+            loop = AsyncIOScheduler(asyncio.get_event_loop())
             quads = quads.pipe(
-                rx.operators.debounce(sec, loop),
-                rx.operators.map(cast(Mapper, emptyQuads)),
-                rx.operators.merge(quads),
+                operators.debounce(sec, loop),
+                operators.map(cast(Mapper, emptyQuads)),
+                operators.merge(quads),
             )
 
         return quads
@@ -260,7 +246,7 @@
         rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
         rawBytes.subscribe(on_next=self.countIncomingMessage)
 
-        rawBytes.subscribe_(self.onMessage)
+        rawBytes.subscribe(self.onMessage)
 
     def onMessage(self, raw: bytes):
         g = graphFromMessage(self.uri, self.mqttTopic, raw)
@@ -284,10 +270,6 @@
         self.debugPageData['messagesSeen'] += 1
         MESSAGES_SEEN.inc()
 
-    def updateInflux(self, newGraphs):
-        for g in newGraphs:
-            self.influxExport.exportToInflux(g)
-
     def updateMasterGraph(self, newGraph):
         log.debug(f'{self.uri} update to {len(newGraph)} statements')
 
@@ -310,64 +292,32 @@
         self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
 
 
-class Metrics(cyclone.web.RequestHandler):
 
-    def get(self):
-        self.add_header('content-type', 'text/plain')
-        self.write(generate_latest(REGISTRY))
-
-
-class DebugPageData(cyclone.sse.SSEHandler):
-
-    def __init__(self, application, request):
-        cyclone.sse.SSEHandler.__init__(self, application, request)
-        self.lastSent = None
+# class DebugPageData(cyclone.sse.SSEHandler):
 
-    def watch(self):
-        try:
-            dpd = self.settings.debugPageData
-            js = json.dumps(dpd, sort_keys=True)
-            if js != self.lastSent:
-                log.debug('sending dpd update')
-                self.sendEvent(message=js.encode('utf8'))
-                self.lastSent = js
-        except Exception:
-            import traceback
-            traceback.print_exc()
-
-    def bind(self):
-        self.loop = task.LoopingCall(self.watch)
-        self.loop.start(1, now=True)
+#     def __init__(self, application, request):
+#         cyclone.sse.SSEHandler.__init__(self, application, request)
+#         self.lastSent = None
 
-    def unbind(self):
-        self.loop.stop()
-
-
-# @dataclass
-# class WatchFiles:
-#     # could be merged with rdfdb.service and GraphFile code
-#     globPattern: str
-#     outGraph: Graph
-
-#     def __post_init__(self):
-#         self.lastUpdate = 0
-#         task.LoopingCall(self.refresh).start(1)
-#         log.info(f'start watching {self.globPattern}')
+#     def watch(self):
+#         try:
+#             dpd = self.settings.debugPageData
+#             js = json.dumps(dpd, sort_keys=True)
+#             if js != self.lastSent:
+#                 log.debug('sending dpd update')
+#                 self.sendEvent(message=js.encode('utf8'))
+#                 self.lastSent = js
+#         except Exception:
+#             import traceback
+#             traceback.print_exc()
 
-#     def refresh(self):
-#         files = glob.glob(self.globPattern)
-#         for fn in files:
-#             if os.path.getmtime(fn) > self.lastUpdate:
-#                 break
-#         else:
-#             return
-#         self.lastUpdate = time.time()
-#         self.outGraph.remove((None, None, None))
-#         log.info('reread config')
-#         for fn in files:
-#             # todo: handle read errors
-#             self.outGraph.parse(fn, format='n3')
-#         # and notify this change,so we can recalc the latest output
+#     def bind(self):
+#         self.loop = task.LoopingCall(self.watch)
+#         self.loop.start(1, now=True)
+
+#     def unbind(self):
+#         self.loop.stop()
+
 
 
 class RunState:
@@ -392,12 +342,15 @@
         expandedConfigPatchableCopy.setToGraph(ecWithQuads)
 
         self.srcs = []
-        for src in sorted(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
+        srcs = cast(List[URIRef], list(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])))
+        srcs.sort(key=str)
+        for src in srcs:
             log.info(f'setup source {src=}')
             try:
                 self.srcs.append(
                     MqttStatementSource(src, self.topicFromConfig(self.expandedConfig, src),
-                                        masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData,
+                                        masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, 
+                                        debugPageData={},#debugPageData,
                                         # influxExport=influxExport, 
                                         inference=inference))
             except EmptyTopicError:
@@ -408,24 +361,15 @@
         topicParts = list(config.items(config.value(src, ROOM['mqttTopic'])))
         return b'/'.join(t.encode('ascii') for t in topicParts)
 
-
-if __name__ == '__main__':
-    arg = docopt("""
-    Usage: mqtt_to_rdf.py [options]
-
-    -v        Verbose
-    --cs=STR  Only process config filenames with this substring
-    """)
-    verboseLogging(arg['-v'])
-    logging.getLogger('mqtt').setLevel(logging.INFO)
-    logging.getLogger('mqtt_client').setLevel(logging.INFO)
-    logging.getLogger('infer').setLevel(logging.INFO)
-    logging.getLogger('cbind').setLevel(logging.INFO)
-    # log.setLevel(logging.DEBUG)
+def main():
+    
+    logging.getLogger('mqtt').setLevel(logging.DEBUG)
+    logging.getLogger('mqtt_client').setLevel(logging.DEBUG)
+    logging.getLogger('infer').setLevel(logging.DEBUG)
+    logging.getLogger('cbind').setLevel(logging.DEBUG)
+    log.setLevel(logging.DEBUG)
     log.info('log start')
 
-    # config = ConjunctiveGraph()
-    # watcher = WatchFiles('conf/rules.n3', config)
     masterGraph = PatchableGraph()
     inference = Inference()
 
@@ -435,9 +379,6 @@
     mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
     internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
 
-    # needs rework since the config can change:
-    # influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
-
     debugPageData = {
         # schema in index.ts
         'server': f'{brokerHost}:{brokerPort}',
@@ -451,23 +392,18 @@
 
     runState = RunState(expandedConfigPatchableCopy, masterGraph, mqtt, internalMqtt, inference)
 
-    port = 10018
-    reactor.listenTCP(port,
-                      cyclone.web.Application([
-                          (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}),
-                          (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
-                          (r"/graph/config", CycloneGraphHandler, {'masterGraph': expandedConfigPatchableCopy}),
-                          (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
-                          (r"/graph/mqtt/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
-                          (r'/debugPageData', DebugPageData),
-                          (r'/metrics', Metrics),
-                      ],
-                                            #   mqtt=mqtt,
-                                            #   internalMqtt=internalMqtt,
-                                            #   masterGraph=masterGraph,
-                                              debugPageData=debugPageData,
-                                              debug=arg['-v']),
-                      interface='::')
-    log.info('serving on %s', port)
 
-    reactor.run()
+    app = Starlette(
+        routes=[
+                          Route("/", StaticFiles(directory='.'), name='index.html'),
+                        #   Route("/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
+                          Route("/graph/config", StaticGraph(expandedConfigPatchableCopy)),
+                          Route("/graph/mqtt", StaticGraph(masterGraph)),
+                          Route("/graph/mqtt/events", GraphEvents(masterGraph)),
+                        #   Route('/debugPageData', DebugPageData),
+        ])
+    
+    app.add_middleware(PrometheusMiddleware, app_name='environment')
+    app.add_route("/metrics", handle_metrics)
+    return app
+app = main()
\ No newline at end of file
--- a/service/mqtt_to_rdf/rdf_debug.py	Tue Jun 20 23:13:26 2023 -0700
+++ b/service/mqtt_to_rdf/rdf_debug.py	Tue Jun 20 23:14:28 2023 -0700
@@ -22,7 +22,7 @@
             g = g2
         g.bind('', ROOM)
         g.bind('ex', Namespace('http://example.com/'))
-        lines = cast(bytes, g.serialize(format='n3')).decode('utf8').splitlines()
+        lines = g.serialize(format='n3').splitlines()
         lines = [line for line in lines if not line.startswith('@prefix')]
         if oneLine:
             lines = [line.strip() for line in lines]