Mercurial > code > home > repos > homeauto
changeset 780:729ab70c6212
reformat, update build
author | drewp@bigasterisk.com |
---|---|
date | Sat, 08 Aug 2020 15:06:22 -0700 |
parents | bad87b7dc608 |
children | 6c42c1f64f00 |
files | service/mqtt_to_rdf/Dockerfile service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/requirements.txt service/mqtt_to_rdf/serv.n3 |
diffstat | 4 files changed, 54 insertions(+), 49 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/Dockerfile Sat Aug 08 14:02:46 2020 -0700 +++ b/service/mqtt_to_rdf/Dockerfile Sat Aug 08 15:06:22 2020 -0700 @@ -1,13 +1,13 @@ -FROM bang6:5000/base_x86 +FROM bang5:5000/base_x86 WORKDIR /opt COPY requirements.txt ./ -RUN pip3 install -U pip RUN pip3 uninstall --yes enum34 RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip?v3' +RUN pip3 install -U attrs COPY *.py *.html *.css *.js *.n3 ./ COPY build/bundle.js build/
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Sat Aug 08 14:02:46 2020 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Sat Aug 08 15:06:22 2020 -0700 @@ -2,31 +2,34 @@ Subscribe to mqtt topics; generate RDF statements. """ import json -import sys from pathlib import Path + +import cyclone.web from docopt import docopt -from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD -from rdflib.parser import StringInputSource -from rdflib.term import Node -from twisted.internet import reactor -import cyclone.web -import rx, rx.operators, rx.scheduler.eventloop +from export_to_influxdb import InfluxExporter from greplin import scales from greplin.scales.cyclonehandler import StatsHandler - -from export_to_influxdb import InfluxExporter from mqtt_client import MqttClient - -from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler -from rdfdb.patch import Patch +from patchablegraph import ( + CycloneGraphEventsHandler, + CycloneGraphHandler, + PatchableGraph, +) from rdfdb.rdflibpatch import graphFromQuads +from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD +from rdflib.term import Node +import rx +import rx.operators +import rx.scheduler.eventloop from standardservice.logsetup import log, verboseLogging from standardservice.scalessetup import gatherProcessStats +from twisted.internet import reactor ROOM = Namespace('http://projects.bigasterisk.com/room/') gatherProcessStats() + def parseDurationLiteral(lit: Literal) -> float: if lit.endswith('s'): return float(lit.split('s')[0]) @@ -34,11 +37,12 @@ class MqttStatementSource: + def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): self.uri = uri self.config = config self.masterGraph = masterGraph - self.mqtt = mqtt # deprecated + self.mqtt = mqtt # deprecated self.internalMqtt = internalMqtt self.influx = influx @@ -47,9 +51,7 @@ statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') scales.init(self, statPath) - self._mqttStats = scales.collection( - statPath + '/incoming', scales.IntStat('count'), - scales.RecentFpsStat('fps')) + self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) rawBytes = self.subscribeMqtt(self.mqttTopic) rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) @@ -60,8 +62,7 @@ parsed = self.conversionStep(conv)(parsed) outputQuadsSets = rx.combine_latest( - *[self.makeQuads(parsed, plan) - for plan in g.objects(self.uri, ROOM['graphStatements'])]) + *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) outputQuadsSets.subscribe_(self.updateQuads) @@ -69,7 +70,6 @@ topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) return b'/'.join(t.encode('ascii') for t in topicParts) - def subscribeMqtt(self, topic): # goal is to get everyone on the internal broker and eliminate this mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt @@ -111,13 +111,9 @@ def makeQuads(self, parsed, plan): g = self.config + def quadsFromValue(valueNode): - return set([ - (self.uri, - g.value(plan, ROOM['outputPredicate']), - valueNode, - self.uri) - ]) + return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) def emptyQuads(element): return set([]) @@ -131,7 +127,7 @@ rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), rx.operators.map(emptyQuads), rx.operators.merge(quads), - ) + ) return quads @@ -177,29 +173,41 @@ masterGraph = PatchableGraph() - mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', - brokerPort=1883) # deprecated - internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', - brokerPort=10010) + mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated + internalMqtt = MqttClient(clientId='mqtt_to_rdf', + brokerHost='mosquitto-frontdoor.default.svc.cluster.local', + brokerPort=10210) influx = InfluxExporter(config) srcs = [] for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): - srcs.append(MqttStatementSource(src, config, masterGraph, - mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) + srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) log.info(f'set up {len(srcs)} sources') port = 10018 - reactor.listenTCP(port, cyclone.web.Application([ - (r"/()", cyclone.web.StaticFileHandler, - {"path": ".", "default_filename": "index.html"}), - (r"/build/(bundle.js)", - cyclone.web.StaticFileHandler, {"path": "build"}), - (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}), - (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), - (r"/graph/mqtt/events", CycloneGraphEventsHandler, - {'masterGraph': masterGraph}), - ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']), + reactor.listenTCP(port, + cyclone.web.Application([ + (r"/()", cyclone.web.StaticFileHandler, { + "path": ".", + "default_filename": "index.html" + }), + (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { + "path": "build" + }), + (r'/stats/(.*)', StatsHandler, { + 'serverName': 'mqtt_to_rdf' + }), + (r"/graph/mqtt", CycloneGraphHandler, { + 'masterGraph': masterGraph + }), + (r"/graph/mqtt/events", CycloneGraphEventsHandler, { + 'masterGraph': masterGraph + }), + ], + mqtt=mqtt, + internalMqtt=internalMqtt, + masterGraph=masterGraph, + debug=arg['-v']), interface='::') log.warn('serving on %s', port)
--- a/service/mqtt_to_rdf/requirements.txt Sat Aug 08 14:02:46 2020 -0700 +++ b/service/mqtt_to_rdf/requirements.txt Sat Aug 08 15:06:22 2020 -0700 @@ -1,12 +1,10 @@ -pytype - cyclone rdflib-jsonld==0.4.0 rdflib==4.2.2 twisted-mqtt==0.3.6 git+http://github.com/drewp/scales.git@550bcba42c5e96152ff5c5bd753fbb33ffdfe460#egg=scales git+https://github.com/ReactiveX/RxPY.git@6deb66e827f34a88b4605773d7671322b9cbbd08#egg=rx - +docopt cycloneerr export_to_influxdb==0.4.0 mqtt_client==0.9.0
--- a/service/mqtt_to_rdf/serv.n3 Sat Aug 08 14:02:46 2020 -0700 +++ b/service/mqtt_to_rdf/serv.n3 Sat Aug 08 15:06:22 2020 -0700 @@ -4,8 +4,7 @@ serv:mqtt_to_rdf_image a :DockerImage; :internalPort 10018; - :prodDockerFlags ( - "--net=host"); + :prodDockerFlags (); :localRunDockerFlags ( "-v" "`pwd`:/opt" # "-v" "/my/proj/homeauto/lib:/lib_src"