diff service/reasoning/reasoning.py @ 240:0c306e76d8c5

ipv6 fetch support. refactor Actions to new class and file Ignore-this: 200d7093919cf001706ad9c02347fabb
author drewp@bigasterisk.com
date Mon, 01 Feb 2016 03:28:17 -0800
parents 5ad229334a88
children e5c27d2f11ab
line wrap: on
line diff
--- a/service/reasoning/reasoning.py	Mon Feb 01 03:09:08 2016 -0800
+++ b/service/reasoning/reasoning.py	Mon Feb 01 03:28:17 2016 -0800
@@ -22,13 +22,13 @@
 from rdflib import Graph, ConjunctiveGraph
 from rdflib import Namespace, URIRef, Literal, RDF
 from rdflib.parser import StringInputSource
-from cyclone.httpclient import fetch
+
 import cyclone.web, cyclone.websocket
 from inference import infer
 from rdflibtrig import addTrig
 from graphop import graphEqual
 from docopt import docopt
-
+from actions import Actions
 from FuXi.Rete.RuleStore import N3RuleStore
 
 sys.path.append("../../lib")
@@ -102,8 +102,8 @@
                 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e))))
                 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
             else:
-                g.add((URIRef(source), ROOM['graphLoadSecs'],
-                       Literal(fetchTime)))
+                g.add((URIRef(source), ROOM['graphLoadMs'],
+                       Literal(round(fetchTime * 1000, 1))))
         prevGraph = self._remoteGraph
         self._remoteGraph = g
         self._combinedGraph = None
@@ -158,18 +158,22 @@
 
         return self._combinedGraph
 
+        
 class Reasoning(object):
     def __init__(self):
         self.prevGraph = None
         self.lastPollTime = 0
         self.lastError = ""
 
+        self.actions = Actions(sendToLiveClients)
+
         self.rulesN3 = "(not read yet)"
         self.inferred = Graph() # gets replaced in each graphChanged call
 
-        self.inputGraph = InputGraph([], self.graphChanged)
+        self.inputGraph = InputGraph([], self.graphChanged)      
         self.inputGraph.updateFileData()
 
+
     def readRules(self):
         self.rulesN3 = open('rules.n3').read() # for web display
         self.ruleStore = N3RuleStore()
@@ -184,26 +188,30 @@
             log.error(traceback.format_exc())
             self.lastError = str(e)
 
+    def updateRules(self):
+        try:
+            t1 = time.time()
+            self.readRules()
+            ruleParseTime = time.time() - t1
+        except ValueError:
+            # this is so if you're just watching the inferred output,
+            # you'll see the error too
+            self.inferred = Graph()
+            self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'],
+                               Literal(traceback.format_exc())))
+            raise
+        return [(ROOM['reasoner'], ROOM['ruleParseTime'],
+                               Literal(ruleParseTime))]
+
     def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None):
         t1 = time.time()
         oldInferred = self.inferred
         try:
-            try:
-                t1 = time.time()
-                self.readRules()
-                ruleParseTime = time.time() - t1
-            except ValueError, e:
-                # this is so if you're just watching the inferred output,
-                # you'll see the error too
-                self.inferred = Graph()
-                self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'],
-                                   Literal(traceback.format_exc())))
-                raise
-
+            ruleStmts = self.updateRules()
+            
             g = inputGraph.getGraph()
             self.inferred = self._makeInferred(g)
-            self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'],
-                               Literal(ruleParseTime)))
+            [self.inferred.add(s) for s in ruleStmts]
 
             if oneShot:
                 # unclear where this should go, but the oneshot'd
@@ -213,18 +221,14 @@
                     self.inferred.add(s)
 
             t2 = time.time()
-            self.putResults(self.inferred)
+            self.actions.putResults(self.inputGraph.getGraph(), self.inferred)
             putResultsTime = time.time() - t2
-            t3 = time.time()
-            self._postToMagma(g)
-            postMagmaTime = time.time() - t3
         finally:
             if oneShot:
                 self.inferred = oldInferred
-        log.info("graphChanged %.1f ms (putResults %.1f ms; postToMagma %.1f ms)" %
+        log.info("graphChanged %.1f ms (putResults %.1f ms)" %
                  ((time.time() - t1) * 1000,
-                  putResultsTime * 1000,
-                  postMagmaTime * 1000))
+                  putResultsTime * 1000))
 
     def _makeInferred(self, inputGraph):
         t1 = time.time()
@@ -235,177 +239,6 @@
                  Literal(inferenceTime)))
         return out
 
-    def _postToMagma(self, inputGraph):
-        return # not up right now
-        inputGraphNt = inputGraph.serialize(format="nt")
-        inferredNt = self.inferred.serialize(format="nt")
-        body = json.dumps({"input": inputGraphNt,
-                           "inferred": inferredNt})
-        def err(e):
-            log.error("while sending changes to magma:")
-            log.error(e)
-
-        fetch("http://bang:8014/reasoningChange",
-              method="POST",
-              timeout=2,
-              postdata=body,
-              headers={"content-type" : ["application/json"]}).addErrback(err)
-
-    def _put(self, url, payload):
-        def err(e):
-            outlog.warn("    put %s failed", url)
-        outlog.info("    PUT %s payload=%r", url, payload)
-        fetch(url, method="PUT", postdata=payload, timeout=2).addErrback(err)
-
-    def putResults(self, inferred):
-        """
-        some conclusions in the inferred graph lead to PUT requests
-        getting made
-
-        if the graph contains (?d ?p ?o) and ?d and ?p are a device
-        and predicate we support PUTs for, then we look up
-        (?d :putUrl ?url) and (?o :putValue ?val) and call
-        PUT ?url <- ?val
-
-        If the graph doesn't contain any matches, we use (?d
-        :zeroValue ?val) for the value and PUT that.
-        """
-        deviceGraph = self.inputGraph.getGraph()
-        self.oneShotPostActions(deviceGraph, inferred)
-        for dev, pred in [
-            # the config of each putUrl should actually be in the
-            # context of a dev and predicate pair, and then that would
-            # be the source of this list
-            #(DEV.theaterDoorLock, ROOM.state),
-            #(URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState),
-            (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState),
-            (URIRef('http://projects.bigasterisk.com/room/storageCeilingLedLong'), ROOM.brightness),
-            (URIRef('http://projects.bigasterisk.com/room/storageCeilingLedCross'), ROOM.brightness),               
-            ]:
-            url = deviceGraph.value(dev, ROOM.putUrl)
-
-            if url and dev == DEV.theaterDoorLock: # ew
-                self._put(url+"/mode", payload="output")
-
-            inferredObjects = list(inferred.objects(dev, pred))
-            if len(inferredObjects) == 0:
-                self.putZero(deviceGraph, dev, pred, url)
-            elif len(inferredObjects) == 1:
-                self.putInferred(deviceGraph, url, inferredObjects[0])
-            elif len(inferredObjects) > 1:
-                log.info("conflict, ignoring: %s has %s of %s" %
-                         (dev, pred, inferredObjects))
-                # write about it to the inferred graph?
-
-        #self.frontDoorPuts(deviceGraph, inferred)
-
-
-    def oneShotPostActions(self, deviceGraph, inferred):
-        """
-        Inferred graph may contain some one-shot statements. We'll send
-        statement objects to anyone on web sockets, and also generate
-        POST requests as described in the graph.
-
-        one-shot statement ?s ?p ?o
-        with this in the graph:
-          ?osp a :OneShotPost
-          ?osp :subject ?s
-          ?osp :predicate ?p
-        this will cause a post to ?o 
-        """
-        # nothing in this actually makes them one-shot yet. they'll
-        # just fire as often as we get in here, which is not desirable
-        log.info("oneShotPostActions")
-        def err(e):
-            outlog.warn("post %s failed", postTarget)
-        for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']):
-            s = deviceGraph.value(osp, ROOM['subject'])
-            p = deviceGraph.value(osp, ROOM['predicate'])
-            if s is None or p is None:
-                continue
-            for postTarget in inferred.objects(s, p):
-                log.info("post target %r", postTarget)
-                # this packet ought to have 'oneShot' in it somewhere
-                sendToLiveClients({"s":s, "p":p, "o":postTarget})
-
-                outlog.info("    POST %s", postTarget)
-                fetch(postTarget, method="POST", timeout=2).addErrback(err)
-        self.postMpdCommands(inferred)
-        
-    def postMpdCommands(self, inferred):
-        """special case to be eliminated. mpd play urls are made of an
-        mpd service and a song/album/playlist uri to be played.
-        Ideally the graph rules would assemble these like
-        http://{mpd}/addAndPlay?uri={toPlay} or maybe toPlay as the payload
-        which would be fairly general but still allow toPlay uris to
-        be matched with any player."""
-        def post(postTarget):
-            outlog.info("special mpd POST %s", postTarget)
-            def err(e):
-                outlog.warn("post %s failed", postTarget)
-            fetch(postTarget, method="POST", timeout=2).addErrback(err)
-
-        rootSkippingAuth = "http://brace:9009/"
-        for mpd in [URIRef("http://bigasterisk.com/host/brace/mpd")]:
-
-
-            for song in inferred.objects(mpd, ROOM['startMusic']):
-                outlog.info("mpd statement: %r" % song)
-                assert song.startswith('http://bigasterisk.com/music/')
-                post(rootSkippingAuth + "addAndPlay" + urllib.quote(song[len("http://bigasterisk.com/music"):]))
-
-            for state in inferred.objects(mpd, ROOM['playState']):
-                log.info('hello playstate %s', state)
-                if state == ROOM['pause']:
-                    log.info("mpd %s %s", mpd, state)
-                    post(rootSkippingAuth + "mpd/pause")
-            for vol in inferred.objects(mpd, ROOM['audioState']):
-                if vol == ROOM['volumeStepUp']:
-                    post(rootSkippingAuth + "volumeAdjust?amount=6&max=70")
-                if vol == ROOM['volumeStepDown']:
-                    post(rootSkippingAuth + "volumeAdjust?amount=-6&min=10")
-            
-    def putZero(self, deviceGraph, dev, pred, putUrl):
-        # zerovalue should be a function of pred as well.
-        value = deviceGraph.value(dev, ROOM.zeroValue)
-        if value is not None:
-            outlog.info("put zero (%r) to %s", value, putUrl)
-            self._put(putUrl, payload=str(value))
-            # this should be written back into the inferred graph
-            # for feedback
-
-    def putInferred(self, deviceGraph, putUrl, obj):
-        """
-        HTTP PUT to putUrl, with a payload that's either obj's :putValue
-        or obj itself.
-        """
-        value = deviceGraph.value(obj, ROOM.putValue)
-        if value is not None:
-            outlog.info("put %s to %s", value, putUrl)
-            self._put(putUrl, payload=str(value))
-        elif isinstance(obj, Literal):
-            outlog.info("put %s to %s", obj, putUrl)
-            self._put(putUrl, payload=str(obj))
-        else:
-            outlog.warn("don't know what payload to put for %s. obj=%r",
-                        putUrl, obj)
-
-    def frontDoorPuts(self, deviceGraph, inferred):
-        # todo: shouldn't have to be a special case
-        brt = inferred.value(DEV.frontDoorLcd, ROOM.brightness)
-        if brt is None:
-            return
-        url = deviceGraph.value(DEV.frontDoorLcdBrightness, ROOM.putUrl)
-        outlog.info("put lcd %s brightness %s", url, brt)
-        self._put(str(url) + "?brightness=%s" % str(brt), payload='')
-
-        msg = "open %s motion %s" % (
-            inferred.value(DEV['frontDoorOpenIndicator'], ROOM.text),
-            inferred.value(DEV['frontDoorMotionIndicator'], ROOM.text))
-        # this was meant to be 2 chars in the bottom row, but the
-        # easier test was to replace the whole top msg
-        #restkit.Resource("http://slash:9080/").put("lcd", message=msg)
-
 
 
 class Index(cyclone.web.RequestHandler):