Mercurial > code > home > repos > homeauto
changeset 1583:b0608eb6e90c
dead code, sort reqs
author | drewp@bigasterisk.com |
---|---|
date | Sun, 29 Aug 2021 13:43:14 -0700 |
parents | 88fe4cabf781 |
children | 0ca3228abade |
files | service/mqtt_to_rdf/Dockerfile service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/requirements.txt |
diffstat | 3 files changed, 24 insertions(+), 22 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/Dockerfile Sun Aug 29 13:36:08 2021 -0700 +++ b/service/mqtt_to_rdf/Dockerfile Sun Aug 29 13:43:14 2021 -0700 @@ -3,20 +3,9 @@ WORKDIR /opt RUN echo 2021-08-26 && apt-get update -# RUN apt-get remove -y nodejs -# RUN apt-get install -y wget xz-utils && \ -# wget --output-document=node.tar.xz https://nodejs.org/dist/v14.15.3/node-v14.15.3-linux-x64.tar.xz && \ -# tar xf node.tar.xz && \ -# ln -s node*x64 nodejs - -# ENV PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/workspace/nodejs/bin -# RUN /opt/nodejs/bin/node /opt/nodejs/bin/npm install -g pnpm \ -# && ln -s /opt/nodejs/bin/node /usr/local/bin/node \ -# && ln -s /opt/nodejs/bin/pnpm /usr/local/bin/pnpm RUN apt-get install -y git COPY requirements.txt ./ RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt -RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip?v3' COPY package.json5 pnpm-lock.yaml ./ RUN pnpm install
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Sun Aug 29 13:36:08 2021 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Sun Aug 29 13:43:14 2021 -0700 @@ -8,6 +8,7 @@ from pathlib import Path from typing import Callable, Sequence, Set, Tuple, Union, cast from cyclone.util import ObjectDict +from rdflib.graph import ConjunctiveGraph from rx.core.typing import Mapper @@ -33,15 +34,18 @@ from twisted.internet import reactor, task from dataclasses import dataclass from button_events import button_events +from patch_cyclone_sse import patchCycloneSse ROOM = Namespace('http://projects.bigasterisk.com/room/') - +MESSAGES_SEEN = Counter('mqtt_messages_seen', '') collectors = {} import export_to_influxdb print(f'merge me back {export_to_influxdb}') +patchCycloneSse() + def appendLimit(lst, elem, n=10): del lst[:len(lst) - n + 1] @@ -168,8 +172,11 @@ class Rdfizer(StreamPipelineStep): def makeOutputStream(self, inStream: Observable) -> Observable: - outputQuadsSets = rx.combine_latest( - *[self.makeQuads(inStream, plan) for plan in self.config.objects(self.uri, ROOM['graphStatements'])]) + plans = list(self.config.objects(self.uri, ROOM['graphStatements'])) + log.debug(f'{self.uri=} has {len(plans)=}') + if not plans: + return rx.empty() + outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans]) return outputQuadsSets def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable: @@ -203,8 +210,8 @@ return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:') -def serializeWithNs(graph: PatchableGraph) -> bytes: - graph._graph.bind('', 'http://projects.bigasterisk.com/room/') +def serializeWithNs(graph: ConjunctiveGraph) -> bytes: + graph.bind('', ROOM) return cast(bytes, graph.serialize(format='n3')) @@ -248,6 +255,7 @@ convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)})) outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs) + outputQuadsSets.subscribe_(self.updateInflux) outputQuadsSets.subscribe_(self.updateMasterGraph) @@ -262,12 +270,17 @@ def countIncomingMessage(self, msg: bytes): self.debugPageData['messagesSeen'] += 1 + MESSAGES_SEEN.inc() appendLimit(self.debugSub['recentMessages'], { 't': truncTime(), 'msg': msg.decode('ascii'), }) + def updateInflux(self, newGraphs): + for g in newGraphs: + self.influxExport.exportToInflux(g) + def updateMasterGraph(self, newGraphs): newQuads = set.union(*newGraphs) g = graphFromQuads(newQuads) @@ -287,7 +300,7 @@ collectors[metric].labels(**tags).set(val) self.masterGraph.patchSubgraph(self.uri, g) - self.debugSub['currentOutputGraph']['n3'] = cast(bytes, self.masterGraph.serialize(format='n3')).decode('utf8') + self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8') class Metrics(cyclone.web.RequestHandler):
--- a/service/mqtt_to_rdf/requirements.txt Sun Aug 29 13:36:08 2021 -0700 +++ b/service/mqtt_to_rdf/requirements.txt Sun Aug 29 13:43:14 2021 -0700 @@ -1,15 +1,15 @@ cyclone==1.3 +docopt +influxdb==5.3.1 +prometheus_client==0.11.0 rdflib-jsonld==0.5.0 rdflib==4.2.2 +rx==3.2.0 service_identity==21.1.0 twisted-mqtt==0.3.9 -rx==3.2.0 -docopt -prometheus_client==0.11.0 -influxdb==5.3.1 cycloneerr -#export_to_influxdb==0.4.0 +export_to_influxdb==0.5.0 mqtt_client==0.9.0 patchablegraph==0.11.0 rdfdb==0.21.0