Mercurial > code > home > repos > homeauto
comparison service/reasoning/reasoning.py @ 851:0d86b3955bcd
rewriting reasoning to use graphs for config
Ignore-this: 2bcd9ea1c9fffe2dce123596587ac70a
darcs-hash:20121231042638-312f9-d9cc79c24c73e7cdbac3660d6bcf4e347b5707a9
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sun, 30 Dec 2012 20:26:38 -0800 |
parents | 887d47682d94 |
children | 0448fbd96a31 |
comparison
equal
deleted
inserted
replaced
850:887d47682d94 | 851:0d86b3955bcd |
---|---|
1 #!bin/python | 1 #!bin/python |
2 """ | 2 """ |
3 Graph consists of: | |
4 input/* (read at startup) | |
5 webinput/* (new files are noticed in here) | |
6 any number of remote graphs, specified in the other graph as objects of (:reasoning, :source, *), reread constantly | |
7 | |
3 gather subgraphs from various services, run them through a rules | 8 gather subgraphs from various services, run them through a rules |
4 engine, and make http requests with the conclusions. | 9 engine, and make http requests with the conclusions. |
5 | 10 |
6 E.g. 'when drew's phone is near the house, and someone is awake, | 11 E.g. 'when drew's phone is near the house, and someone is awake, |
7 unlock the door when the door's motion sensor is activated' | 12 unlock the door when the door's motion sensor is activated' |
11 """ | 16 """ |
12 | 17 |
13 | 18 |
14 from twisted.internet import reactor, task | 19 from twisted.internet import reactor, task |
15 from twisted.web.client import getPage | 20 from twisted.web.client import getPage |
16 import time, traceback, sys, json | 21 import time, traceback, sys, json, logging |
17 from rdflib.Graph import Graph, ConjunctiveGraph | 22 from rdflib.Graph import Graph, ConjunctiveGraph |
18 from rdflib import Namespace, URIRef, Literal, RDF | 23 from rdflib import Namespace, URIRef, Literal, RDF |
19 import restkit | 24 import restkit |
20 from FuXi.Rete.RuleStore import N3RuleStore | 25 from FuXi.Rete.RuleStore import N3RuleStore |
21 import cyclone.web | 26 import cyclone.web |
22 from inference import addTrig, infer | 27 from inference import addTrig, infer |
23 | 28 |
24 sys.path.append("../../lib") | 29 sys.path.append("../../lib") |
25 from logsetup import log | 30 from logsetup import log |
31 log.setLevel(logging.DEBUG) | |
26 | 32 |
27 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 33 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
28 DEV = Namespace("http://projects.bigasterisk.com/device/") | 34 DEV = Namespace("http://projects.bigasterisk.com/device/") |
29 | |
30 def gatherGraph(): | |
31 g = ConjunctiveGraph() | |
32 for source in ["http://bang:9069/graph", # arduino watchpins | |
33 "http://bang:9070/graph", # wifi usage | |
34 "http://bang:9075/graph", # env | |
35 "http://slash:9050/graph", # garageArduino for front motion | |
36 "http://dash:9095/graph", # dash monitor | |
37 "http://bang:9095/graph", # bang monitor | |
38 ]: | |
39 try: | |
40 fetchTime = addTrig(g, source) | |
41 except Exception, e: | |
42 log.error("adding source %s: %s", source, e) | |
43 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) | |
44 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) | |
45 else: | |
46 g.add((URIRef(source), ROOM['graphLoadSecs'], Literal(fetchTime))) | |
47 return g | |
48 | 35 |
49 def graphWithoutMetadata(g, ignorePredicates=[]): | 36 def graphWithoutMetadata(g, ignorePredicates=[]): |
50 """ | 37 """ |
51 graph filter that removes any statements whose subjects are | 38 graph filter that removes any statements whose subjects are |
52 contexts in the graph and also any statements with the given | 39 contexts in the graph and also any statements with the given |
66 (especially modification times) and also any statements using the | 53 (especially modification times) and also any statements using the |
67 given predicates | 54 given predicates |
68 """ | 55 """ |
69 stmtsA = graphWithoutMetadata(a, ignorePredicates) | 56 stmtsA = graphWithoutMetadata(a, ignorePredicates) |
70 stmtsB = graphWithoutMetadata(b, ignorePredicates) | 57 stmtsB = graphWithoutMetadata(b, ignorePredicates) |
71 return set(stmtsA) == set(stmtsB) | 58 return set(stmtsA) == set(stmtsB) |
59 | |
60 class InputGraph(object): | |
61 def __init__(self, inputDirs, onChange): | |
62 """ | |
63 all .n3 files from inputDirs will be read. | |
64 | |
65 onChange(self) is called if the contents of the full graph change | |
66 (in an interesting way) during updateFileData or | |
67 updateRemoteData. Interesting means statements other than the | |
68 ones with the predicates on the boring list. | |
69 """ | |
70 self.inputDirs = inputDirs | |
71 self.onChange = onChange | |
72 self._fileGraph = Graph() | |
73 self._remoteGraph = None | |
74 self.updateFileData() | |
75 | |
76 def updateFileData(self): | |
77 """ | |
78 make sure we contain the correct data from the files in inputDirs | |
79 """ | |
80 # this sample one is actually only needed for the output, but I don't | |
81 # think I want to have a separate graph for the output | |
82 # handling | |
83 log.debug("read file graphs") | |
84 self._fileGraph.parse("/home/drewp/oldplus/home/drewp/projects/room/devices.n3", format="n3") | |
85 self._fileGraph.parse("/home/drewp/projects/homeauto/service/reasoning/input/startup.n3", format="n3") | |
86 self.onChange(self) | |
87 | |
88 def updateRemoteData(self): | |
89 """ | |
90 read all remote graphs (which are themselves enumerated within | |
91 the file data) | |
92 """ | |
93 log.debug("read remote graphs") | |
94 g = ConjunctiveGraph() | |
95 for source in self._fileGraph.objects(ROOM['reasoning'], | |
96 ROOM['source']): | |
97 try: | |
98 fetchTime = addTrig(g, source) | |
99 except Exception, e: | |
100 log.error("adding source %s: %s", source, e) | |
101 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) | |
102 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) | |
103 else: | |
104 g.add((URIRef(source), ROOM['graphLoadSecs'], | |
105 Literal(fetchTime))) | |
106 prevGraph = self._remoteGraph | |
107 self._remoteGraph = g | |
108 if prevGraph is not None: | |
109 log.debug("prev %s now %s", len(prevGraph), len(g)) | |
110 if (prevGraph is None or | |
111 not graphEqual(g, prevGraph, ignorePredicates=[ | |
112 ROOM.signalStrength, | |
113 ROOM.graphLoadSecs])): | |
114 log.debug("remote graph changed") | |
115 self.onChange(self) | |
116 else: | |
117 log.debug("remote graph is unchanged") | |
118 | |
119 def getGraph(self): | |
120 """rdflib Graph with the file+remote contents of the input graph""" | |
121 # use the combined readonly graph view for this? | |
122 g = Graph() | |
123 if self._fileGraph: | |
124 for s in self._fileGraph: | |
125 g.add(s) | |
126 if self._remoteGraph: | |
127 for s in self._remoteGraph: | |
128 g.add(s) | |
129 return g | |
72 | 130 |
73 class Reasoning(object): | 131 class Reasoning(object): |
74 def __init__(self): | 132 def __init__(self): |
75 self.prevGraph = None | 133 self.prevGraph = None |
76 self.lastPollTime = 0 | 134 self.lastPollTime = 0 |
77 self.lastError = "" | 135 self.lastError = "" |
78 | 136 |
79 self.deviceGraph = Graph() | |
80 self.deviceGraph.parse("/my/proj/room/devices.n3", format="n3") | |
81 | |
82 self.rulesN3 = "(not read yet)" | 137 self.rulesN3 = "(not read yet)" |
83 self.inferred = Graph() # gets replaced in each graphChanged call | 138 self.inferred = Graph() # gets replaced in each graphChanged call |
139 | |
140 self.inputGraph = InputGraph([], self.graphChanged) | |
84 | 141 |
85 def readRules(self): | 142 def readRules(self): |
86 self.rulesN3 = open('rules.n3').read() # for web display | 143 self.rulesN3 = open('rules.n3').read() # for web display |
87 self.ruleStore = N3RuleStore() | 144 self.ruleStore = N3RuleStore() |
88 self.ruleGraph = Graph(self.ruleStore) | 145 self.ruleGraph = Graph(self.ruleStore) |
89 self.ruleGraph.parse('rules.n3', format='n3') # for inference | 146 self.ruleGraph.parse('rules.n3', format='n3') # for inference |
90 | 147 |
91 def poll(self): | 148 def poll(self): |
92 try: | 149 try: |
93 self._poll() | 150 self.inputGraph.updateRemoteData() |
94 self.lastPollTime = time.time() | 151 self.lastPollTime = time.time() |
95 except Exception, e: | 152 except Exception, e: |
96 log.error(traceback.format_exc()) | 153 log.error(traceback.format_exc()) |
97 self.lastError = str(e) | 154 self.lastError = str(e) |
98 | 155 |
99 def _poll(self): | 156 def graphChanged(self, inputGraph): |
100 g = gatherGraph() | |
101 if (self.prevGraph is None or | |
102 not graphEqual(g, self.prevGraph, | |
103 ignorePredicates=[ROOM.signalStrength])): | |
104 self.graphChanged(g) | |
105 | |
106 self.prevGraph = g | |
107 | |
108 def graphChanged(self, g): | |
109 # i guess these are getting consumed each inference | 157 # i guess these are getting consumed each inference |
110 try: | 158 try: |
111 t1 = time.time() | 159 t1 = time.time() |
112 self.readRules() | 160 self.readRules() |
113 ruleParseTime = time.time() - t1 | 161 ruleParseTime = time.time() - t1 |
117 self.inferred = Graph() | 165 self.inferred = Graph() |
118 self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], | 166 self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], |
119 Literal(traceback.format_exc()))) | 167 Literal(traceback.format_exc()))) |
120 raise | 168 raise |
121 | 169 |
170 g = inputGraph.getGraph() | |
122 t1 = time.time() | 171 t1 = time.time() |
123 self.inferred = infer(g, self.ruleStore) | 172 self.inferred = infer(g, self.ruleStore) |
124 inferenceTime = time.time() - t1 | 173 inferenceTime = time.time() - t1 |
125 | 174 |
126 self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'], | 175 self.inferred.add((ROOM['reasoner'], ROOM['ruleParseTime'], |
127 Literal(ruleParseTime))) | 176 Literal(ruleParseTime))) |
128 self.inferred.add((ROOM['reasoner'], ROOM['inferenceTime'], | 177 self.inferred.add((ROOM['reasoner'], ROOM['inferenceTime'], |
129 Literal(inferenceTime))) | 178 Literal(inferenceTime))) |
130 | 179 |
131 self.putResults(self.inferred) | 180 self.putResults(self.inferred) |
132 | 181 |
133 try: | 182 try: |
134 inputGraphNt = g.serialize(format="nt") | 183 inputGraphNt = g.serialize(format="nt") |
135 inferredNt = self.inferred.serialize(format="nt") | 184 inferredNt = self.inferred.serialize(format="nt") |
136 body = json.dumps({"input": inputGraphNt, | 185 body = json.dumps({"input": inputGraphNt, |
137 "inferred": inferredNt}) | 186 "inferred": inferredNt}) |
140 headers={"content-type" : "application/json"}) | 189 headers={"content-type" : "application/json"}) |
141 except Exception, e: | 190 except Exception, e: |
142 traceback.print_exc() | 191 traceback.print_exc() |
143 log.error("while sending changes to magma:") | 192 log.error("while sending changes to magma:") |
144 log.error(e) | 193 log.error(e) |
145 | 194 |
146 | 195 |
147 def putResults(self, inferred): | 196 def putResults(self, inferred): |
148 """ | 197 """ |
149 some conclusions in the inferred graph lead to PUT requests | 198 some conclusions in the inferred graph lead to PUT requests |
150 getting made | 199 getting made |
155 PUT ?url <- ?val | 204 PUT ?url <- ?val |
156 | 205 |
157 If the graph doesn't contain any matches, we use (?d | 206 If the graph doesn't contain any matches, we use (?d |
158 :zeroValue ?val) for the value and PUT that. | 207 :zeroValue ?val) for the value and PUT that. |
159 """ | 208 """ |
160 | 209 return |
161 for dev, pred in [ | 210 for dev, pred in [ |
162 # the config of each putUrl should actually be in the | 211 # the config of each putUrl should actually be in the |
163 # context of a dev and predicate pair, and then that would | 212 # context of a dev and predicate pair, and then that would |
164 # be the source of this list | 213 # be the source of this list |
165 (DEV.theaterDoorLock, ROOM.state), | 214 (DEV.theaterDoorLock, ROOM.state), |