Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 1583:b0608eb6e90c
dead code, sort reqs
author | drewp@bigasterisk.com |
---|---|
date | Sun, 29 Aug 2021 13:43:14 -0700 |
parents | 6ddc5e037f15 |
children | 1c36ad1eb8b3 |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Sun Aug 29 13:36:08 2021 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Sun Aug 29 13:43:14 2021 -0700 @@ -8,6 +8,7 @@ from pathlib import Path from typing import Callable, Sequence, Set, Tuple, Union, cast from cyclone.util import ObjectDict +from rdflib.graph import ConjunctiveGraph from rx.core.typing import Mapper @@ -33,15 +34,18 @@ from twisted.internet import reactor, task from dataclasses import dataclass from button_events import button_events +from patch_cyclone_sse import patchCycloneSse ROOM = Namespace('http://projects.bigasterisk.com/room/') - +MESSAGES_SEEN = Counter('mqtt_messages_seen', '') collectors = {} import export_to_influxdb print(f'merge me back {export_to_influxdb}') +patchCycloneSse() + def appendLimit(lst, elem, n=10): del lst[:len(lst) - n + 1] @@ -168,8 +172,11 @@ class Rdfizer(StreamPipelineStep): def makeOutputStream(self, inStream: Observable) -> Observable: - outputQuadsSets = rx.combine_latest( - *[self.makeQuads(inStream, plan) for plan in self.config.objects(self.uri, ROOM['graphStatements'])]) + plans = list(self.config.objects(self.uri, ROOM['graphStatements'])) + log.debug(f'{self.uri=} has {len(plans)=}') + if not plans: + return rx.empty() + outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans]) return outputQuadsSets def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable: @@ -203,8 +210,8 @@ return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:') -def serializeWithNs(graph: PatchableGraph) -> bytes: - graph._graph.bind('', 'http://projects.bigasterisk.com/room/') +def serializeWithNs(graph: ConjunctiveGraph) -> bytes: + graph.bind('', ROOM) return cast(bytes, graph.serialize(format='n3')) @@ -248,6 +255,7 @@ convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)})) outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs) + outputQuadsSets.subscribe_(self.updateInflux) outputQuadsSets.subscribe_(self.updateMasterGraph) @@ -262,12 +270,17 @@ def countIncomingMessage(self, msg: bytes): self.debugPageData['messagesSeen'] += 1 + MESSAGES_SEEN.inc() appendLimit(self.debugSub['recentMessages'], { 't': truncTime(), 'msg': msg.decode('ascii'), }) + def updateInflux(self, newGraphs): + for g in newGraphs: + self.influxExport.exportToInflux(g) + def updateMasterGraph(self, newGraphs): newQuads = set.union(*newGraphs) g = graphFromQuads(newQuads) @@ -287,7 +300,7 @@ collectors[metric].labels(**tags).set(val) self.masterGraph.patchSubgraph(self.uri, g) - self.debugSub['currentOutputGraph']['n3'] = cast(bytes, self.masterGraph.serialize(format='n3')).decode('utf8') + self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8') class Metrics(cyclone.web.RequestHandler):