changeset 1583:b0608eb6e90c

dead code, sort reqs
author drewp@bigasterisk.com
date Sun, 29 Aug 2021 13:43:14 -0700
parents 88fe4cabf781
children 0ca3228abade
files service/mqtt_to_rdf/Dockerfile service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/requirements.txt
diffstat 3 files changed, 24 insertions(+), 22 deletions(-) [+]
line wrap: on
line diff
--- a/service/mqtt_to_rdf/Dockerfile	Sun Aug 29 13:36:08 2021 -0700
+++ b/service/mqtt_to_rdf/Dockerfile	Sun Aug 29 13:43:14 2021 -0700
@@ -3,20 +3,9 @@
 WORKDIR /opt
 
 RUN echo 2021-08-26 && apt-get update
-# RUN apt-get remove -y nodejs
-# RUN apt-get install -y wget xz-utils && \
-#     wget --output-document=node.tar.xz https://nodejs.org/dist/v14.15.3/node-v14.15.3-linux-x64.tar.xz && \
-#     tar xf node.tar.xz && \
-#     ln -s node*x64 nodejs
-
-# ENV PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/workspace/nodejs/bin
-# RUN /opt/nodejs/bin/node /opt/nodejs/bin/npm install -g pnpm \
-#     && ln -s /opt/nodejs/bin/node /usr/local/bin/node \
-#     && ln -s /opt/nodejs/bin/pnpm /usr/local/bin/pnpm
 RUN apt-get install -y git
 COPY requirements.txt ./
 RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt
-RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip?v3'
 
 COPY package.json5 pnpm-lock.yaml  ./
 RUN pnpm install
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Sun Aug 29 13:36:08 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Sun Aug 29 13:43:14 2021 -0700
@@ -8,6 +8,7 @@
 from pathlib import Path
 from typing import Callable, Sequence, Set, Tuple, Union, cast
 from cyclone.util import ObjectDict
+from rdflib.graph import ConjunctiveGraph
 
 from rx.core.typing import Mapper
 
@@ -33,15 +34,18 @@
 from twisted.internet import reactor, task
 from dataclasses import dataclass
 from button_events import button_events
+from patch_cyclone_sse import patchCycloneSse
 
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
-
+MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
 collectors = {}
 
 import export_to_influxdb
 
 print(f'merge me back {export_to_influxdb}')
 
+patchCycloneSse()
+
 
 def appendLimit(lst, elem, n=10):
     del lst[:len(lst) - n + 1]
@@ -168,8 +172,11 @@
 class Rdfizer(StreamPipelineStep):
 
     def makeOutputStream(self, inStream: Observable) -> Observable:
-        outputQuadsSets = rx.combine_latest(
-            *[self.makeQuads(inStream, plan) for plan in self.config.objects(self.uri, ROOM['graphStatements'])])
+        plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
+        log.debug(f'{self.uri=} has {len(plans)=}')
+        if not plans:
+            return rx.empty()
+        outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
         return outputQuadsSets
 
     def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
@@ -203,8 +210,8 @@
     return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
 
 
-def serializeWithNs(graph: PatchableGraph) -> bytes:
-    graph._graph.bind('', 'http://projects.bigasterisk.com/room/')
+def serializeWithNs(graph: ConjunctiveGraph) -> bytes:
+    graph.bind('', ROOM)
     return cast(bytes, graph.serialize(format='n3'))
 
 
@@ -248,6 +255,7 @@
         convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)}))
 
         outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs)
+        outputQuadsSets.subscribe_(self.updateInflux)
 
         outputQuadsSets.subscribe_(self.updateMasterGraph)
 
@@ -262,12 +270,17 @@
 
     def countIncomingMessage(self, msg: bytes):
         self.debugPageData['messagesSeen'] += 1
+        MESSAGES_SEEN.inc()
 
         appendLimit(self.debugSub['recentMessages'], {
             't': truncTime(),
             'msg': msg.decode('ascii'),
         })
 
+    def updateInflux(self, newGraphs):
+        for g in newGraphs:
+            self.influxExport.exportToInflux(g)
+
     def updateMasterGraph(self, newGraphs):
         newQuads = set.union(*newGraphs)
         g = graphFromQuads(newQuads)
@@ -287,7 +300,7 @@
                 collectors[metric].labels(**tags).set(val)
 
         self.masterGraph.patchSubgraph(self.uri, g)
-        self.debugSub['currentOutputGraph']['n3'] = cast(bytes, self.masterGraph.serialize(format='n3')).decode('utf8')
+        self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8')
 
 
 class Metrics(cyclone.web.RequestHandler):
--- a/service/mqtt_to_rdf/requirements.txt	Sun Aug 29 13:36:08 2021 -0700
+++ b/service/mqtt_to_rdf/requirements.txt	Sun Aug 29 13:43:14 2021 -0700
@@ -1,15 +1,15 @@
 cyclone==1.3
+docopt
+influxdb==5.3.1
+prometheus_client==0.11.0
 rdflib-jsonld==0.5.0
 rdflib==4.2.2
+rx==3.2.0
 service_identity==21.1.0
 twisted-mqtt==0.3.9
-rx==3.2.0
-docopt
-prometheus_client==0.11.0
-influxdb==5.3.1
 
 cycloneerr
-#export_to_influxdb==0.4.0
+export_to_influxdb==0.5.0
 mqtt_client==0.9.0
 patchablegraph==0.11.0
 rdfdb==0.21.0