Mercurial > code > home > repos > homeauto
changeset 46:f5623d9b07fd
rewriting reasoning to use graphs for config
Ignore-this: 2bcd9ea1c9fffe2dce123596587ac70a
author | drewp@bigasterisk.com |
---|---|
date | Sun, 30 Dec 2012 20:26:38 -0800 |
parents | 5b0f970e3d52 |
children | 0448fbd96a31 |
files | service/reasoning/input/startup.n3 service/reasoning/reasoning.py |
diffstat | 2 files changed, 97 insertions(+), 38 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/input/startup.n3 Sun Dec 30 20:26:38 2012 -0800 @@ -0,0 +1,10 @@ +@prefix : <http://projects.bigasterisk.com/room/> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . + +<http://bang:9069/graph> is :source of :reasoning; rdfs:label "arduino watchpins" . +<http://bang:9070/graph> is :source of :reasoning; rdfs:label "wifi usage" . +<http://bang:9075/graph> is :source of :reasoning; rdfs:label "env" . +<http://slash:9050/graph> is :source of :reasoning; rdfs:label "garageArduino for front motion" . +<http://dash:9095/graph> is :source of :reasoning; rdfs:label "dash monitor" . +<http://bang:9095/graph> is :source of :reasoning; rdfs:label "bang monitor" . +<http://bang:9099/graph> is :source of :reasoning; rdfs:label "trails" .
--- a/service/reasoning/reasoning.py Sun Dec 30 03:02:25 2012 -0800 +++ b/service/reasoning/reasoning.py Sun Dec 30 20:26:38 2012 -0800 @@ -1,5 +1,10 @@ #!bin/python """ +Graph consists of: + input/* (read at startup) + webinput/* (new files are noticed in here) + any number of remote graphs, specified in the other graph as objects of (:reasoning, :source, *), reread constantly + gather subgraphs from various services, run them through a rules engine, and make http requests with the conclusions. @@ -13,7 +18,7 @@ from twisted.internet import reactor, task from twisted.web.client import getPage -import time, traceback, sys, json +import time, traceback, sys, json, logging from rdflib.Graph import Graph, ConjunctiveGraph from rdflib import Namespace, URIRef, Literal, RDF import restkit @@ -23,29 +28,11 @@ sys.path.append("../../lib") from logsetup import log +log.setLevel(logging.DEBUG) ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") -def gatherGraph(): - g = ConjunctiveGraph() - for source in ["http://bang:9069/graph", # arduino watchpins - "http://bang:9070/graph", # wifi usage - "http://bang:9075/graph", # env - "http://slash:9050/graph", # garageArduino for front motion - "http://dash:9095/graph", # dash monitor - "http://bang:9095/graph", # bang monitor - ]: - try: - fetchTime = addTrig(g, source) - except Exception, e: - log.error("adding source %s: %s", source, e) - g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) - g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) - else: - g.add((URIRef(source), ROOM['graphLoadSecs'], Literal(fetchTime))) - return g - def graphWithoutMetadata(g, ignorePredicates=[]): """ graph filter that removes any statements whose subjects are @@ -68,7 +55,78 @@ """ stmtsA = graphWithoutMetadata(a, ignorePredicates) stmtsB = graphWithoutMetadata(b, ignorePredicates) - return set(stmtsA) == set(stmtsB) + return set(stmtsA) == set(stmtsB) + +class InputGraph(object): + def __init__(self, inputDirs, onChange): + """ + all .n3 files from inputDirs will be read. + + onChange(self) is called if the contents of the full graph change + (in an interesting way) during updateFileData or + updateRemoteData. Interesting means statements other than the + ones with the predicates on the boring list. + """ + self.inputDirs = inputDirs + self.onChange = onChange + self._fileGraph = Graph() + self._remoteGraph = None + self.updateFileData() + + def updateFileData(self): + """ + make sure we contain the correct data from the files in inputDirs + """ + # this sample one is actually only needed for the output, but I don't + # think I want to have a separate graph for the output + # handling + log.debug("read file graphs") + self._fileGraph.parse("/home/drewp/oldplus/home/drewp/projects/room/devices.n3", format="n3") + self._fileGraph.parse("/home/drewp/projects/homeauto/service/reasoning/input/startup.n3", format="n3") + self.onChange(self) + + def updateRemoteData(self): + """ + read all remote graphs (which are themselves enumerated within + the file data) + """ + log.debug("read remote graphs") + g = ConjunctiveGraph() + for source in self._fileGraph.objects(ROOM['reasoning'], + ROOM['source']): + try: + fetchTime = addTrig(g, source) + except Exception, e: + log.error("adding source %s: %s", source, e) + g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) + g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) + else: + g.add((URIRef(source), ROOM['graphLoadSecs'], + Literal(fetchTime))) + prevGraph = self._remoteGraph + self._remoteGraph = g + if prevGraph is not None: + log.debug("prev %s now %s", len(prevGraph), len(g)) + if (prevGraph is None or + not graphEqual(g, prevGraph, ignorePredicates=[ + ROOM.signalStrength, + ROOM.graphLoadSecs])): + log.debug("remote graph changed") + self.onChange(self) + else: + log.debug("remote graph is unchanged") + + def getGraph(self): + """rdflib Graph with the file+remote contents of the input graph""" + # use the combined readonly graph view for this? + g = Graph() + if self._fileGraph: + for s in self._fileGraph: + g.add(s) + if self._remoteGraph: + for s in self._remoteGraph: + g.add(s) + return g class Reasoning(object): def __init__(self): @@ -76,12 +134,11 @@ self.lastPollTime = 0 self.lastError = "" - self.deviceGraph = Graph() - self.deviceGraph.parse("/my/proj/room/devices.n3", format="n3") - self.rulesN3 = "(not read yet)" self.inferred = Graph() # gets replaced in each graphChanged call + self.inputGraph = InputGraph([], self.graphChanged) + def readRules(self): self.rulesN3 = open('rules.n3').read() # for web display self.ruleStore = N3RuleStore() @@ -90,22 +147,13 @@ def poll(self): try: - self._poll() + self.inputGraph.updateRemoteData() self.lastPollTime = time.time() except Exception, e: log.error(traceback.format_exc()) self.lastError = str(e) - def _poll(self): - g = gatherGraph() - if (self.prevGraph is None or - not graphEqual(g, self.prevGraph, - ignorePredicates=[ROOM.signalStrength])): - self.graphChanged(g) - - self.prevGraph = g - - def graphChanged(self, g): + def graphChanged(self, inputGraph): # i guess these are getting consumed each inference try: t1 = time.time() @@ -119,6 +167,7 @@ Literal(traceback.format_exc()))) raise + g = inputGraph.getGraph() t1 = time.time() self.inferred = infer(g, self.ruleStore) inferenceTime = time.time() - t1 @@ -129,7 +178,7 @@ Literal(inferenceTime))) self.putResults(self.inferred) - + try: inputGraphNt = g.serialize(format="nt") inferredNt = self.inferred.serialize(format="nt") @@ -142,7 +191,7 @@ traceback.print_exc() log.error("while sending changes to magma:") log.error(e) - + def putResults(self, inferred): """ @@ -157,7 +206,7 @@ If the graph doesn't contain any matches, we use (?d :zeroValue ?val) for the value and PUT that. """ - + return for dev, pred in [ # the config of each putUrl should actually be in the # context of a dev and predicate pair, and then that would