changeset 720:e157afd642b5

rewrite reasoning PutOutputs Ignore-this: 9c7c4b67b1f42992920572d147544a4f
author drewp@bigasterisk.com
date Wed, 05 Feb 2020 00:19:43 -0800
parents 34343fb39fbe
children 4fa5c6a61282
files service/reasoning/actions.py service/reasoning/httpputoutputs.py
diffstat 2 files changed, 191 insertions(+), 188 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/actions.py	Wed Feb 05 00:18:20 2020 -0800
+++ b/service/reasoning/actions.py	Wed Feb 05 00:19:43 2020 -0800
@@ -1,99 +1,40 @@
-from rdflib import URIRef, Namespace, RDF, Literal
-from twisted.internet import reactor
+import json
 import logging
 import urllib
-import json
-import time
 
+from rdflib import URIRef, Namespace, RDF, Literal
+from twisted.internet import reactor
 import treq
+
+from httpputoutputs import HttpPutOutputs
+
 log = logging.getLogger('output')
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/")
 
-class HttpPutOutput(object):
-    def __init__(self, url, mockOutput=False):
-        self.url = url
-        self.mockOutput = mockOutput
-        self.payload = None
-        self.foafAgent = None
-        self.nextCall = None
-        self.lastErr = None
-        self.numRequests = 0
-
-    def report(self):
-        return {
-            'url': self.url,
-            'urlAbbrev': self.url
-            .replace('http%3A%2F%2Fprojects.bigasterisk.com%2Froom%2F', ':')
-            .replace('http://projects.bigasterisk.com/room/', ':')
-            .replace('.vpn-home.bigasterisk.com', '.vpn-home'),
-            'payload': self.payload,
-            'numRequests': self.numRequests,
-            'lastChangeTime': round(self.lastChangeTime, 2),
-            'lastErr': str(self.lastErr) if self.lastErr is not None else None,
-            }
-
-    def setPayload(self, payload, foafAgent):
-        if self.numRequests > 0 and (self.payload == payload and
-                                     self.foafAgent == foafAgent):
-            return
-        self.payload = payload
-        self.foafAgent = foafAgent
-        self.lastChangeTime = time.time()
-        self.makeRequest()
+def secsFromLiteral(v):
+    if v[-1] != 's':
+        raise NotImplementedError(v)
+    return float(v[:-1])
 
-    def makeRequest(self):
-        if self.payload is None:
-            log.debug("PUT None to %s - waiting", self.url)
-            return
-        h = {}
-        if self.foafAgent:
-            h['x-foaf-agent'] = self.foafAgent
-        if self.nextCall and self.nextCall.active():
-            self.nextCall.cancel()
-            self.nextCall = None
-        self.lastErr = None
-        log.debug("PUT %s payload=%s agent=%s", self.url, self.payload, self.foafAgent)
-        if not self.mockOutput:
-            self.currentRequest = treq.put(self.url, data=self.payload, headers=h, timeout=3)
-            self.currentRequest.addCallback(self.onResponse).addErrback(self.onError)
-        else:
-            reactor.callLater(.2, self.onResponse, None)
-
-        self.numRequests += 1
+def ntStatement(stmt):
+    def compact(u):
+        if isinstance(u, URIRef) and u.startswith(ROOM):
+            return 'room:' + u[len(ROOM):]
+        return u.n3()
+    return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2]))
 
-    def onResponse(self, resp):
-        log.debug("  PUT %s ok", self.url)
-        self.lastErr = None
-        self.currentRequest = None
-        self.nextCall = reactor.callLater(30, self.makeRequest)
-
-    def onError(self, err):
-        self.lastErr = err
-        log.debug('  PUT %s failed: %s', self.url, err)
-        self.currentRequest = None
-        self.nextCall = reactor.callLater(50, self.makeRequest)
-
-class HttpPutOutputs(object):
-    """these grow forever"""
-    def __init__(self, mockOutput=False):
-        self.mockOutput = mockOutput
-        self.state = {} # url: HttpPutOutput
-
-    def put(self, url, payload, foafAgent):
-        if url not in self.state:
-            self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput)
-        self.state[url].setPayload(payload, foafAgent)
 
 class Actions(object):
-    def __init__(self, sendToLiveClients, mockOutput=False):
+    def __init__(self, inputGraph, sendToLiveClients, mockOutput=False):
+        self.inputGraph = inputGraph
         self.mockOutput = mockOutput
         self.putOutputs = HttpPutOutputs(mockOutput=mockOutput)
         self.sendToLiveClients = sendToLiveClients
 
-    def putResults(self, deviceGraph, inferred):
+    def putResults(self, inferred):
         """
         some conclusions in the inferred graph lead to PUT requests
         getting made
@@ -106,73 +47,38 @@
         If the graph doesn't contain any matches, we use (?d
         :zeroValue ?val) for the value and PUT that.
         """
+        deviceGraph = self.inputGraph.getGraph()
         activated = set()  # (subj,pred) pairs for which we're currently putting some value
         activated.update(self._putDevices(deviceGraph, inferred))
         self._oneShotPostActions(deviceGraph, inferred)
-        for dev, pred in [
-                #(URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState),
-                (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState),
-                (URIRef('http://bigasterisk.com/host/frontdoor/monitor'), ROOM.powerState),
-                (ROOM['storageCeilingLedLong'], ROOM.brightness),
-                (ROOM['storageCeilingLedCross'], ROOM.brightness),
-                (ROOM['garageOverhead'], ROOM.brightness),
-                (ROOM['headboardWhite'], ROOM.brightness),
-                (ROOM['changingWhite'], ROOM.brightness),
-                (ROOM['starTrekLight'], ROOM.brightness),
-                (ROOM['kitchenLight'], ROOM.brightness),
-                (ROOM['kitchenCounterLight'], ROOM.brightness),
-                (ROOM['livingRoomLamp1'], ROOM.brightness),
-                (ROOM['livingRoomLamp2'], ROOM.brightness),
-                (ROOM['loftDeskStrip'], ROOM.x),
-                (ROOM['bedLedStrip'], ROOM.color),
-            ]:
-            url = deviceGraph.value(dev, ROOM.putUrl)
-
-            log.debug('inferredObjects of dev=%s pred=%s',
-                      deviceGraph.qname(dev),
-                      deviceGraph.qname(pred))
-            inferredObjects = list(inferred.objects(dev, pred))
-            if len(inferredObjects) == 0:
-                # rm this- use activated instead
-                self._putZero(deviceGraph, dev, pred, url)
-            elif len(inferredObjects) == 1:
-                log.debug('  inferredObject: %s %s %r',
-                          deviceGraph.qname(dev),
-                          deviceGraph.qname(pred),
-                          inferredObjects[0].toPython())
-                activated.add((dev, pred))
-                self._putInferred(deviceGraph, url, inferredObjects[0])
-            elif len(inferredObjects) > 1:
-                log.info("  conflict, ignoring: %s has %s of %s" %
-                         (dev, pred, inferredObjects))
-                # write about it to the inferred graph?
         self.putDefaults(deviceGraph, activated)
 
-    def putDefaults(self, deviceGraph, activated):
-        """
-        If inferring (:a :b :c) would cause a PUT, you can say
-
-        reasoning:defaultOutput reasoning:default [
-          :subject :a
-          :predicate :b
-          :defaultObject :c
-        ]
-
-        and we'll do that PUT if no rule has put anything else with
-        (:a :b *).
-        """
-
-        defaultStmts = set()
-        for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'],
-                                               REASONING['default']):
-            s = deviceGraph.value(defaultDesc, ROOM['subject'])
-            p = deviceGraph.value(defaultDesc, ROOM['predicate'])
-            if (s, p) not in activated:
-                obj = deviceGraph.value(defaultDesc, ROOM['defaultObject'])
-
-                defaultStmts.add((s, p, obj))
-                log.debug('defaultStmts %s %s %s', s, p, obj)
-        self._putDevices(deviceGraph, defaultStmts)
+    def _putDevices(self, deviceGraph, inferred):
+        activated = set()
+        agentFor = {}
+        for stmt in inferred:
+            if stmt[1] == ROOM['putAgent']:
+                agentFor[stmt[0]] = stmt[2]
+        for stmt in inferred:
+            log.debug('inferred stmt we might PUT: %s', ntStatement(stmt))
+            putUrl = deviceGraph.value(stmt[0], ROOM['putUrl'])
+            putPred = deviceGraph.value(stmt[0], ROOM['putPredicate'])
+            matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'],
+                                          default=putPred)
+            if putUrl and matchPred == stmt[1]:
+                log.debug('putDevices: stmt %s leads to putting at %s',
+                          ntStatement(stmt), putUrl.n3())
+                self._put(putUrl + '?' + urllib.urlencode([
+                    ('s', str(stmt[0])),
+                    ('p', str(putPred))]),
+                          str(stmt[2].toPython()),
+                          agent=agentFor.get(stmt[0], None),
+                          refreshSecs=self._getRefreshSecs(stmt[0]))
+                activated.add((stmt[0],
+                               # didn't test that this should be
+                               # stmt[1] and not putPred
+                               stmt[1]))
+        return activated
 
     def _oneShotPostActions(self, deviceGraph, inferred):
         """
@@ -207,58 +113,41 @@
                 if not self.mockOutput:
                     treq.post(postTarget, timeout=2).addErrback(err)
 
-    def _putDevices(self, deviceGraph, inferred):
-        activated = set()
-        agentFor = {}
-        for stmt in inferred:
-            if stmt[1] == ROOM['putAgent']:
-                agentFor[stmt[0]] = stmt[2]
-        for stmt in inferred:
-            log.debug('inferred stmt we might PUT: %s', stmt)
-            putUrl = deviceGraph.value(stmt[0], ROOM['putUrl'])
-            putPred = deviceGraph.value(stmt[0], ROOM['putPredicate'])
-            matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'],
-                                          default=putPred)
-            if putUrl and matchPred == stmt[1]:
-                log.debug('putDevices: stmt %r %r %r leds to putting at %r',
-                         stmt[0], stmt[1], stmt[2], putUrl)
-                self._put(putUrl + '?' + urllib.urlencode([
-                    ('s', str(stmt[0])),
-                    ('p', str(putPred))]),
-                          str(stmt[2].toPython()),
-                          agent=agentFor.get(stmt[0], None))
-                activated.add((stmt[0],
-                               # didn't test that this should be
-                               # stmt[1] and not putPred
-                               stmt[1]))
-        return activated
+    def putDefaults(self, deviceGraph, activated):
+        """
+        If inferring (:a :b :c) would cause a PUT, you can say
+
+        reasoning:defaultOutput reasoning:default [
+          :subject :a
+          :predicate :b
+          :defaultObject :c
+        ]
+
+        and we'll do that PUT if no rule has put anything else with
+        (:a :b *).
+        """
 
-    def _putInferred(self, deviceGraph, putUrl, obj):
-        """
-        HTTP PUT to putUrl, with a payload that's either obj's :putValue
-        or obj itself.
-        """
-        value = deviceGraph.value(obj, ROOM.putValue)
-        if value is not None:
-            self._put(putUrl, payload=str(value))
-        elif isinstance(obj, Literal):
-            self._put(putUrl, payload=str(obj))
-        else:
-            log.warn("    don't know what payload to put for %s. obj=%r",
-                        putUrl, obj)
+        defaultStmts = set()
+        for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'],
+                                               REASONING['default']):
+            s = deviceGraph.value(defaultDesc, ROOM['subject'])
+            p = deviceGraph.value(defaultDesc, ROOM['predicate'])
+            if (s, p) not in activated:
+                obj = deviceGraph.value(defaultDesc, ROOM['defaultObject'])
 
-    def _putZero(self, deviceGraph, dev, pred, putUrl):
-        # zerovalue should be a function of pred as well.
-        value = deviceGraph.value(dev, ROOM.zeroValue)
-        if value is not None:
-            log.debug("    put zero (%r) to %s", value.toPython(), putUrl)
-            self._put(putUrl, payload=str(value))
-            # this should be written back into the inferred graph
-            # for feedback
+                defaultStmts.add((s, p, obj))
+                log.debug('defaultStmts %s', ntStatement((s, p, obj)))
+        self._putDevices(deviceGraph, defaultStmts)
 
-    def _put(self, url, payload, agent=None):
+    def _getRefreshSecs(self, target):
+        # should be able to map(secsFromLiteral) in here somehow and
+        # remove the workaround in httpputoutputs.currentRefreshSecs
+        return self.inputGraph.rxValue(target, ROOM['refreshPutValue'],
+                                       default=Literal('30s'))#.map(secsFromLiteral)
+
+    def _put(self, url, payload, refreshSecs, agent=None):
         assert isinstance(payload, bytes)
-        self.putOutputs.put(url, payload, agent)
+        self.putOutputs.put(url, payload, agent, refreshSecs)
 
 import cyclone.sse
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/httpputoutputs.py	Wed Feb 05 00:19:43 2020 -0800
@@ -0,0 +1,114 @@
+import logging
+import time
+
+from rx.subjects import BehaviorSubject
+from twisted.internet import reactor
+import treq
+
+log = logging.getLogger('httpputoutputs')
+
+class HttpPutOutput(object):
+    def __init__(self, url,
+                 refreshSecs,#: BehaviorSubject,
+                 mockOutput=False):
+        self.url = url
+        self.mockOutput = mockOutput
+        self.payload = None
+        self.foafAgent = None
+        self.nextCall = None
+        self.lastErr = None
+        self.numRequests = 0
+        self.refreshSecs = refreshSecs
+
+    def report(self):
+        return {
+            'url': self.url,
+            'urlAbbrev': self.url
+            .replace('http%3A%2F%2Fprojects.bigasterisk.com%2Froom%2F', ':')
+            .replace('http://projects.bigasterisk.com/room/', ':')
+            .replace('.vpn-home.bigasterisk.com', '.vpn-home'),
+            'payload': self.payload,
+            'numRequests': self.numRequests,
+            'lastChangeTime': round(self.lastChangeTime, 2),
+            'lastErr': str(self.lastErr) if self.lastErr is not None else None,
+            }
+
+    def setPayload(self, payload, foafAgent):
+        if self.numRequests > 0 and (self.payload == payload and
+                                     self.foafAgent == foafAgent):
+            return
+        self.payload = payload
+        self.foafAgent = foafAgent
+        self.lastChangeTime = time.time()
+        self.makeRequest()
+
+    def makeRequest(self):
+        if self.payload is None:
+            log.debug("PUT None to %s - waiting", self.url)
+            return
+        h = {}
+        if self.foafAgent:
+            h['x-foaf-agent'] = self.foafAgent
+        if self.nextCall and self.nextCall.active():
+            self.nextCall.cancel()
+            self.nextCall = None
+        self.lastErr = None
+        log.debug("PUT %s payload=%s agent=%s",
+                  self.url, self.payload, self.foafAgent)
+        if not self.mockOutput:
+            self.currentRequest = treq.put(self.url, data=self.payload,
+                                           headers=h, timeout=3)
+            self.currentRequest.addCallback(self.onResponse).addErrback(
+                self.onError)
+        else:
+            reactor.callLater(.2, self.onResponse, None)
+
+        self.numRequests += 1
+
+    def currentRefreshSecs(self):
+        out = None
+        if 1:
+            # workaround
+            def secsFromLiteral(v):
+                if v[-1] != 's':
+                    raise NotImplementedError(v)
+                return float(v[:-1])
+
+            out = secsFromLiteral(self.refreshSecs.value)
+        else:
+            # goal: caller should map secsFromLiteral on the
+            # observable, so we see a float
+            def recv(v):
+                log.info('recv %r', v)
+            import ipdb;ipdb.set_trace()
+            self.refreshSecs.subscribe(recv)
+            if out is None:
+                raise ValueError('refreshSecs had no value')
+        log.debug('    got refresh %r', out)
+        return out
+
+    def onResponse(self, resp):
+        log.debug("  PUT %s ok", self.url)
+        self.lastErr = None
+        self.currentRequest = None
+        self.nextCall = reactor.callLater(self.currentRefreshSecs(),
+                                          self.makeRequest)
+
+    def onError(self, err):
+        self.lastErr = err
+        log.debug('  PUT %s failed: %s', self.url, err)
+        self.currentRequest = None
+        self.nextCall = reactor.callLater(self.currentRefreshSecs(),
+                                          self.makeRequest)
+
+class HttpPutOutputs(object):
+    """these grow forever"""
+    def __init__(self, mockOutput=False):
+        self.mockOutput = mockOutput
+        self.state = {} # url: HttpPutOutput
+
+    def put(self, url, payload, foafAgent, refreshSecs):
+        if url not in self.state:
+            self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput,
+                                            refreshSecs=refreshSecs)
+        self.state[url].setPayload(payload, foafAgent)