Mercurial > code > home > repos > homeauto
changeset 1444:4afb1830bb5e
use rx version 3.x
Ignore-this: 4232f8e780d35a8d0642e86521eb2801
darcs-hash:747608892b607f78260f4772a4ff2b24c7392f73
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 24 Sep 2019 14:04:02 -0700 |
parents | 99540f1a11f7 |
children | 0087017efecb |
files | lib/mqtt_client/setup.py service/piNode/piNode.py service/piNode/requirements.txt service/wifi/index.html service/wifi/requirements.txt service/wifi/scrape.py |
diffstat | 6 files changed, 160 insertions(+), 181 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/mqtt_client/setup.py Mon Aug 12 16:46:52 2019 -0700 +++ b/lib/mqtt_client/setup.py Tue Sep 24 14:04:02 2019 -0700 @@ -5,7 +5,7 @@ version='0.7.0', packages=['mqtt_client'], package_dir={'mqtt_client': ''}, - install_requires=['rx', 'twisted-mqtt'], + install_requires=['rx>=3.0.0', 'twisted-mqtt'], url='https://projects.bigasterisk.com/mqtt-client/mqtt_client-0.7.0.tar.gz', author='Drew Perttula', author_email='drewp@bigasterisk.com',
--- a/service/piNode/piNode.py Mon Aug 12 16:46:52 2019 -0700 +++ b/service/piNode/piNode.py Tue Sep 24 14:04:02 2019 -0700 @@ -32,7 +32,6 @@ scales.PmfStat('configReread'), scales.IntStat('pollException'), scales.PmfStat('pollAll'), - scales.PmfStat('boardPoll'), scales.PmfStat('sendOneshot'), scales.PmfStat('outputStatements'), @@ -101,13 +100,24 @@ class DeviceRunner(object): - def __init__(self, dev): + def __init__(self, dev, masterGraph, sendOneshot, influx): self.dev = dev + self.masterGraph = masterGraph + self.sendOneshot = sendOneshot + self.influx = influx self.period = getattr(self.dev, 'pollPeriod', .05) - #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now): + self.latestStatementsFromInputs = set() + self.lastPollTime = None reactor.callLater(0, self.poll) + def syncMasterGraphToHostStatements(self): + hostStmtCtx = URIRef(self.dev.uri + '/host') + newQuads = inContext(self.dev.hostStatements(), hostStmtCtx) + p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads) + if p: + log.debug("patch master with these host stmts %s", p) + @inlineCallbacks def poll(self): now = time.time() @@ -115,9 +125,56 @@ with self.dev.stats.poll.time(): new = yield maybeDeferred(self.dev.poll) finally: - reactor.callLater(max(0, time.time() - (now + self.period)), self.poll) + reactor.callLater(max(0, self.period - (time.time() - now)), self.poll) + + if isinstance(new, dict): # new style + oneshot = set(new['oneshot']) + new = set(new['latest']) + else: + oneshot = set() + new = set(new) + + prev = self.latestStatementsFromInputs + # it's important that quads from different devices + # don't clash, since that can lead to inconsistent + # patches (e.g. + # dev1 changes value from 1 to 2; + # dev2 changes value from 2 to 3; + # dev1 changes from 2 to 4 but this patch will + # fail since the '2' statement is gone) + self.masterGraph.patch(Patch.fromDiff(inContext(prev, self.dev.uri), + inContext(new, self.dev.uri))) + self.latestStatementsFromInputs = new + + self.syncMasterGraphToHostStatements() # needed? + + if oneshot: + self.sendOneshot(oneshot) + self.lastPollTime = now + + if self.latestStatementsFromInputs: + self.influx.exportToInflux(set.union(set(self.latestStatementsFromInputs))) + returnValue(new) + def filterIncomingStatements(self, stmts): + wanted = set() + unwanted = set(stmts) + for pat in self.dev.outputPatterns(): + if [term is None for term in pat] != [False, False, True]: + raise NotImplementedError + for stmt in stmts: + if stmt[:2] == pat[:2]: + wanted.add(stmt) + unwanted.discard(stmt) + return wanted, unwanted + + def onPutStatements(self, stmts): + log.info("output goes to action handler for %s" % self.dev.uri) + with self.dev.stats.output.time(): + self.dev.sendOutput(stmts) + self.syncMasterGraphToHostStatements() + class Board(object): """similar to arduinoNode.Board but without the communications stuff""" def __init__(self, graph, masterGraph, uri, hubHost): @@ -127,76 +184,21 @@ self.masterGraph.setToGraph(self.staticStmts()) self.pi = pigpio.pi() - self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)] - log.debug('found %s devices', len(self._devs)) - self._statementsFromInputs = {} # input device uri: latest statements - self._lastPollTime = {} # input device uri: time() + self._influx = InfluxExporter(self.graph) - for d in self._devs: - self.syncMasterGraphToHostStatements(d.dev) - - def startPolling(self): - task.LoopingCall(self._poll).start(.05) - - @STATS.boardPoll.time() # not differentiating multiple boards here - def _poll(self): - try: - self._pollMaybeError() - except Exception: - STATS.pollException += 1 - log.exception("During poll:") - - - @inlineCallbacks - def _pollOneDev(self, i): - now = time.time() - - new = i.poll() - if isinstance(new, dict): # new style - oneshot = new['oneshot'] - new = new['latest'] - else: - oneshot = None - - self._updateMasterWithNewPollStatements(i.uri, new) - - if oneshot: - self._sendOneshot(oneshot) - self._lastPollTime[i.uri] = now - - @inlineCallbacks - def _pollMaybeError(self): - pollTime = {} # uri: sec - yield gatherResults([self._pollOneDev(i.dev, pollTime) - for i in self._devs], consumeErrors=True) - - pollResults = map(set, self._statementsFromInputs.values()) - if pollResults: - self._influx.exportToInflux(set.union(*pollResults)) - - def _updateMasterWithNewPollStatements(self, dev, new): - prev = self._statementsFromInputs.get(dev, set()) - - # it's important that quads from different devices - # don't clash, since that can lead to inconsistent - # patches (e.g. - # dev1 changes value from 1 to 2; - # dev2 changes value from 2 to 3; - # dev1 changes from 2 to 4 but this patch will - # fail since the '2' statement is gone) - self.masterGraph.patch(Patch.fromDiff(inContext(prev, dev), - inContext(new, dev))) - self._statementsFromInputs[dev] = new + self._runners = [DeviceRunner(d, self.masterGraph, self.sendOneshot, self._influx) + for d in devices.makeDevices(graph, self.uri, self.pi)] + log.debug('found %s devices', len(self._runners)) @STATS.sendOneshot.time() - def _sendOneshot(self, oneshot): + def sendOneshot(self, oneshot): body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) for s,p,o in oneshot)).encode('utf8') url = 'http://%s:9071/oneShot' % self.hubHost - d = fetch(method='POST', - url=url, - headers={'Content-Type': ['text/n3']}, - postdata=body, + d = treq.post( + url=url.encode('ascii'), + headers={b'Content-Type': [b'text/n3']}, + data=body, timeout=5) def err(e): log.info('oneshot post to %r failed: %s', @@ -204,41 +206,24 @@ d.addErrback(err) @STATS.outputStatements.time() - def outputStatements(self, stmts): - unused = set(stmts) - for devRunner in self._devs: - dev = devRunner.dev - stmtsForDev = [] - for pat in dev.outputPatterns(): - if [term is None for term in pat] != [False, False, True]: - raise NotImplementedError - for stmt in stmts: - if stmt[:2] == pat[:2]: - stmtsForDev.append(stmt) - unused.discard(stmt) - if stmtsForDev: - log.info("output goes to action handler for %s" % dev.uri) - with dev.stats.output.time(): - dev.sendOutput(stmtsForDev) - - # Dev *could* change hostStatements at any time, and - # we're not currently tracking that, but the usual is - # to change them in response to sendOutput so this - # should be good enough. The right answer is to give - # each dev the masterGraph for it to write to. - self.syncMasterGraphToHostStatements(dev) - log.info("output and masterGraph sync complete") - if unused: + def outputStatements(self, stmts: set): + if not stmts: + return + for devRunner in self._runners: + wanted, unwanted = devRunner.filterIncomingStatements(stmts) + log.info(f'\ndev {devRunner.dev.uri}:n wanted {wanted}. unwanted {unwanted}') + if len(wanted) == len(stmts): + devRunner.onPutStatements(stmts) + break + elif len(unwanted) == len(stmts): + continue + else: + raise NotImplementedError(f'dev {devRunner.dev.uri} wanted only {wanted}') + else: log.info("Board %s doesn't care about these statements:", self.uri) for s in unused: log.warn("%r", s) - def syncMasterGraphToHostStatements(self, dev): - hostStmtCtx = URIRef(dev.uri + '/host') - newQuads = inContext(dev.hostStatements(), hostStmtCtx) - p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads) - log.debug("patch master with these host stmts %s", p) - def staticStmts(self): return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)] @@ -246,7 +231,7 @@ """for web page""" return { 'uri': self.uri, - 'devices': [d.dev.description() for d in self._devs], + 'devices': [d.dev.description() for d in self._runners], 'graph': 'http://sticker:9059/graph', #todo }
--- a/service/piNode/requirements.txt Mon Aug 12 16:46:52 2019 -0700 +++ b/service/piNode/requirements.txt Tue Sep 24 14:04:02 2019 -0700 @@ -14,10 +14,11 @@ rdflib==4.2.2 rpi_ws281x==3.1.0 service_identity +treq==18.6.0 w1thermsensor cycloneerr -devices_shared==0.3.0 +devices_shared==0.5.0 export_to_influxdb==0.1.0 homeauto_anynode==0.8.0 patchablegraph==0.5.0
--- a/service/wifi/index.html Mon Aug 12 16:46:52 2019 -0700 +++ b/service/wifi/index.html Tue Sep 24 14:04:02 2019 -0700 @@ -1,82 +1,74 @@ <!doctype html> <html> - <head> - <title>wifi</title> - <meta charset="utf-8" /> - <script src="/lib/polymer/1.0.9/webcomponentsjs/webcomponents-lite.min.js"></script> - <link rel="import" href="/lib/polymer/1.0.9/polymer/polymer.html"> - <script src="/lib/underscore-1.5.2.min.js"></script> - <link rel="import" href="/lib/polymer/1.0.9/iron-ajax/iron-ajax.html"> - <link rel="import" href="/rdf/n3+polymer/trig-store.html"> - </head> - <body> - <h1>Devices on wifi</h1> - <p><a href="../dhcpleases">go to dhcpleases</a></p> - <dom-module id="wifi-table"> - <template> - <iron-ajax auto url="graph" - handle-as="text" - last-response="{{ajaxResponse}}"></iron-ajax> - <trig-store id="ts" trig-input="{{ajaxResponse}}"></trig-store> + +<head> + <title>wifi</title> + <meta charset="utf-8"> + <script src="/lib/polymer/1.0.9/webcomponentsjs/webcomponents.min.js"></script> + <script src="/lib/require/require-2.3.3.js"></script> + <script src="/rdf/common_paths_and_ns.js"></script> + + <link rel="stylesheet" href="/rdf/browse/style.css"> + + <link rel="import" href="/rdf/streamed-graph.html"> + <link rel="import" href="/lib/polymer/1.0.9/polymer/polymer.html"> +</head> + +<body class="rdfBrowsePage"> + <template id="t" is="dom-bind"> + <style> + body { + background: black; + color: white; + } + a { + color: #b1b1fd; + text-shadow: 1px 1px 0px #0400ff94; + text-decoration-color: #00007714; + } + #subjectRequest { + width: 50em; + } + </style> + + <streamed-graph url="/sse_collector/graph/network" graph="{{graph}}"></streamed-graph> + <div id="out"> + </div> - <table> - <tr> - <th>name</th> - <th>MAC</th> - <th>Connected</th> - </tr> - <template is="dom-repeat" items="{{devices}}"> - <tr> - <td>{{item.deviceName}}</td> - <td>{{item.mac}}</td> - <td>{{item.connectedAgo}}</td> - </tr> - </template> - </table> - </template> - - <script> - HTMLImports.whenReady(function () { - Polymer({ - is: "wifi-table", - ready: function() { - this.$.ts.addEventListener('store-changed', this.storeChanged.bind(this)); - this.devices = []; - }, - storeChanged: function(ev) { - var store = ev.detail.value; - var find = function(s, p, o) { return store.findAllGraphs(s, p, o); }; - var findOne = function(s, p, o) { - var rows = find(s, p, o); - return rows[0]; - }; + <script type="module"> + import { render } from '/lib/lit-html/1.0.0/lit-html.js'; + import { graphView } from './wifi.js'; + const sg = document.querySelector('streamed-graph'); + + const out = document.querySelector('#out'); + const startPainting = () => { + if (!sg.graph || !sg.graph.graph) { + setTimeout(startPainting, 100); + return; + } + + let dirty = true; - this.devices = []; - - find(null, "room:connectedToNetwork", "http://bigasterisk.com/wifiAccessPoints" - ).forEach(function(row) { - var out = { - mac: N3.Util.getLiteralValue( - findOne(row.subject, "room:macAddress", null).object), - connectedAgo: N3.Util.getLiteralValue( - findOne(row.subject, "room:connectedAgo", null).object) - - }; - try { - var dev = findOne(row.subject, "room:deviceName", null).object; - out.deviceName = N3.Util.getLiteralValue(dev); - } catch(e) { + const repaint = () => { + if (!dirty) { + return; + } + render(graphView(sg.graph.graph), out); + dirty = false; + }; - } - - this.devices.push(out); - }.bind(this)); - this.devices = _.sortBy(this.devices, 'deviceName'); - } - }); - }); - </script> - </dom-module> - <wifi-table></wifi-table> - </body> + sg.addEventListener('graph-changed', (ev) => { + dirty = true; + requestAnimationFrame(repaint); + }); + repaint(); + }; + setTimeout(startPainting, 10); + </script> + </template> + <script> + + </script> +</body> + </html>
--- a/service/wifi/requirements.txt Mon Aug 12 16:46:52 2019 -0700 +++ b/service/wifi/requirements.txt Tue Sep 24 14:04:02 2019 -0700 @@ -12,5 +12,5 @@ cycloneerr patchablegraph==0.5.0 rdfdb==0.8.0 -standardservice==0.4.0 +standardservice==0.6.0
--- a/service/wifi/scrape.py Mon Aug 12 16:46:52 2019 -0700 +++ b/service/wifi/scrape.py Tue Sep 24 14:04:02 2019 -0700 @@ -64,10 +64,11 @@ if row['contype'] in ['2.4G', '5G']: orbi = macUri(row['conn_orbi_mac']) - triples.add((orbi, ROOM['wifiBand'], - ROOM['wifiBand/%s' % row['contype']])) + ct = ROOM['wifiBand/%s' % row['contype']] triples.add((uri, ROOM['connectedToAp'], orbi)) + triples.add((uri, ROOM['wifiBand'], ct)) triples.add((orbi, RDF.type, ROOM['AccessPoint'])) + triples.add((orbi, ROOM['wifiBand'], ct)) triples.add((orbi, ROOM['macAddress'], Literal(row['conn_orbi_mac'].lower()))) triples.add((orbi, RDFS.label, Literal(row['conn_orbi_name'])))