changeset 1726:7d3797ed6681
rough port to starlette and reactivex
| author | drewp@bigasterisk.com |
| --- | --- |
| date | Tue, 20 Jun 2023 23:14:28 -0700 |
| parents | 55ace62af7f9 |
| children | 23e6154e6c11 |
| files | service/mqtt_to_rdf/button_events.py, service/mqtt_to_rdf/mqtt_to_rdf.py, service/mqtt_to_rdf/rdf_debug.py |
| diffstat | 3 files changed, 96 insertions(+), 156 deletions(-) |
```diff
--- a/service/mqtt_to_rdf/button_events.py Tue Jun 20 23:13:26 2023 -0700
+++ b/service/mqtt_to_rdf/button_events.py Tue Jun 20 23:14:28 2023 -0700
@@ -1,23 +1,27 @@
 """given repeated RF codes for a button press, make interesting button events"""
+import asyncio
 import time
 from typing import Any, Callable, List
 
-from rx.core.typing import Disposable
-from rx.core import Observable, typing
-from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable
-from rx.scheduler import TimeoutScheduler
+
 from rdflib import Namespace
+from reactivex import Observable
+from reactivex.abc.disposable import DisposableBase
+from reactivex.disposable import (CompositeDisposable, SerialDisposable, SingleAssignmentDisposable)
+from reactivex.scheduler.eventloop import AsyncIOScheduler
+
 
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 
-def button_events(min_hold_sec: float,
-                  release_after_sec: float,
-                  scheduler=typing.Scheduler) -> Callable[[Observable], Observable]:
+def button_events(
+    min_hold_sec: float,
+    release_after_sec: float,
+) -> Callable[[Observable], Observable]:
 
     def op(source: Observable) -> Observable:
 
-        def subscribe(observer, scheduler_=None) -> Disposable:
-            _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
+        def subscribe(observer, scheduler_=None) -> DisposableBase:
+            _scheduler = AsyncIOScheduler(asyncio.get_event_loop())
             cancelable = SerialDisposable()
 
             button_state = [None]
@@ -58,7 +62,7 @@
             def on_completed() -> None:
                 raise NotImplementedError
 
-            subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_)
+            subscription = source.subscribe(on_next, on_error, on_completed, scheduler=scheduler_)
             return CompositeDisposable(subscription, cancelable)
 
         return Observable(subscribe)
```
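The ported operator keeps the `Observable(subscribe)` shape from rx 3 but now resolves its scheduler to an `AsyncIOScheduler` built from the running asyncio loop, so it only works inside an event loop. Below is a minimal usage sketch under those assumptions (reactivex 4, a running loop); the `Subject`, the fake RF code URI, and the printed output are illustrative and not part of the changeset, and what `button_events` actually emits depends on the hold/release logic elided from this hunk.

```python
# Hypothetical demo: pipe a stream of repeated RF codes through the operator.
# Assumes reactivex >= 4 and a running asyncio loop, since the operator now
# schedules its hold/release timers on AsyncIOScheduler.
import asyncio

from rdflib import Namespace
from reactivex.subject import Subject

from button_events import button_events

ROOM = Namespace('http://projects.bigasterisk.com/room/')


async def demo():
    codes = Subject()  # stand-in for the MQTT-derived stream of button codes
    codes.pipe(
        button_events(min_hold_sec=1.0, release_after_sec=1.0),
    ).subscribe(on_next=lambda event: print('button event:', event))

    codes.on_next(ROOM['code_a1b2'])  # made-up code for one button press
    codes.on_next(ROOM['code_a1b2'])
    await asyncio.sleep(2.5)  # let the scheduler fire the hold/release timers


if __name__ == '__main__':
    asyncio.run(demo())
```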
```diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Tue Jun 20 23:13:26 2023 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Tue Jun 20 23:14:28 2023 -0700
@@ -1,52 +1,39 @@
 """
 Subscribe to mqtt topics; generate RDF statements.
 """
-import glob
+import asyncio
 import json
 import logging
-import os
-
-from rdfdb.patch import Patch
-
-from mqtt_message import graphFromMessage
-import os
 import time
 from dataclasses import dataclass
-from pathlib import Path
-from typing import Callable, Set, Tuple, Union, cast
+from typing import Callable, List, Set, Tuple, Union, cast
+from mqttrx import MqttClient
+from reactivex import Observable, empty, operators
+import reactivex
+from reactivex.scheduler.eventloop.asyncioscheduler import AsyncIOScheduler
 
-import cyclone.sse
-import cyclone.web
-import export_to_influxdb
-import prometheus_client
-import rx
-import rx.operators
-import rx.scheduler.eventloop
-from docopt import docopt
-from export_to_influxdb import InfluxExporter
-from mqtt_client import MqttClient
-from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
-from prometheus_client import Counter, Gauge, Histogram, Summary
-from prometheus_client.exposition import generate_latest
-from prometheus_client.registry import REGISTRY
+from patchablegraph import PatchableGraph
+from patchablegraph.handler import GraphEvents, StaticGraph
+from prometheus_client import Counter
 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
 from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node
-from rx.core.observable.observable import Observable
-from rx.core.typing import Mapper
-from standardservice.logsetup import log, verboseLogging
-from twisted.internet import reactor, task
+from starlette.applications import Starlette
+from starlette.routing import Route
+from starlette.staticfiles import StaticFiles
+from reactivex.typing import Mapper
+from starlette_exporter import handle_metrics
+from starlette_exporter.middleware import PrometheusMiddleware
+
+from button_events import button_events
 from inference import Inference
-from button_events import button_events
-from patch_cyclone_sse import patchCycloneSse
+from mqtt_message import graphFromMessage
 
+log = logging.getLogger()
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
 collectors = {}
 
-patchCycloneSse()
-
-
 def logGraph(debug: Callable, label: str, graph: Graph):
     n3 = cast(bytes, graph.serialize(format="n3"))
     debug(label + ':\n' + n3.decode('utf8'))
@@ -83,7 +70,7 @@
             msg = json.loads(jsonBytes.decode('utf8'))
             return msg == required
 
-        outStream = rx.operators.filter(eq)(inStream)
+        outStream = operators.filter(eq)(inStream)
     else:
         outStream = inStream
     return outStream
@@ -98,7 +85,7 @@
     def getParser(self) -> Callable[[Observable], Observable]:
         parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
        func = self.getParserFunc(parserType)
-        return rx.operators.map(cast(Mapper, func))
+        return operators.map(cast(Mapper, func))
 
     def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
         if parserType == XSD.double:
@@ -160,13 +147,12 @@
         g = self.config
 
         if conv == ROOM['celsiusToFarenheit']:
-            return rx.operators.map(cast(Mapper, self.c2f))
+            return operators.map(cast(Mapper, self.c2f))
         elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
            threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython()
-            return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
+            return operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
        elif conv == ROOM['buttonPress']:
-            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
-            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
+            return button_events(min_hold_sec=1.0, release_after_sec=1.0)
        else:
            raise NotImplementedError(conv)
@@ -180,11 +166,11 @@
         plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
         log.debug(f'{self.uri=} has {len(plans)=}')
         if not plans:
-            return rx.empty()
-        outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
+            return empty()
+        outputQuadsSets = reactivex.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
         return outputQuadsSets
 
-    def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
+    def makeQuads(self, inStream: Observable, plan: Node) -> Observable:
 
         def quadsFromValue(valueNode):
             return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
@@ -192,16 +178,16 @@
         def emptyQuads(element) -> Set[Tuple]:
             return set([])
 
-        quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)
+        quads = operators.map(cast(Mapper, quadsFromValue))(inStream)
 
         dur = self.config.value(plan, ROOM['statementLifetime'])
         if dur is not None:
             sec = parseDurationLiteral(dur)
-            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
+            loop = AsyncIOScheduler(asyncio.get_event_loop())
             quads = quads.pipe(
-                rx.operators.debounce(sec, loop),
-                rx.operators.map(cast(Mapper, emptyQuads)),
-                rx.operators.merge(quads),
+                operators.debounce(sec, loop),
+                operators.map(cast(Mapper, emptyQuads)),
+                operators.merge(quads),
             )
 
         return quads
@@ -260,7 +246,7 @@
         rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
         rawBytes.subscribe(on_next=self.countIncomingMessage)
-        rawBytes.subscribe_(self.onMessage)
+        rawBytes.subscribe(self.onMessage)
 
     def onMessage(self, raw: bytes):
         g = graphFromMessage(self.uri, self.mqttTopic, raw)
@@ -284,10 +270,6 @@
         self.debugPageData['messagesSeen'] += 1
         MESSAGES_SEEN.inc()
 
-    def updateInflux(self, newGraphs):
-        for g in newGraphs:
-            self.influxExport.exportToInflux(g)
-
     def updateMasterGraph(self, newGraph):
         log.debug(f'{self.uri} update to {len(newGraph)} statements')
@@ -310,64 +292,32 @@
         self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
 
 
-class Metrics(cyclone.web.RequestHandler):
-    def get(self):
-        self.add_header('content-type', 'text/plain')
-        self.write(generate_latest(REGISTRY))
-
-
-class DebugPageData(cyclone.sse.SSEHandler):
-
-    def __init__(self, application, request):
-        cyclone.sse.SSEHandler.__init__(self, application, request)
-        self.lastSent = None
+# class DebugPageData(cyclone.sse.SSEHandler):
 
-    def watch(self):
-        try:
-            dpd = self.settings.debugPageData
-            js = json.dumps(dpd, sort_keys=True)
-            if js != self.lastSent:
-                log.debug('sending dpd update')
-                self.sendEvent(message=js.encode('utf8'))
-                self.lastSent = js
-        except Exception:
-            import traceback
-            traceback.print_exc()
-
-    def bind(self):
-        self.loop = task.LoopingCall(self.watch)
-        self.loop.start(1, now=True)
+#     def __init__(self, application, request):
+#         cyclone.sse.SSEHandler.__init__(self, application, request)
+#         self.lastSent = None
 
-    def unbind(self):
-        self.loop.stop()
-
-
-# @dataclass
-# class WatchFiles:
-#     # could be merged with rdfdb.service and GraphFile code
-#     globPattern: str
-#     outGraph: Graph
-
-#     def __post_init__(self):
-#         self.lastUpdate = 0
-#         task.LoopingCall(self.refresh).start(1)
-#         log.info(f'start watching {self.globPattern}')
+#     def watch(self):
+#         try:
+#             dpd = self.settings.debugPageData
+#             js = json.dumps(dpd, sort_keys=True)
+#             if js != self.lastSent:
+#                 log.debug('sending dpd update')
+#                 self.sendEvent(message=js.encode('utf8'))
+#                 self.lastSent = js
+#         except Exception:
+#             import traceback
+#             traceback.print_exc()
 
-#     def refresh(self):
-#         files = glob.glob(self.globPattern)
-#         for fn in files:
-#             if os.path.getmtime(fn) > self.lastUpdate:
-#                 break
-#         else:
-#             return
-#         self.lastUpdate = time.time()
-#         self.outGraph.remove((None, None, None))
-#         log.info('reread config')
-#         for fn in files:
-#             # todo: handle read errors
-#             self.outGraph.parse(fn, format='n3')
-#         # and notify this change,so we can recalc the latest output
+#     def bind(self):
+#         self.loop = task.LoopingCall(self.watch)
+#         self.loop.start(1, now=True)
+
+#     def unbind(self):
+#         self.loop.stop()
+
 
 class RunState:
@@ -392,12 +342,15 @@
         expandedConfigPatchableCopy.setToGraph(ecWithQuads)
 
         self.srcs = []
-        for src in sorted(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
+        srcs = cast(List[URIRef], list(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])))
+        srcs.sort(key=str)
+        for src in srcs:
             log.info(f'setup source {src=}')
             try:
                 self.srcs.append(
                     MqttStatementSource(src, self.topicFromConfig(self.expandedConfig, src),
-                                        masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData,
+                                        masterGraph, mqtt=mqtt, internalMqtt=internalMqtt,
+                                        debugPageData={},#debugPageData,
                                         # influxExport=influxExport,
                                         inference=inference))
             except EmptyTopicError:
@@ -408,24 +361,15 @@
         topicParts = list(config.items(config.value(src, ROOM['mqttTopic'])))
         return b'/'.join(t.encode('ascii') for t in topicParts)
 
-
-if __name__ == '__main__':
-    arg = docopt("""
-    Usage: mqtt_to_rdf.py [options]
-
-    -v        Verbose
-    --cs=STR  Only process config filenames with this substring
-    """)
-    verboseLogging(arg['-v'])
-
-    logging.getLogger('mqtt').setLevel(logging.INFO)
-    logging.getLogger('mqtt_client').setLevel(logging.INFO)
-    logging.getLogger('infer').setLevel(logging.INFO)
-    logging.getLogger('cbind').setLevel(logging.INFO)
-    # log.setLevel(logging.DEBUG)
+
+def main():
+
+    logging.getLogger('mqtt').setLevel(logging.DEBUG)
+    logging.getLogger('mqtt_client').setLevel(logging.DEBUG)
+    logging.getLogger('infer').setLevel(logging.DEBUG)
+    logging.getLogger('cbind').setLevel(logging.DEBUG)
+    log.setLevel(logging.DEBUG)
 
     log.info('log start')
-    # config = ConjunctiveGraph()
-    # watcher = WatchFiles('conf/rules.n3', config)
 
     masterGraph = PatchableGraph()
     inference = Inference()
@@ -435,9 +379,6 @@
     mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
     internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
 
-    # needs rework since the config can change:
-    # influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
-
    debugPageData = {
        # schema in index.ts
        'server': f'{brokerHost}:{brokerPort}',
@@ -451,23 +392,18 @@
     runState = RunState(expandedConfigPatchableCopy, masterGraph, mqtt, internalMqtt, inference)
 
-    port = 10018
-    reactor.listenTCP(port,
-                      cyclone.web.Application([
-                          (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}),
-                          (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
-                          (r"/graph/config", CycloneGraphHandler, {'masterGraph': expandedConfigPatchableCopy}),
-                          (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
-                          (r"/graph/mqtt/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
-                          (r'/debugPageData', DebugPageData),
-                          (r'/metrics', Metrics),
-                      ],
-                          # mqtt=mqtt,
-                          # internalMqtt=internalMqtt,
-                          # masterGraph=masterGraph,
-                          debugPageData=debugPageData,
-                          debug=arg['-v']),
-                      interface='::')
-    log.info('serving on %s', port)
-    reactor.run()
+    app = Starlette(
+        routes=[
+            Route("/", StaticFiles(directory='.'), name='index.html'),
+            # Route("/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
+            Route("/graph/config", StaticGraph(expandedConfigPatchableCopy)),
+            Route("/graph/mqtt", StaticGraph(masterGraph)),
+            Route("/graph/mqtt/events", GraphEvents(masterGraph)),
+            # Route('/debugPageData', DebugPageData),
+        ])
+
+    app.add_middleware(PrometheusMiddleware, app_name='environment')
+    app.add_route("/metrics", handle_metrics)
+    return app
+
+
+app = main()
\ No newline at end of file
```
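The central reactivex change here is easiest to see in isolation: `makeQuads` keeps the same debounce/map/merge pipeline it had under Twisted, but the timers now run on an `AsyncIOScheduler`, so each quad set is emitted immediately and then replaced by an empty set once `statementLifetime` elapses with no newer value. Here is a standalone sketch of that pattern, assuming reactivex 4 and a running asyncio loop; the `Subject`, the 1.5 s lifetime, and the fake quad are illustrative, not values from the config graph.

```python
# Illustrative re-creation of the debounce/map/merge expiry pipeline from
# makeQuads. Not the service code itself: the Subject and the 1.5 s lifetime
# are made up for the demo.
import asyncio

from reactivex import operators
from reactivex.scheduler.eventloop import AsyncIOScheduler
from reactivex.subject import Subject


async def demo():
    loop_sched = AsyncIOScheduler(asyncio.get_event_loop())
    quads = Subject()

    # Re-emit each value right away, then an empty set once 1.5 s pass with no
    # newer value -- the same shape as the statementLifetime handling above.
    with_expiry = quads.pipe(
        operators.debounce(1.5, scheduler=loop_sched),
        operators.map(lambda _value: set()),
        operators.merge(quads),
    )
    with_expiry.subscribe(on_next=lambda s: print('current statements:', s))

    quads.on_next({('sensor1', 'temperature', 21.5)})  # made-up quad
    await asyncio.sleep(2.0)  # the expiry fires, printing an empty set


if __name__ == '__main__':
    asyncio.run(demo())
```

Since `main()` now returns a module-level Starlette `app` instead of calling `reactor.run()`, the service is presumably launched by an ASGI server (for example `uvicorn mqtt_to_rdf:app`) rather than starting its own listener on port 10018 as the cyclone version did.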
```diff
--- a/service/mqtt_to_rdf/rdf_debug.py Tue Jun 20 23:13:26 2023 -0700
+++ b/service/mqtt_to_rdf/rdf_debug.py Tue Jun 20 23:14:28 2023 -0700
@@ -22,7 +22,7 @@
         g = g2
     g.bind('', ROOM)
     g.bind('ex', Namespace('http://example.com/'))
-    lines = cast(bytes, g.serialize(format='n3')).decode('utf8').splitlines()
+    lines = g.serialize(format='n3').splitlines()
     lines = [line for line in lines if not line.startswith('@prefix')]
     if oneLine:
         lines = [line.strip() for line in lines]
```
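This hunk drops the `bytes` cast because in rdflib 6+ `Graph.serialize()` returns a `str` when no destination or encoding is given, so no `.decode()` is needed. A quick check of that behavior (the graph contents are made up):

```python
# Confirms the rdflib >= 6 behavior the hunk relies on: serialize() returns
# str unless a destination or an encoding is supplied. The triple is made up.
from rdflib import Graph, Literal, Namespace

ROOM = Namespace('http://projects.bigasterisk.com/room/')

g = Graph()
g.add((ROOM['sensor1'], ROOM['temperature'], Literal(21.5)))

n3 = g.serialize(format='n3')
print(type(n3))            # <class 'str'>
print(n3.splitlines()[0])  # usually an @prefix binding line
```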