changeset 1045:a328cc370b22

ipv6 fetch support. refactor Actions to new class and file Ignore-this: 200d7093919cf001706ad9c02347fabb darcs-hash:176d0511a0f5f4b9dab9317103c2b095196326fb
author drewp <drewp@bigasterisk.com>
date Mon, 01 Feb 2016 03:28:17 -0800
parents 724cb8ea49b4
children 79c566654151
files service/reasoning/actions.py service/reasoning/rdflibtrig.py service/reasoning/reasoning.py
diffstat 3 files changed, 207 insertions(+), 205 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/actions.py	Mon Feb 01 03:28:17 2016 -0800
@@ -0,0 +1,171 @@
+from rdflib import URIRef, Namespace, RDF, Literal
+import logging
+import urllib
+
+from cyclone.httpclient import fetch
+log = logging.getLogger('output')
+log.setLevel(logging.WARN)
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+DEV = Namespace("http://projects.bigasterisk.com/device/")
+
+class Actions(object):
+    def __init__(self, sendToLiveClients):
+        self.sendToLiveClients = sendToLiveClients
+        
+    def putResults(self, deviceGraph, inferred):
+        """
+        some conclusions in the inferred graph lead to PUT requests
+        getting made
+
+        if the graph contains (?d ?p ?o) and ?d and ?p are a device
+        and predicate we support PUTs for, then we look up
+        (?d :putUrl ?url) and (?o :putValue ?val) and call
+        PUT ?url <- ?val
+
+        If the graph doesn't contain any matches, we use (?d
+        :zeroValue ?val) for the value and PUT that.
+        """
+        
+        self._oneShotPostActions(deviceGraph, inferred)
+        for dev, pred in [
+            # the config of each putUrl should actually be in the
+            # context of a dev and predicate pair, and then that would
+            # be the source of this list
+            #(DEV.theaterDoorLock, ROOM.state),
+            #(URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState),
+            (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState),
+            (URIRef('http://projects.bigasterisk.com/room/storageCeilingLedLong'), ROOM.brightness),
+            (URIRef('http://projects.bigasterisk.com/room/storageCeilingLedCross'), ROOM.brightness),               
+            ]:
+            url = deviceGraph.value(dev, ROOM.putUrl)
+
+            if url and dev == DEV.theaterDoorLock: # ew
+                self._put(url+"/mode", payload="output")
+
+            inferredObjects = list(inferred.objects(dev, pred))
+            if len(inferredObjects) == 0:
+                self._putZero(deviceGraph, dev, pred, url)
+            elif len(inferredObjects) == 1:
+                self._putInferred(deviceGraph, url, inferredObjects[0])
+            elif len(inferredObjects) > 1:
+                log.info("conflict, ignoring: %s has %s of %s" %
+                         (dev, pred, inferredObjects))
+                # write about it to the inferred graph?
+
+        #self._frontDoorPuts(deviceGraph, inferred)
+
+
+    def _put(self, url, payload):
+        def err(e):
+            log.warn("    put %s failed (%r)", url, e)
+        log.info("    PUT %s payload=%r", url, payload)
+        fetch(url, method="PUT", postdata=payload, timeout=2).addErrback(err)
+
+    def _oneShotPostActions(self, deviceGraph, inferred):
+        """
+        Inferred graph may contain some one-shot statements. We'll send
+        statement objects to anyone on web sockets, and also generate
+        POST requests as described in the graph.
+
+        one-shot statement ?s ?p ?o
+        with this in the graph:
+          ?osp a :OneShotPost
+          ?osp :subject ?s
+          ?osp :predicate ?p
+        this will cause a post to ?o 
+        """
+        # nothing in this actually makes them one-shot yet. they'll
+        # just fire as often as we get in here, which is not desirable
+        log.info("_oneShotPostActions")
+        def err(e):
+            log.warn("post %s failed", postTarget)
+        for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']):
+            s = deviceGraph.value(osp, ROOM['subject'])
+            p = deviceGraph.value(osp, ROOM['predicate'])
+            if s is None or p is None:
+                continue
+            for postTarget in inferred.objects(s, p):
+                log.info("post target %r", postTarget)
+                # this packet ought to have 'oneShot' in it somewhere
+                self.sendToLiveClients({"s":s, "p":p, "o":postTarget})
+
+                log.info("    POST %s", postTarget)
+                fetch(postTarget, method="POST", timeout=2).addErrback(err)
+        self._postMpdCommands(inferred)
+        
+    def _postMpdCommands(self, inferred):
+        """special case to be eliminated. mpd play urls are made of an
+        mpd service and a song/album/playlist uri to be played.
+        Ideally the graph rules would assemble these like
+        http://{mpd}/addAndPlay?uri={toPlay} or maybe toPlay as the payload
+        which would be fairly general but still allow toPlay uris to
+        be matched with any player."""
+        def post(postTarget):
+            log.info("special mpd POST %s", postTarget)
+            def err(e):
+                log.warn("post %s failed", postTarget)
+            fetch(postTarget, method="POST", timeout=2).addErrback(err)
+
+        rootSkippingAuth = "http://brace:9009/"
+        for mpd in [URIRef("http://bigasterisk.com/host/brace/mpd")]:
+
+
+            for song in inferred.objects(mpd, ROOM['startMusic']):
+                log.info("mpd statement: %r" % song)
+                assert song.startswith('http://bigasterisk.com/music/')
+                post(rootSkippingAuth + "addAndPlay" + urllib.quote(song[len("http://bigasterisk.com/music"):]))
+
+            for state in inferred.objects(mpd, ROOM['playState']):
+                log.info('hello playstate %s', state)
+                if state == ROOM['pause']:
+                    log.info("mpd %s %s", mpd, state)
+                    post(rootSkippingAuth + "mpd/pause")
+            for vol in inferred.objects(mpd, ROOM['audioState']):
+                if vol == ROOM['volumeStepUp']:
+                    post(rootSkippingAuth + "volumeAdjust?amount=6&max=70")
+                if vol == ROOM['volumeStepDown']:
+                    post(rootSkippingAuth + "volumeAdjust?amount=-6&min=10")
+            
+    def _putZero(self, deviceGraph, dev, pred, putUrl):
+        # zerovalue should be a function of pred as well.
+        value = deviceGraph.value(dev, ROOM.zeroValue)
+        if value is not None:
+            log.info("put zero (%r) to %s", value, putUrl)
+            self._put(putUrl, payload=str(value))
+            # this should be written back into the inferred graph
+            # for feedback
+
+    def _putInferred(self, deviceGraph, putUrl, obj):
+        """
+        HTTP PUT to putUrl, with a payload that's either obj's :putValue
+        or obj itself.
+        """
+        value = deviceGraph.value(obj, ROOM.putValue)
+        if value is not None:
+            log.info("put %s to %s", value, putUrl)
+            self._put(putUrl, payload=str(value))
+        elif isinstance(obj, Literal):
+            log.info("put %s to %s", obj, putUrl)
+            self._put(putUrl, payload=str(obj))
+        else:
+            log.warn("don't know what payload to put for %s. obj=%r",
+                        putUrl, obj)
+
+    def _frontDoorPuts(self, deviceGraph, inferred):
+        # todo: shouldn't have to be a special case
+        brt = inferred.value(DEV.frontDoorLcd, ROOM.brightness)
+        if brt is None:
+            return
+        url = deviceGraph.value(DEV.frontDoorLcdBrightness, ROOM.putUrl)
+        log.info("put lcd %s brightness %s", url, brt)
+        self._put(str(url) + "?brightness=%s" % str(brt), payload='')
+
+        msg = "open %s motion %s" % (
+            inferred.value(DEV['frontDoorOpenIndicator'], ROOM.text),
+            inferred.value(DEV['frontDoorMotionIndicator'], ROOM.text))
+        # this was meant to be 2 chars in the bottom row, but the
+        # easier test was to replace the whole top msg
+        #restkit.Resource("http://slash:9080/").put("lcd", message=msg)
+
+    
--- a/service/reasoning/rdflibtrig.py	Mon Feb 01 03:09:08 2016 -0800
+++ b/service/reasoning/rdflibtrig.py	Mon Feb 01 03:28:17 2016 -0800
@@ -1,16 +1,14 @@
-import re, time
-import restkit
-from rdflib.parser import StringInputSource
+import time
+import requests
 from rdflib import ConjunctiveGraph
         
 def addTrig(graph, url, timeout=2):
     t1 = time.time()
-    response = restkit.request(url, timeout=timeout)
-    if response.status_int != 200:
+    response = requests.get(url, stream=True, timeout=timeout)
+    if response.status_code != 200:
         raise ValueError("status %s from %s" % (response.status, url))
-    trig = response.body_string()
+    g = ConjunctiveGraph()
+    g.parse(response.raw, format='trig')
     fetchTime = time.time() - t1
-    g = ConjunctiveGraph()
-    g.parse(StringInputSource(trig), format='trig')
     graph.addN(g.quads())
     return fetchTime
--- a/service/reasoning/reasoning.py	Mon Feb 01 03:09:08 2016 -0800
+++ b/service/reasoning/reasoning.py	Mon Feb 01 03:28:17 2016 -0800
@@ -22,13 +22,13 @@
 from rdflib import Graph, ConjunctiveGraph
 from rdflib import Namespace, URIRef, Literal, RDF
 from rdflib.parser import StringInputSource
-from cyclone.httpclient import fetch
+
 import cyclone.web, cyclone.websocket
 from inference import infer
 from rdflibtrig import addTrig
 from graphop import graphEqual
 from docopt import docopt
-
+from actions import Actions
 from FuXi.Rete.RuleStore import N3RuleStore
 
 sys.path.append("../../lib")
@@ -102,8 +102,8 @@
                 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e))))
                 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
             else:
-                g.add((URIRef(source), ROOM['graphLoadSecs'],
-                       Literal(fetchTime)))
+                g.add((URIRef(source), ROOM['graphLoadMs'],
+                       Literal(round(fetchTime * 1000, 1))))
         prevGraph = self._remoteGraph
         self._remoteGraph = g
         self._combinedGraph = None
@@ -158,18 +158,22 @@
 
         return self._combinedGraph
 
+        
 class Reasoning(object):
     def __init__(self):
         self.prevGraph = None
         self.lastPollTime = 0
         self.lastError = ""
 
+        self.actions = Actions(sendToLiveClients)
+
         self.rulesN3 = "(not read yet)"
         self.inferred = Graph() # gets replaced in each graphChanged call
 
-        self.inputGraph = InputGraph([], self.graphChanged)
+        self.inputGraph = InputGraph([], self.graphChanged)      
         self.inputGraph.updateFileData()
 
+
     def readRules(self):
         self.rulesN3 = open('rules.n3').read() # for web display
         self.ruleStore = N3RuleStore()
@@ -184,26 +188,30 @@
             log.error(traceback.format_exc())
             self.lastError = str(e)
 
+    def updateRules(self):
+        try:
+            t1 = time.time()
+            self.readRules()
+            ruleParseTime = time.time() - t1
+        except ValueError:
+            # this is so if you're just watching the inferred output,
+            # you'll see the error too
+            self.inferred = Graph()
+            self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'],
+                               Literal(traceback.format_exc())))
+            raise
+        return [(ROOM['reasoner'], ROOM['ruleParseTime'],
+                               Literal(ruleParseTime))]
+
     def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None):
         t1 = time.time()
         oldInferred = self.inferred
         try:
-            try:
-                t1 = time.time()
-                self.readRules()
-                ruleParseTime = time.time() - t1
-            except ValueError, e:
-                # this is so if you're just watching the inferred output,
-                # you'll see the error too
-                self.inferred = Graph()
-                self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'],
-                                   Literal(traceback.format_exc())))
-                raise
-
+            ruleStmts = self.updateRules()
+            
             g = inputGraph.getGraph()
             self.inferred = self._makeInferred(g)
-            self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'],
-                               Literal(ruleParseTime)))
+            [self.inferred.add(s) for s in ruleStmts]
 
             if oneShot:
                 # unclear where this should go, but the oneshot'd
@@ -213,18 +221,14 @@
                     self.inferred.add(s)
 
             t2 = time.time()
-            self.putResults(self.inferred)
+            self.actions.putResults(self.inputGraph.getGraph(), self.inferred)
             putResultsTime = time.time() - t2
-            t3 = time.time()
-            self._postToMagma(g)
-            postMagmaTime = time.time() - t3
         finally:
             if oneShot:
                 self.inferred = oldInferred
-        log.info("graphChanged %.1f ms (putResults %.1f ms; postToMagma %.1f ms)" %
+        log.info("graphChanged %.1f ms (putResults %.1f ms)" %
                  ((time.time() - t1) * 1000,
-                  putResultsTime * 1000,
-                  postMagmaTime * 1000))
+                  putResultsTime * 1000))
 
     def _makeInferred(self, inputGraph):
         t1 = time.time()
@@ -235,177 +239,6 @@
                  Literal(inferenceTime)))
         return out
 
-    def _postToMagma(self, inputGraph):
-        return # not up right now
-        inputGraphNt = inputGraph.serialize(format="nt")
-        inferredNt = self.inferred.serialize(format="nt")
-        body = json.dumps({"input": inputGraphNt,
-                           "inferred": inferredNt})
-        def err(e):
-            log.error("while sending changes to magma:")
-            log.error(e)
-
-        fetch("http://bang:8014/reasoningChange",
-              method="POST",
-              timeout=2,
-              postdata=body,
-              headers={"content-type" : ["application/json"]}).addErrback(err)
-
-    def _put(self, url, payload):
-        def err(e):
-            outlog.warn("    put %s failed", url)
-        outlog.info("    PUT %s payload=%r", url, payload)
-        fetch(url, method="PUT", postdata=payload, timeout=2).addErrback(err)
-
-    def putResults(self, inferred):
-        """
-        some conclusions in the inferred graph lead to PUT requests
-        getting made
-
-        if the graph contains (?d ?p ?o) and ?d and ?p are a device
-        and predicate we support PUTs for, then we look up
-        (?d :putUrl ?url) and (?o :putValue ?val) and call
-        PUT ?url <- ?val
-
-        If the graph doesn't contain any matches, we use (?d
-        :zeroValue ?val) for the value and PUT that.
-        """
-        deviceGraph = self.inputGraph.getGraph()
-        self.oneShotPostActions(deviceGraph, inferred)
-        for dev, pred in [
-            # the config of each putUrl should actually be in the
-            # context of a dev and predicate pair, and then that would
-            # be the source of this list
-            #(DEV.theaterDoorLock, ROOM.state),
-            #(URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState),
-            (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState),
-            (URIRef('http://projects.bigasterisk.com/room/storageCeilingLedLong'), ROOM.brightness),
-            (URIRef('http://projects.bigasterisk.com/room/storageCeilingLedCross'), ROOM.brightness),               
-            ]:
-            url = deviceGraph.value(dev, ROOM.putUrl)
-
-            if url and dev == DEV.theaterDoorLock: # ew
-                self._put(url+"/mode", payload="output")
-
-            inferredObjects = list(inferred.objects(dev, pred))
-            if len(inferredObjects) == 0:
-                self.putZero(deviceGraph, dev, pred, url)
-            elif len(inferredObjects) == 1:
-                self.putInferred(deviceGraph, url, inferredObjects[0])
-            elif len(inferredObjects) > 1:
-                log.info("conflict, ignoring: %s has %s of %s" %
-                         (dev, pred, inferredObjects))
-                # write about it to the inferred graph?
-
-        #self.frontDoorPuts(deviceGraph, inferred)
-
-
-    def oneShotPostActions(self, deviceGraph, inferred):
-        """
-        Inferred graph may contain some one-shot statements. We'll send
-        statement objects to anyone on web sockets, and also generate
-        POST requests as described in the graph.
-
-        one-shot statement ?s ?p ?o
-        with this in the graph:
-          ?osp a :OneShotPost
-          ?osp :subject ?s
-          ?osp :predicate ?p
-        this will cause a post to ?o 
-        """
-        # nothing in this actually makes them one-shot yet. they'll
-        # just fire as often as we get in here, which is not desirable
-        log.info("oneShotPostActions")
-        def err(e):
-            outlog.warn("post %s failed", postTarget)
-        for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']):
-            s = deviceGraph.value(osp, ROOM['subject'])
-            p = deviceGraph.value(osp, ROOM['predicate'])
-            if s is None or p is None:
-                continue
-            for postTarget in inferred.objects(s, p):
-                log.info("post target %r", postTarget)
-                # this packet ought to have 'oneShot' in it somewhere
-                sendToLiveClients({"s":s, "p":p, "o":postTarget})
-
-                outlog.info("    POST %s", postTarget)
-                fetch(postTarget, method="POST", timeout=2).addErrback(err)
-        self.postMpdCommands(inferred)
-        
-    def postMpdCommands(self, inferred):
-        """special case to be eliminated. mpd play urls are made of an
-        mpd service and a song/album/playlist uri to be played.
-        Ideally the graph rules would assemble these like
-        http://{mpd}/addAndPlay?uri={toPlay} or maybe toPlay as the payload
-        which would be fairly general but still allow toPlay uris to
-        be matched with any player."""
-        def post(postTarget):
-            outlog.info("special mpd POST %s", postTarget)
-            def err(e):
-                outlog.warn("post %s failed", postTarget)
-            fetch(postTarget, method="POST", timeout=2).addErrback(err)
-
-        rootSkippingAuth = "http://brace:9009/"
-        for mpd in [URIRef("http://bigasterisk.com/host/brace/mpd")]:
-
-
-            for song in inferred.objects(mpd, ROOM['startMusic']):
-                outlog.info("mpd statement: %r" % song)
-                assert song.startswith('http://bigasterisk.com/music/')
-                post(rootSkippingAuth + "addAndPlay" + urllib.quote(song[len("http://bigasterisk.com/music"):]))
-
-            for state in inferred.objects(mpd, ROOM['playState']):
-                log.info('hello playstate %s', state)
-                if state == ROOM['pause']:
-                    log.info("mpd %s %s", mpd, state)
-                    post(rootSkippingAuth + "mpd/pause")
-            for vol in inferred.objects(mpd, ROOM['audioState']):
-                if vol == ROOM['volumeStepUp']:
-                    post(rootSkippingAuth + "volumeAdjust?amount=6&max=70")
-                if vol == ROOM['volumeStepDown']:
-                    post(rootSkippingAuth + "volumeAdjust?amount=-6&min=10")
-            
-    def putZero(self, deviceGraph, dev, pred, putUrl):
-        # zerovalue should be a function of pred as well.
-        value = deviceGraph.value(dev, ROOM.zeroValue)
-        if value is not None:
-            outlog.info("put zero (%r) to %s", value, putUrl)
-            self._put(putUrl, payload=str(value))
-            # this should be written back into the inferred graph
-            # for feedback
-
-    def putInferred(self, deviceGraph, putUrl, obj):
-        """
-        HTTP PUT to putUrl, with a payload that's either obj's :putValue
-        or obj itself.
-        """
-        value = deviceGraph.value(obj, ROOM.putValue)
-        if value is not None:
-            outlog.info("put %s to %s", value, putUrl)
-            self._put(putUrl, payload=str(value))
-        elif isinstance(obj, Literal):
-            outlog.info("put %s to %s", obj, putUrl)
-            self._put(putUrl, payload=str(obj))
-        else:
-            outlog.warn("don't know what payload to put for %s. obj=%r",
-                        putUrl, obj)
-
-    def frontDoorPuts(self, deviceGraph, inferred):
-        # todo: shouldn't have to be a special case
-        brt = inferred.value(DEV.frontDoorLcd, ROOM.brightness)
-        if brt is None:
-            return
-        url = deviceGraph.value(DEV.frontDoorLcdBrightness, ROOM.putUrl)
-        outlog.info("put lcd %s brightness %s", url, brt)
-        self._put(str(url) + "?brightness=%s" % str(brt), payload='')
-
-        msg = "open %s motion %s" % (
-            inferred.value(DEV['frontDoorOpenIndicator'], ROOM.text),
-            inferred.value(DEV['frontDoorMotionIndicator'], ROOM.text))
-        # this was meant to be 2 chars in the bottom row, but the
-        # easier test was to replace the whole top msg
-        #restkit.Resource("http://slash:9080/").put("lcd", message=msg)
-
 
 
 class Index(cyclone.web.RequestHandler):