changeset 1444:4afb1830bb5e

use rx version 3.x Ignore-this: 4232f8e780d35a8d0642e86521eb2801 darcs-hash:747608892b607f78260f4772a4ff2b24c7392f73
author drewp <drewp@bigasterisk.com>
date Tue, 24 Sep 2019 14:04:02 -0700
parents 99540f1a11f7
children 0087017efecb
files lib/mqtt_client/setup.py service/piNode/piNode.py service/piNode/requirements.txt service/wifi/index.html service/wifi/requirements.txt service/wifi/scrape.py
diffstat 6 files changed, 160 insertions(+), 181 deletions(-) [+]
line wrap: on
line diff
--- a/lib/mqtt_client/setup.py	Mon Aug 12 16:46:52 2019 -0700
+++ b/lib/mqtt_client/setup.py	Tue Sep 24 14:04:02 2019 -0700
@@ -5,7 +5,7 @@
     version='0.7.0',
     packages=['mqtt_client'],
     package_dir={'mqtt_client': ''},
-    install_requires=['rx', 'twisted-mqtt'],
+    install_requires=['rx>=3.0.0', 'twisted-mqtt'],
     url='https://projects.bigasterisk.com/mqtt-client/mqtt_client-0.7.0.tar.gz',
     author='Drew Perttula',
     author_email='drewp@bigasterisk.com',
--- a/service/piNode/piNode.py	Mon Aug 12 16:46:52 2019 -0700
+++ b/service/piNode/piNode.py	Tue Sep 24 14:04:02 2019 -0700
@@ -32,7 +32,6 @@
                           scales.PmfStat('configReread'),
                           scales.IntStat('pollException'),
                           scales.PmfStat('pollAll'),
-                          scales.PmfStat('boardPoll'),
                           scales.PmfStat('sendOneshot'),
                           scales.PmfStat('outputStatements'),
 
@@ -101,13 +100,24 @@
 
 
 class DeviceRunner(object):
-    def __init__(self, dev):
+    def __init__(self, dev, masterGraph, sendOneshot, influx):
         self.dev = dev
+        self.masterGraph = masterGraph
+        self.sendOneshot = sendOneshot
+        self.influx = influx
         self.period = getattr(self.dev, 'pollPeriod', .05)
-        #self._lastPollTime.get(i.uri, 0) + self.pollPeriod > now):
+        self.latestStatementsFromInputs = set()
+        self.lastPollTime = None
 
         reactor.callLater(0, self.poll)
 
+    def syncMasterGraphToHostStatements(self):
+        hostStmtCtx = URIRef(self.dev.uri + '/host')
+        newQuads = inContext(self.dev.hostStatements(), hostStmtCtx)
+        p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads)
+        if p:
+            log.debug("patch master with these host stmts %s", p)
+
     @inlineCallbacks
     def poll(self):
         now = time.time()
@@ -115,9 +125,56 @@
             with self.dev.stats.poll.time():
                 new = yield maybeDeferred(self.dev.poll)
         finally:
-            reactor.callLater(max(0, time.time() - (now + self.period)), self.poll)
+            reactor.callLater(max(0, self.period - (time.time() - now)), self.poll)
+
+        if isinstance(new, dict): # new style
+            oneshot = set(new['oneshot'])
+            new = set(new['latest'])
+        else:
+            oneshot = set()
+            new = set(new)
+
+        prev = self.latestStatementsFromInputs
+        # it's important that quads from different devices
+        # don't clash, since that can lead to inconsistent
+        # patches (e.g.
+        #   dev1 changes value from 1 to 2;
+        #   dev2 changes value from 2 to 3;
+        #   dev1 changes from 2 to 4 but this patch will
+        #     fail since the '2' statement is gone)
+        self.masterGraph.patch(Patch.fromDiff(inContext(prev, self.dev.uri),
+                                              inContext(new, self.dev.uri)))
+        self.latestStatementsFromInputs = new
+
+        self.syncMasterGraphToHostStatements() # needed?
+
+        if oneshot:
+            self.sendOneshot(oneshot)
+        self.lastPollTime = now
+
+        if self.latestStatementsFromInputs:
+            self.influx.exportToInflux(set.union(set(self.latestStatementsFromInputs)))
+
         returnValue(new)
 
+    def filterIncomingStatements(self, stmts):
+        wanted = set()
+        unwanted = set(stmts)
+        for pat in self.dev.outputPatterns():
+            if [term is None for term in pat] != [False, False, True]:
+                raise NotImplementedError
+            for stmt in stmts:
+                if stmt[:2] == pat[:2]:
+                    wanted.add(stmt)
+                    unwanted.discard(stmt)
+        return wanted, unwanted
+
+    def onPutStatements(self, stmts):
+        log.info("output goes to action handler for %s" % self.dev.uri)
+        with self.dev.stats.output.time():
+            self.dev.sendOutput(stmts)
+        self.syncMasterGraphToHostStatements()
+
 class Board(object):
     """similar to arduinoNode.Board but without the communications stuff"""
     def __init__(self, graph, masterGraph, uri, hubHost):
@@ -127,76 +184,21 @@
 
         self.masterGraph.setToGraph(self.staticStmts())
         self.pi = pigpio.pi()
-        self._devs = [DeviceRunner(d) for d in devices.makeDevices(graph, self.uri, self.pi)]
-        log.debug('found %s devices', len(self._devs))
-        self._statementsFromInputs = {} # input device uri: latest statements
-        self._lastPollTime = {} # input device uri: time()
+
         self._influx = InfluxExporter(self.graph)
-        for d in self._devs:
-            self.syncMasterGraphToHostStatements(d.dev)
-
-    def startPolling(self):
-        task.LoopingCall(self._poll).start(.05)
-
-    @STATS.boardPoll.time() # not differentiating multiple boards here
-    def _poll(self):
-        try:
-            self._pollMaybeError()
-        except Exception:
-            STATS.pollException += 1
-            log.exception("During poll:")
-
-
-    @inlineCallbacks
-    def _pollOneDev(self, i):
-        now = time.time()
-
-        new = i.poll()
-        if isinstance(new, dict): # new style
-            oneshot = new['oneshot']
-            new = new['latest']
-        else:
-            oneshot = None
-
-        self._updateMasterWithNewPollStatements(i.uri, new)
-
-        if oneshot:
-            self._sendOneshot(oneshot)
-        self._lastPollTime[i.uri] = now
-
-    @inlineCallbacks
-    def _pollMaybeError(self):
-        pollTime = {} # uri: sec
-        yield gatherResults([self._pollOneDev(i.dev, pollTime)
-                             for i in self._devs], consumeErrors=True)
-
-        pollResults = map(set, self._statementsFromInputs.values())
-        if pollResults:
-            self._influx.exportToInflux(set.union(*pollResults))
-
-    def _updateMasterWithNewPollStatements(self, dev, new):
-        prev = self._statementsFromInputs.get(dev, set())
-
-        # it's important that quads from different devices
-        # don't clash, since that can lead to inconsistent
-        # patches (e.g.
-        #   dev1 changes value from 1 to 2;
-        #   dev2 changes value from 2 to 3;
-        #   dev1 changes from 2 to 4 but this patch will
-        #     fail since the '2' statement is gone)
-        self.masterGraph.patch(Patch.fromDiff(inContext(prev, dev),
-                                              inContext(new, dev)))
-        self._statementsFromInputs[dev] = new
+        self._runners = [DeviceRunner(d, self.masterGraph, self.sendOneshot, self._influx)
+                         for d in devices.makeDevices(graph, self.uri, self.pi)]
+        log.debug('found %s devices', len(self._runners))
 
     @STATS.sendOneshot.time()
-    def _sendOneshot(self, oneshot):
+    def sendOneshot(self, oneshot):
         body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
                          for s,p,o in oneshot)).encode('utf8')
         url = 'http://%s:9071/oneShot' % self.hubHost
-        d = fetch(method='POST',
-                  url=url,
-                  headers={'Content-Type': ['text/n3']},
-                  postdata=body,
+        d = treq.post(
+                  url=url.encode('ascii'),
+                  headers={b'Content-Type': [b'text/n3']},
+                  data=body,
                   timeout=5)
         def err(e):
             log.info('oneshot post to %r failed:  %s',
@@ -204,41 +206,24 @@
         d.addErrback(err)
 
     @STATS.outputStatements.time()
-    def outputStatements(self, stmts):
-        unused = set(stmts)
-        for devRunner in self._devs:
-            dev = devRunner.dev
-            stmtsForDev = []
-            for pat in dev.outputPatterns():
-                if [term is None for term in pat] != [False, False, True]:
-                    raise NotImplementedError
-                for stmt in stmts:
-                    if stmt[:2] == pat[:2]:
-                        stmtsForDev.append(stmt)
-                        unused.discard(stmt)
-            if stmtsForDev:
-                log.info("output goes to action handler for %s" % dev.uri)
-                with dev.stats.output.time():
-                    dev.sendOutput(stmtsForDev)
-
-                # Dev *could* change hostStatements at any time, and
-                # we're not currently tracking that, but the usual is
-                # to change them in response to sendOutput so this
-                # should be good enough. The right answer is to give
-                # each dev the masterGraph for it to write to.
-                self.syncMasterGraphToHostStatements(dev)
-                log.info("output and masterGraph sync complete")
-        if unused:
+    def outputStatements(self, stmts: set):
+        if not stmts:
+            return
+        for devRunner in self._runners:
+            wanted, unwanted = devRunner.filterIncomingStatements(stmts)
+            log.info(f'\ndev {devRunner.dev.uri}:n wanted {wanted}. unwanted {unwanted}')
+            if len(wanted) == len(stmts):
+                devRunner.onPutStatements(stmts)
+                break
+            elif len(unwanted) == len(stmts):
+                continue
+            else:
+                raise NotImplementedError(f'dev {devRunner.dev.uri} wanted only {wanted}')
+        else:
             log.info("Board %s doesn't care about these statements:", self.uri)
             for s in unused:
                 log.warn("%r", s)
 
-    def syncMasterGraphToHostStatements(self, dev):
-        hostStmtCtx = URIRef(dev.uri + '/host')
-        newQuads = inContext(dev.hostStatements(), hostStmtCtx)
-        p = self.masterGraph.patchSubgraph(hostStmtCtx, newQuads)
-        log.debug("patch master with these host stmts %s", p)
-
     def staticStmts(self):
         return [(HOST[hostname], ROOM['connectedTo'], self.uri, CTX)]
 
@@ -246,7 +231,7 @@
         """for web page"""
         return {
             'uri': self.uri,
-            'devices': [d.dev.description() for d in self._devs],
+            'devices': [d.dev.description() for d in self._runners],
             'graph': 'http://sticker:9059/graph', #todo
         }
 
--- a/service/piNode/requirements.txt	Mon Aug 12 16:46:52 2019 -0700
+++ b/service/piNode/requirements.txt	Tue Sep 24 14:04:02 2019 -0700
@@ -14,10 +14,11 @@
 rdflib==4.2.2
 rpi_ws281x==3.1.0
 service_identity
+treq==18.6.0
 w1thermsensor
 
 cycloneerr
-devices_shared==0.3.0
+devices_shared==0.5.0
 export_to_influxdb==0.1.0
 homeauto_anynode==0.8.0
 patchablegraph==0.5.0
--- a/service/wifi/index.html	Mon Aug 12 16:46:52 2019 -0700
+++ b/service/wifi/index.html	Tue Sep 24 14:04:02 2019 -0700
@@ -1,82 +1,74 @@
 <!doctype html>
 <html>
-  <head>
-    <title>wifi</title>
-    <meta charset="utf-8" />
-    <script src="/lib/polymer/1.0.9/webcomponentsjs/webcomponents-lite.min.js"></script>
-    <link rel="import" href="/lib/polymer/1.0.9/polymer/polymer.html">
-    <script src="/lib/underscore-1.5.2.min.js"></script>
-    <link rel="import" href="/lib/polymer/1.0.9/iron-ajax/iron-ajax.html">
-    <link rel="import" href="/rdf/n3+polymer/trig-store.html">
-  </head>
-  <body>
-    <h1>Devices on wifi</h1>
-    <p><a href="../dhcpleases">go to dhcpleases</a></p>
-    <dom-module id="wifi-table">
-      <template>
-        <iron-ajax auto url="graph"
-                   handle-as="text"
-                   last-response="{{ajaxResponse}}"></iron-ajax>
-        <trig-store id="ts" trig-input="{{ajaxResponse}}"></trig-store>
+
+<head>
+  <title>wifi</title>
+  <meta charset="utf-8">
+  <script src="/lib/polymer/1.0.9/webcomponentsjs/webcomponents.min.js"></script>
+  <script src="/lib/require/require-2.3.3.js"></script>
+  <script src="/rdf/common_paths_and_ns.js"></script>
+
+  <link rel="stylesheet" href="/rdf/browse/style.css">
+
+  <link rel="import" href="/rdf/streamed-graph.html">
+  <link rel="import" href="/lib/polymer/1.0.9/polymer/polymer.html">
+</head>
+
+<body class="rdfBrowsePage">
+  <template id="t" is="dom-bind">
+    <style>
+     body {
+         background: black;
+         color: white;
+     }
+     a {
+         color: #b1b1fd;
+         text-shadow: 1px 1px 0px #0400ff94;
+         text-decoration-color: #00007714;
+     }
+     #subjectRequest {
+         width: 50em;
+     }
+    </style>
+
+    <streamed-graph url="/sse_collector/graph/network" graph="{{graph}}"></streamed-graph>
+    <div id="out">
+    </div>
 
-        <table>
-          <tr>
-            <th>name</th>
-            <th>MAC</th>
-            <th>Connected</th>
-          </tr>
-          <template is="dom-repeat" items="{{devices}}">
-            <tr>
-              <td>{{item.deviceName}}</td>
-              <td>{{item.mac}}</td>
-              <td>{{item.connectedAgo}}</td>
-            </tr>
-          </template>
-        </table>
-      </template>
-      
-      <script>
-       HTMLImports.whenReady(function () {
-       Polymer({
-         is: "wifi-table",
-         ready: function() {
-           this.$.ts.addEventListener('store-changed', this.storeChanged.bind(this));
-           this.devices = [];
-         },
-         storeChanged: function(ev) {
-           var store = ev.detail.value;
-           var find = function(s, p, o) { return store.findAllGraphs(s, p, o); };
-           var findOne = function(s, p, o) {
-             var rows = find(s, p, o);
-             return rows[0];
-           };
+    <script type="module">
+      import { render } from '/lib/lit-html/1.0.0/lit-html.js';
+      import { graphView } from './wifi.js';
+      const sg = document.querySelector('streamed-graph');
+
+      const out = document.querySelector('#out');
+      const startPainting = () => {
+        if (!sg.graph || !sg.graph.graph) {
+          setTimeout(startPainting, 100);
+          return;
+        }
+
+        let dirty = true;
 
-           this.devices = [];
-           
-           find(null, "room:connectedToNetwork", "http://bigasterisk.com/wifiAccessPoints"
-                ).forEach(function(row) {
-             var out = {
-               mac: N3.Util.getLiteralValue(
-                 findOne(row.subject, "room:macAddress", null).object),
-               connectedAgo: N3.Util.getLiteralValue(
-                 findOne(row.subject, "room:connectedAgo", null).object)
-               
-             };
-             try {
-               var dev = findOne(row.subject, "room:deviceName", null).object;
-               out.deviceName = N3.Util.getLiteralValue(dev);
-             } catch(e) {
+        const repaint = () => {
+          if (!dirty) {
+            return;
+          }
+          render(graphView(sg.graph.graph), out);
+          dirty = false;
+        };
 
-             }           
-             
-             this.devices.push(out);
-           }.bind(this));
-           this.devices = _.sortBy(this.devices, 'deviceName');
-         }
-       });
-       });
-      </script>
-    </dom-module>
-    <wifi-table></wifi-table>
-  </body>
+        sg.addEventListener('graph-changed', (ev) => {
+          dirty = true;
+          requestAnimationFrame(repaint);
+        });
+        repaint();
+      };
+      setTimeout(startPainting, 10);
+    </script>
+  </template>
+  <script>
+
+  </script>
+</body>
+
 </html>
--- a/service/wifi/requirements.txt	Mon Aug 12 16:46:52 2019 -0700
+++ b/service/wifi/requirements.txt	Tue Sep 24 14:04:02 2019 -0700
@@ -12,5 +12,5 @@
 cycloneerr
 patchablegraph==0.5.0
 rdfdb==0.8.0
-standardservice==0.4.0
+standardservice==0.6.0
 
--- a/service/wifi/scrape.py	Mon Aug 12 16:46:52 2019 -0700
+++ b/service/wifi/scrape.py	Tue Sep 24 14:04:02 2019 -0700
@@ -64,10 +64,11 @@
         
         if row['contype'] in ['2.4G', '5G']:
             orbi = macUri(row['conn_orbi_mac'])
-            triples.add((orbi, ROOM['wifiBand'],
-                           ROOM['wifiBand/%s' % row['contype']]))
+            ct = ROOM['wifiBand/%s' % row['contype']]
             triples.add((uri, ROOM['connectedToAp'], orbi))
+            triples.add((uri, ROOM['wifiBand'], ct))
             triples.add((orbi, RDF.type, ROOM['AccessPoint']))
+            triples.add((orbi, ROOM['wifiBand'], ct))
             triples.add((orbi, ROOM['macAddress'],
                            Literal(row['conn_orbi_mac'].lower())))
             triples.add((orbi, RDFS.label, Literal(row['conn_orbi_name'])))