comparison service/reasoning/reasoning.py @ 851:0d86b3955bcd

rewriting reasoning to use graphs for config Ignore-this: 2bcd9ea1c9fffe2dce123596587ac70a darcs-hash:20121231042638-312f9-d9cc79c24c73e7cdbac3660d6bcf4e347b5707a9
author drewp <drewp@bigasterisk.com>
date Sun, 30 Dec 2012 20:26:38 -0800
parents 887d47682d94
children 0448fbd96a31
comparison
equal deleted inserted replaced
850:887d47682d94 851:0d86b3955bcd
1 #!bin/python 1 #!bin/python
2 """ 2 """
3 Graph consists of:
4 input/* (read at startup)
5 webinput/* (new files are noticed in here)
6 any number of remote graphs, specified in the other graph as objects of (:reasoning, :source, *), reread constantly
7
3 gather subgraphs from various services, run them through a rules 8 gather subgraphs from various services, run them through a rules
4 engine, and make http requests with the conclusions. 9 engine, and make http requests with the conclusions.
5 10
6 E.g. 'when drew's phone is near the house, and someone is awake, 11 E.g. 'when drew's phone is near the house, and someone is awake,
7 unlock the door when the door's motion sensor is activated' 12 unlock the door when the door's motion sensor is activated'
11 """ 16 """
12 17
13 18
14 from twisted.internet import reactor, task 19 from twisted.internet import reactor, task
15 from twisted.web.client import getPage 20 from twisted.web.client import getPage
16 import time, traceback, sys, json 21 import time, traceback, sys, json, logging
17 from rdflib.Graph import Graph, ConjunctiveGraph 22 from rdflib.Graph import Graph, ConjunctiveGraph
18 from rdflib import Namespace, URIRef, Literal, RDF 23 from rdflib import Namespace, URIRef, Literal, RDF
19 import restkit 24 import restkit
20 from FuXi.Rete.RuleStore import N3RuleStore 25 from FuXi.Rete.RuleStore import N3RuleStore
21 import cyclone.web 26 import cyclone.web
22 from inference import addTrig, infer 27 from inference import addTrig, infer
23 28
24 sys.path.append("../../lib") 29 sys.path.append("../../lib")
25 from logsetup import log 30 from logsetup import log
31 log.setLevel(logging.DEBUG)
26 32
27 ROOM = Namespace("http://projects.bigasterisk.com/room/") 33 ROOM = Namespace("http://projects.bigasterisk.com/room/")
28 DEV = Namespace("http://projects.bigasterisk.com/device/") 34 DEV = Namespace("http://projects.bigasterisk.com/device/")
29
30 def gatherGraph():
31 g = ConjunctiveGraph()
32 for source in ["http://bang:9069/graph", # arduino watchpins
33 "http://bang:9070/graph", # wifi usage
34 "http://bang:9075/graph", # env
35 "http://slash:9050/graph", # garageArduino for front motion
36 "http://dash:9095/graph", # dash monitor
37 "http://bang:9095/graph", # bang monitor
38 ]:
39 try:
40 fetchTime = addTrig(g, source)
41 except Exception, e:
42 log.error("adding source %s: %s", source, e)
43 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e))))
44 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
45 else:
46 g.add((URIRef(source), ROOM['graphLoadSecs'], Literal(fetchTime)))
47 return g
48 35
49 def graphWithoutMetadata(g, ignorePredicates=[]): 36 def graphWithoutMetadata(g, ignorePredicates=[]):
50 """ 37 """
51 graph filter that removes any statements whose subjects are 38 graph filter that removes any statements whose subjects are
52 contexts in the graph and also any statements with the given 39 contexts in the graph and also any statements with the given
66 (especially modification times) and also any statements using the 53 (especially modification times) and also any statements using the
67 given predicates 54 given predicates
68 """ 55 """
69 stmtsA = graphWithoutMetadata(a, ignorePredicates) 56 stmtsA = graphWithoutMetadata(a, ignorePredicates)
70 stmtsB = graphWithoutMetadata(b, ignorePredicates) 57 stmtsB = graphWithoutMetadata(b, ignorePredicates)
71 return set(stmtsA) == set(stmtsB) 58 return set(stmtsA) == set(stmtsB)
59
60 class InputGraph(object):
61 def __init__(self, inputDirs, onChange):
62 """
63 all .n3 files from inputDirs will be read.
64
65 onChange(self) is called if the contents of the full graph change
66 (in an interesting way) during updateFileData or
67 updateRemoteData. Interesting means statements other than the
68 ones with the predicates on the boring list.
69 """
70 self.inputDirs = inputDirs
71 self.onChange = onChange
72 self._fileGraph = Graph()
73 self._remoteGraph = None
74 self.updateFileData()
75
76 def updateFileData(self):
77 """
78 make sure we contain the correct data from the files in inputDirs
79 """
80 # this sample one is actually only needed for the output, but I don't
81 # think I want to have a separate graph for the output
82 # handling
83 log.debug("read file graphs")
84 self._fileGraph.parse("/home/drewp/oldplus/home/drewp/projects/room/devices.n3", format="n3")
85 self._fileGraph.parse("/home/drewp/projects/homeauto/service/reasoning/input/startup.n3", format="n3")
86 self.onChange(self)
87
88 def updateRemoteData(self):
89 """
90 read all remote graphs (which are themselves enumerated within
91 the file data)
92 """
93 log.debug("read remote graphs")
94 g = ConjunctiveGraph()
95 for source in self._fileGraph.objects(ROOM['reasoning'],
96 ROOM['source']):
97 try:
98 fetchTime = addTrig(g, source)
99 except Exception, e:
100 log.error("adding source %s: %s", source, e)
101 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e))))
102 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
103 else:
104 g.add((URIRef(source), ROOM['graphLoadSecs'],
105 Literal(fetchTime)))
106 prevGraph = self._remoteGraph
107 self._remoteGraph = g
108 if prevGraph is not None:
109 log.debug("prev %s now %s", len(prevGraph), len(g))
110 if (prevGraph is None or
111 not graphEqual(g, prevGraph, ignorePredicates=[
112 ROOM.signalStrength,
113 ROOM.graphLoadSecs])):
114 log.debug("remote graph changed")
115 self.onChange(self)
116 else:
117 log.debug("remote graph is unchanged")
118
119 def getGraph(self):
120 """rdflib Graph with the file+remote contents of the input graph"""
121 # use the combined readonly graph view for this?
122 g = Graph()
123 if self._fileGraph:
124 for s in self._fileGraph:
125 g.add(s)
126 if self._remoteGraph:
127 for s in self._remoteGraph:
128 g.add(s)
129 return g
72 130
73 class Reasoning(object): 131 class Reasoning(object):
74 def __init__(self): 132 def __init__(self):
75 self.prevGraph = None 133 self.prevGraph = None
76 self.lastPollTime = 0 134 self.lastPollTime = 0
77 self.lastError = "" 135 self.lastError = ""
78 136
79 self.deviceGraph = Graph()
80 self.deviceGraph.parse("/my/proj/room/devices.n3", format="n3")
81
82 self.rulesN3 = "(not read yet)" 137 self.rulesN3 = "(not read yet)"
83 self.inferred = Graph() # gets replaced in each graphChanged call 138 self.inferred = Graph() # gets replaced in each graphChanged call
139
140 self.inputGraph = InputGraph([], self.graphChanged)
84 141
85 def readRules(self): 142 def readRules(self):
86 self.rulesN3 = open('rules.n3').read() # for web display 143 self.rulesN3 = open('rules.n3').read() # for web display
87 self.ruleStore = N3RuleStore() 144 self.ruleStore = N3RuleStore()
88 self.ruleGraph = Graph(self.ruleStore) 145 self.ruleGraph = Graph(self.ruleStore)
89 self.ruleGraph.parse('rules.n3', format='n3') # for inference 146 self.ruleGraph.parse('rules.n3', format='n3') # for inference
90 147
91 def poll(self): 148 def poll(self):
92 try: 149 try:
93 self._poll() 150 self.inputGraph.updateRemoteData()
94 self.lastPollTime = time.time() 151 self.lastPollTime = time.time()
95 except Exception, e: 152 except Exception, e:
96 log.error(traceback.format_exc()) 153 log.error(traceback.format_exc())
97 self.lastError = str(e) 154 self.lastError = str(e)
98 155
99 def _poll(self): 156 def graphChanged(self, inputGraph):
100 g = gatherGraph()
101 if (self.prevGraph is None or
102 not graphEqual(g, self.prevGraph,
103 ignorePredicates=[ROOM.signalStrength])):
104 self.graphChanged(g)
105
106 self.prevGraph = g
107
108 def graphChanged(self, g):
109 # i guess these are getting consumed each inference 157 # i guess these are getting consumed each inference
110 try: 158 try:
111 t1 = time.time() 159 t1 = time.time()
112 self.readRules() 160 self.readRules()
113 ruleParseTime = time.time() - t1 161 ruleParseTime = time.time() - t1
117 self.inferred = Graph() 165 self.inferred = Graph()
118 self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], 166 self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'],
119 Literal(traceback.format_exc()))) 167 Literal(traceback.format_exc())))
120 raise 168 raise
121 169
170 g = inputGraph.getGraph()
122 t1 = time.time() 171 t1 = time.time()
123 self.inferred = infer(g, self.ruleStore) 172 self.inferred = infer(g, self.ruleStore)
124 inferenceTime = time.time() - t1 173 inferenceTime = time.time() - t1
125 174
126 self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'], 175 self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'],
127 Literal(ruleParseTime))) 176 Literal(ruleParseTime)))
128 self.inferred.add((ROOM['reasoner'], ROOM['inferenceTime'], 177 self.inferred.add((ROOM['reasoner'], ROOM['inferenceTime'],
129 Literal(inferenceTime))) 178 Literal(inferenceTime)))
130 179
131 self.putResults(self.inferred) 180 self.putResults(self.inferred)
132 181
133 try: 182 try:
134 inputGraphNt = g.serialize(format="nt") 183 inputGraphNt = g.serialize(format="nt")
135 inferredNt = self.inferred.serialize(format="nt") 184 inferredNt = self.inferred.serialize(format="nt")
136 body = json.dumps({"input": inputGraphNt, 185 body = json.dumps({"input": inputGraphNt,
137 "inferred": inferredNt}) 186 "inferred": inferredNt})
140 headers={"content-type" : "application/json"}) 189 headers={"content-type" : "application/json"})
141 except Exception, e: 190 except Exception, e:
142 traceback.print_exc() 191 traceback.print_exc()
143 log.error("while sending changes to magma:") 192 log.error("while sending changes to magma:")
144 log.error(e) 193 log.error(e)
145 194
146 195
147 def putResults(self, inferred): 196 def putResults(self, inferred):
148 """ 197 """
149 some conclusions in the inferred graph lead to PUT requests 198 some conclusions in the inferred graph lead to PUT requests
150 getting made 199 getting made
155 PUT ?url <- ?val 204 PUT ?url <- ?val
156 205
157 If the graph doesn't contain any matches, we use (?d 206 If the graph doesn't contain any matches, we use (?d
158 :zeroValue ?val) for the value and PUT that. 207 :zeroValue ?val) for the value and PUT that.
159 """ 208 """
160 209 return
161 for dev, pred in [ 210 for dev, pred in [
162 # the config of each putUrl should actually be in the 211 # the config of each putUrl should actually be in the
163 # context of a dev and predicate pair, and then that would 212 # context of a dev and predicate pair, and then that would
164 # be the source of this list 213 # be the source of this list
165 (DEV.theaterDoorLock, ROOM.state), 214 (DEV.theaterDoorLock, ROOM.state),