changeset 46:f5623d9b07fd

rewriting reasoning to use graphs for config Ignore-this: 2bcd9ea1c9fffe2dce123596587ac70a
author drewp@bigasterisk.com
date Sun, 30 Dec 2012 20:26:38 -0800
parents 5b0f970e3d52
children 0448fbd96a31
files service/reasoning/input/startup.n3 service/reasoning/reasoning.py
diffstat 2 files changed, 97 insertions(+), 38 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/input/startup.n3	Sun Dec 30 20:26:38 2012 -0800
@@ -0,0 +1,10 @@
+@prefix : <http://projects.bigasterisk.com/room/> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+
+<http://bang:9069/graph> is :source of :reasoning; rdfs:label "arduino watchpins" .
+<http://bang:9070/graph> is :source of :reasoning;  rdfs:label "wifi usage" .
+<http://bang:9075/graph> is :source of :reasoning;  rdfs:label "env" .
+<http://slash:9050/graph> is :source of :reasoning;  rdfs:label "garageArduino for front motion" .
+<http://dash:9095/graph> is :source of :reasoning;  rdfs:label "dash monitor" .
+<http://bang:9095/graph> is :source of :reasoning;  rdfs:label "bang monitor" .
+<http://bang:9099/graph> is :source of :reasoning;  rdfs:label "trails" .
--- a/service/reasoning/reasoning.py	Sun Dec 30 03:02:25 2012 -0800
+++ b/service/reasoning/reasoning.py	Sun Dec 30 20:26:38 2012 -0800
@@ -1,5 +1,10 @@
 #!bin/python
 """
+Graph consists of:
+  input/* (read at startup)
+  webinput/* (new files are noticed in here)
+  any number of remote graphs, specified in the other graph as objects of (:reasoning, :source, *), reread constantly
+
 gather subgraphs from various services, run them through a rules
 engine, and make http requests with the conclusions.
 
@@ -13,7 +18,7 @@
 
 from twisted.internet import reactor, task
 from twisted.web.client import getPage
-import time, traceback, sys, json
+import time, traceback, sys, json, logging
 from rdflib.Graph import Graph, ConjunctiveGraph
 from rdflib import Namespace, URIRef, Literal, RDF
 import restkit
@@ -23,29 +28,11 @@
 
 sys.path.append("../../lib")
 from logsetup import log
+log.setLevel(logging.DEBUG)
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 
-def gatherGraph():
-    g = ConjunctiveGraph()
-    for source in ["http://bang:9069/graph", # arduino watchpins 
-                   "http://bang:9070/graph", # wifi usage
-                   "http://bang:9075/graph", # env
-                   "http://slash:9050/graph", # garageArduino for front motion
-                   "http://dash:9095/graph", # dash monitor
-                   "http://bang:9095/graph", # bang monitor
-                   ]:
-        try:
-            fetchTime = addTrig(g, source)
-        except Exception, e:
-            log.error("adding source %s: %s", source, e)
-            g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e))))
-            g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
-        else:
-            g.add((URIRef(source), ROOM['graphLoadSecs'], Literal(fetchTime)))
-    return g
-
 def graphWithoutMetadata(g, ignorePredicates=[]):
     """
     graph filter that removes any statements whose subjects are
@@ -68,7 +55,78 @@
     """
     stmtsA = graphWithoutMetadata(a, ignorePredicates)
     stmtsB = graphWithoutMetadata(b, ignorePredicates)
-    return set(stmtsA) == set(stmtsB)    
+    return set(stmtsA) == set(stmtsB)
+
+class InputGraph(object):
+    def __init__(self, inputDirs, onChange):
+        """
+        all .n3 files from inputDirs will be read.
+
+        onChange(self) is called if the contents of the full graph change
+        (in an interesting way) during updateFileData or
+        updateRemoteData. Interesting means statements other than the
+        ones with the predicates on the boring list.
+        """
+        self.inputDirs = inputDirs
+        self.onChange = onChange
+        self._fileGraph = Graph()
+        self._remoteGraph = None
+        self.updateFileData()
+
+    def updateFileData(self):
+        """
+        make sure we contain the correct data from the files in inputDirs
+        """
+        # this sample one is actually only needed for the output, but I don't
+        # think I want to have a separate graph for the output
+        # handling
+        log.debug("read file graphs")
+        self._fileGraph.parse("/home/drewp/oldplus/home/drewp/projects/room/devices.n3", format="n3")
+        self._fileGraph.parse("/home/drewp/projects/homeauto/service/reasoning/input/startup.n3", format="n3")
+        self.onChange(self)
+
+    def updateRemoteData(self):
+        """
+        read all remote graphs (which are themselves enumerated within
+        the file data)
+        """
+        log.debug("read remote graphs")
+        g = ConjunctiveGraph()
+        for source in self._fileGraph.objects(ROOM['reasoning'],
+                                              ROOM['source']):
+            try:
+                fetchTime = addTrig(g, source)
+            except Exception, e:
+                log.error("adding source %s: %s", source, e)
+                g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e))))
+                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
+            else:
+                g.add((URIRef(source), ROOM['graphLoadSecs'],
+                       Literal(fetchTime)))
+        prevGraph = self._remoteGraph
+        self._remoteGraph = g
+        if prevGraph is not None:
+            log.debug("prev %s now %s", len(prevGraph), len(g))
+        if (prevGraph is None or
+            not graphEqual(g, prevGraph, ignorePredicates=[
+                ROOM.signalStrength,
+                ROOM.graphLoadSecs])):
+            log.debug("remote graph changed")
+            self.onChange(self)
+        else:
+            log.debug("remote graph is unchanged")
+
+    def getGraph(self):
+        """rdflib Graph with the file+remote contents of the input graph"""
+        # use the combined readonly graph view for this?
+        g = Graph()
+        if self._fileGraph:
+            for s in self._fileGraph:
+                g.add(s)
+        if self._remoteGraph:
+            for s in self._remoteGraph:
+                g.add(s)
+        return g
 
 class Reasoning(object):
     def __init__(self):
@@ -76,12 +134,11 @@
         self.lastPollTime = 0
         self.lastError = ""
 
-        self.deviceGraph = Graph()
-        self.deviceGraph.parse("/my/proj/room/devices.n3", format="n3")
-
         self.rulesN3 = "(not read yet)"
         self.inferred = Graph() # gets replaced in each graphChanged call
 
+        self.inputGraph = InputGraph([], self.graphChanged)
+
     def readRules(self):
         self.rulesN3 = open('rules.n3').read() # for web display
         self.ruleStore = N3RuleStore()
@@ -90,22 +147,13 @@
 
     def poll(self):
         try:
-            self._poll()
+            self.inputGraph.updateRemoteData()
             self.lastPollTime = time.time()
         except Exception, e:
             log.error(traceback.format_exc())
             self.lastError = str(e)
 
-    def _poll(self):
-        g = gatherGraph()
-        if (self.prevGraph is None or
-            not graphEqual(g, self.prevGraph,
-                           ignorePredicates=[ROOM.signalStrength])):
-            self.graphChanged(g)
-
-        self.prevGraph = g
-
-    def graphChanged(self, g):
+    def graphChanged(self, inputGraph):
         # i guess these are getting consumed each inference
         try:
             t1 = time.time()
@@ -119,6 +167,7 @@
                                Literal(traceback.format_exc())))
             raise
 
+        g = inputGraph.getGraph()
         t1 = time.time()
         self.inferred = infer(g, self.ruleStore)
         inferenceTime = time.time() - t1
@@ -129,7 +178,7 @@
                            Literal(inferenceTime)))
 
         self.putResults(self.inferred)
-        
+
         try:
             inputGraphNt = g.serialize(format="nt")
             inferredNt = self.inferred.serialize(format="nt")
@@ -142,7 +191,7 @@
             traceback.print_exc()
             log.error("while sending changes to magma:")
             log.error(e)
-            
+
 
     def putResults(self, inferred):
         """
@@ -157,7 +206,7 @@
         If the graph doesn't contain any matches, we use (?d
         :zeroValue ?val) for the value and PUT that.
         """
-
+        return
         for dev, pred in [
             # the config of each putUrl should actually be in the
             # context of a dev and predicate pair, and then that would