changeset 47:0448fbd96a31

scan more input files. oneshot and immediate update features. Ignore-this: bb6c89f44478d27456319b94db2ec81a
author drewp@bigasterisk.com
date Mon, 31 Dec 2012 00:47:12 -0800
parents f5623d9b07fd
children 571e773c77c3
files service/reasoning/index.html service/reasoning/oneShot service/reasoning/reasoning.py
diffstat 3 files changed, 153 insertions(+), 38 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/index.html	Sun Dec 30 20:26:38 2012 -0800
+++ b/service/reasoning/index.html	Mon Dec 31 00:47:12 2012 -0800
@@ -49,6 +49,8 @@
 		  ["http://projects.bigasterisk.com/room/", "room:"],
 		  ["http://projects.bigasterisk.com/device/", "dev:"],
 		  ["http://purl.org/dc/terms/", "dcterms:"],
+		  ["http://www.w3.org/2000/01/rdf-schema#", "rdfs:"],
+		  ["http://bigasterisk.com/map#", "map:"],
 		  ["http://www.w3.org/1999/02/22-rdf-syntax-ns#", "rdf:"]]; 
 	      for (i in repl) {
 		  var p=repl[i];
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/oneShot	Mon Dec 31 00:47:12 2012 -0800
@@ -0,0 +1,28 @@
+#!/usr/bin/python
+"""
+send a statement to the reasoning server for one update cycle. Args
+are s/p/o in n3 notation, with many prefixes predefined here.
+"""
+import sys, restkit
+s, p, o = sys.argv[1:]
+
+prefixes = {
+'room' : 'http://projects.bigasterisk.com/room/',
+    }
+
+def expand(term):
+    if ':' not in term:
+        return term
+    left, right = term.split(':', 1)
+    if left in prefixes:
+        return '<%s%s>' % (prefixes[left], right)
+    return term
+
+stmt = '%s %s %s .' % (expand(s), expand(p), expand(o))
+print "Sending: %s" % stmt
+
+reasoning = restkit.Resource("http://bang:9071/")
+reasoning.post("oneShot",
+               headers={"content-type": "text/n3"},
+               payload=stmt)
+
--- a/service/reasoning/reasoning.py	Sun Dec 30 20:26:38 2012 -0800
+++ b/service/reasoning/reasoning.py	Mon Dec 31 00:47:12 2012 -0800
@@ -18,9 +18,10 @@
 
 from twisted.internet import reactor, task
 from twisted.web.client import getPage
+from twisted.python.filepath import FilePath
 import time, traceback, sys, json, logging
 from rdflib.Graph import Graph, ConjunctiveGraph
-from rdflib import Namespace, URIRef, Literal, RDF
+from rdflib import Namespace, URIRef, Literal, RDF, StringInputSource
 import restkit
 from FuXi.Rete.RuleStore import N3RuleStore
 import cyclone.web
@@ -55,6 +56,12 @@
     """
     stmtsA = graphWithoutMetadata(a, ignorePredicates)
     stmtsB = graphWithoutMetadata(b, ignorePredicates)
+    if log.getEffectiveLevel() <= logging.DEBUG:
+        diff = set(stmtsA).symmetric_difference(set(stmtsB))
+        if diff:
+            log.debug("changing statements:")
+            for s in diff:
+                log.debug(str(s))
     return set(stmtsA) == set(stmtsB)
 
 class InputGraph(object):
@@ -71,7 +78,8 @@
         self.onChange = onChange
         self._fileGraph = Graph()
         self._remoteGraph = None
-        self.updateFileData()
+        self._combinedGraph = None
+        self._oneShotAdditionGraph = None
 
     def updateFileData(self):
         """
@@ -81,8 +89,14 @@
         # think I want to have a separate graph for the output
         # handling
         log.debug("read file graphs")
-        self._fileGraph.parse("/home/drewp/oldplus/home/drewp/projects/room/devices.n3", format="n3")
-        self._fileGraph.parse("/home/drewp/projects/homeauto/service/reasoning/input/startup.n3", format="n3")
+        for fp in FilePath("input").walk():
+            if fp.isdir():
+                continue
+            log.debug("read %s", fp)
+            # todo: if this fails, leave the report in the graph
+            self._fileGraph.parse(fp.open(), format="n3")
+            self._combinedGraph = None
+
         self.onChange(self)
 
     def updateRemoteData(self):
@@ -95,9 +109,10 @@
         for source in self._fileGraph.objects(ROOM['reasoning'],
                                               ROOM['source']):
             try:
+                # this part could be parallelized
                 fetchTime = addTrig(g, source)
             except Exception, e:
-                log.error("adding source %s: %s", source, e)
+                log.error("  adding source %s: %s", source, e)
                 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e))))
                 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
             else:
@@ -105,28 +120,52 @@
                        Literal(fetchTime)))
         prevGraph = self._remoteGraph
         self._remoteGraph = g
-        if prevGraph is not None:
-            log.debug("prev %s now %s", len(prevGraph), len(g))
+        self._combinedGraph = None
         if (prevGraph is None or
             not graphEqual(g, prevGraph, ignorePredicates=[
                 ROOM.signalStrength,
-                ROOM.graphLoadSecs])):
-            log.debug("remote graph changed")
+                ROOM.graphLoadSecs,
+                # perhaps anything with a number-datatype for its
+                # object should be filtered out, and you have to make
+                # an upstream quantization (e.g. 'temp high'/'temp
+                # low') if you want to do reasoning on the difference
+                URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
+                URIRef("http://bigasterisk.com/map#lastSeenAgo"),
+                URIRef("http://projects.bigasterisk.com/room/usingPower"),
+                ])):
+            log.debug("  remote graph changed")
             self.onChange(self)
         else:
-            log.debug("remote graph is unchanged")
+            log.debug("  remote graph is unchanged")
+
+    def addOneShot(self, g):
+        """
+        add this graph to the total, call onChange, and then revert
+        the addition of this graph
+        """
+        self._oneShotAdditionGraph = g
+        try:
+            self.onChange(self)
+        finally:
+            self._oneShotAdditionGraph = None
 
     def getGraph(self):
         """rdflib Graph with the file+remote contents of the input graph"""
-        # use the combined readonly graph view for this?
-        g = Graph()
-        if self._fileGraph:
-            for s in self._fileGraph:
-                g.add(s)
-        if self._remoteGraph:
-            for s in self._remoteGraph:
-                g.add(s)
-        return g
+        # this could be much faster with the combined readonly graph
+        # view from rdflib
+        if self._combinedGraph is None:
+            self._combinedGraph = Graph()
+            if self._fileGraph:
+                for s in self._fileGraph:
+                    self._combinedGraph.add(s)
+            if self._remoteGraph:
+                for s in self._remoteGraph:
+                    self._combinedGraph.add(s)
+            if self._oneShotAdditionGraph:
+                for s in self._oneShotAdditionGraph:
+                    self._combinedGraph.add(s)                
+
+        return self._combinedGraph 
 
 class Reasoning(object):
     def __init__(self):
@@ -138,6 +177,8 @@
         self.inferred = Graph() # gets replaced in each graphChanged call
 
         self.inputGraph = InputGraph([], self.graphChanged)
+        self.inputGraph.updateFileData()
+
 
     def readRules(self):
         self.rulesN3 = open('rules.n3').read() # for web display
@@ -206,42 +247,43 @@
         If the graph doesn't contain any matches, we use (?d
         :zeroValue ?val) for the value and PUT that.
         """
-        return
+        deviceGraph = self.inputGraph.getGraph()
         for dev, pred in [
             # the config of each putUrl should actually be in the
             # context of a dev and predicate pair, and then that would
             # be the source of this list
             (DEV.theaterDoorLock, ROOM.state),
             (URIRef('http://bigasterisk.com/host/bang/monitor'), ROOM.powerState),
+            (URIRef('http://bigasterisk.com/host/dash/monitor'), ROOM.powerState),
             ]:
-            url = self.deviceGraph.value(dev, ROOM.putUrl)
+            url = deviceGraph.value(dev, ROOM.putUrl)
 
             if dev == DEV.theaterDoorLock: # ew
                 restkit.request(url=url+"/mode", method="PUT", body="output")
 
             inferredObjects = list(inferred.objects(dev, pred))
             if len(inferredObjects) == 0:
-                self.putZero(dev, pred, url)
+                self.putZero(deviceGraph, dev, pred, url)
             elif len(inferredObjects) == 1:
-                self.putInferred(dev, pred, url, inferredObjects[0])
+                self.putInferred(deviceGraph, dev, pred, url, inferredObjects[0])
             elif len(inferredObjects) > 1:
                 log.info("conflict, ignoring: %s has %s of %s" %
                          (dev, pred, inferredObjects))
                 # write about it to the inferred graph?
 
-        self.frontDoorPuts(inferred)
+        self.frontDoorPuts(deviceGraph, inferred)
 
-    def putZero(self, dev, pred, putUrl):
+    def putZero(self, deviceGraph, dev, pred, putUrl):
         # zerovalue should be a function of pred as well.
-        value = self.deviceGraph.value(dev, ROOM.zeroValue)
+        value = deviceGraph.value(dev, ROOM.zeroValue)
         if value is not None:
             log.info("put zero (%r) to %s", value, putUrl)
             restkit.request(url=putUrl, method="PUT", body=value)
             # this should be written back into the inferred graph
             # for feedback
 
-    def putInferred(self, dev, pred, putUrl, obj):
-        value = self.deviceGraph.value(obj, ROOM.putValue)
+    def putInferred(self, deviceGraph, dev, pred, putUrl, obj):
+        value = deviceGraph.value(obj, ROOM.putValue)
         if value is not None:
             log.info("put %s to %s", value, putUrl)
             restkit.request(url=putUrl, method="PUT", body=value)
@@ -249,21 +291,27 @@
             log.warn("%s %s %s has no :putValue" %
                      (dev, pred, obj))
 
-    def frontDoorPuts(self, inferred):
+    def frontDoorPuts(self, deviceGraph, inferred):
         # todo: shouldn't have to be a special case
         brt = inferred.value(DEV.frontDoorLcd, ROOM.brightness)
-        url = self.deviceGraph.value(DEV.frontDoorLcdBrightness,
-                                       ROOM.putUrl)
+        if brt is None:
+            return
+        url = deviceGraph.value(DEV.frontDoorLcdBrightness, ROOM.putUrl)
         log.info("put lcd %s brightness %s", url, brt)
-        getPage(str(url) + "?brightness=%s" % str(brt), method="PUT")
+        def failed(err):
+            log.error("lcd brightness: %s", err)
+        getPage(str(url) + "?brightness=%s" % str(brt),
+                method="PUT").addErrback(failed)
 
-        msg = "open %s motion %s" % (inferred.value(DEV['frontDoorOpenIndicator'], ROOM.text),
-                                     inferred.value(DEV['frontDoorMotionIndicator'], ROOM.text))
+        msg = "open %s motion %s" % (
+            inferred.value(DEV['frontDoorOpenIndicator'], ROOM.text),
+            inferred.value(DEV['frontDoorMotionIndicator'], ROOM.text))
         # this was meant to be 2 chars in the bottom row, but the
         # easier test was to replace the whole top msg
         #restkit.Resource("http://slash:9080/").put("lcd", message=msg)
 
 
+
 class Index(cyclone.web.RequestHandler):
     def get(self):
         # make sure GET / fails if our poll loop died
@@ -276,12 +324,46 @@
         self.set_header("Content-Type", "application/xhtml+xml")
         self.write(open('index.html').read())
 
+class ImmediateUpdate(cyclone.web.RequestHandler):
+    def put(self):
+        """
+        request an immediate load of the remote graphs; the thing we
+        do in the background anyway. No payload.
+
+        Using PUT because this is idempotent and retryable and
+        everything.
+        """
+        r.poll()
+        self.set_status(202)
+
+def parseRdf(text, contentType):
+    g = Graph()
+    g.parse(StringInputSource(text), format={
+        'text/n3': 'n3',
+        }[contentType])
+    return g
+
+class OneShot(cyclone.web.RequestHandler):
+    def post(self):
+        """
+        payload is an rdf graph. The statements are momentarily added
+        to the input graph for exactly one update.
+
+        todo: how do we go from a transition like doorclosed-to-open
+        to a oneshot event? the upstream shouldn't have to do it. Do
+        we make those oneshot events here? for every object change?
+        there are probably special cases regarding startup time when
+        everything appears to be a 'change'.
+        """
+        g = parseRdf(self.request.body, self.request.headers['content-type'])
+        self.settings.reasoning.inputGraph.addOneShot(g)
+
 # for reuse
 class GraphResource(cyclone.web.RequestHandler):
     def get(self, which):
         self.set_header("Content-Type", "application/json")
         r = self.settings.reasoning
-        g = {'lastInput': r.prevGraph,
+        g = {'lastInput': r.inputGraph.getGraph(),
              'lastOutput': r.inferred,
              }[which]
         self.write(self.jsonRdf(g))
@@ -293,7 +375,7 @@
     """same as what gets posted above"""
     def get(self):
         r = self.settings.reasoning
-        inputGraphNt = r.prevGraph.serialize(format="nt")
+        inputGraphNt = r.inputGraph.getGraph().serialize(format="nt")
         inferredNt = r.inferred.serialize(format="nt")
         self.set_header("Content-Type", "application/json")
         self.write(json.dumps({"input": inputGraphNt,
@@ -307,13 +389,14 @@
 class Status(cyclone.web.RequestHandler):
     def get(self):
         self.set_header("Content-Type", "text/plain")
-        g = self.settings.reasoning.prevGraph
+        g = self.settings.reasoning.inputGraph.getGraph()
         msg = ""
         for badSource in g.subjects(RDF.type, ROOM['FailedGraphLoad']):
             msg += "GET %s failed (%s). " % (
                 badSource, g.value(badSource, ROOM['graphLoadError']))
         if not msg:
-            self.write("all inputs ok")
+            self.finish("all inputs ok")
+            return
         self.set_status(500)
         self.finish(msg)
 
@@ -325,6 +408,8 @@
     def __init__(self, reasoning):
         handlers = [
             (r"/", Index),
+            (r"/immediateUpdate", ImmediateUpdate),
+            (r"/oneShot", OneShot),
             (r'/(jquery.min.js)', Static),
             (r'/(lastInput|lastOutput)Graph', GraphResource),
             (r'/ntGraphs', NtGraphs),