Mercurial > code > home > repos > homeauto
comparison service/reasoning/reasoning.py @ 1054:bbaf0576f653
fetch all source graphs in parallel
Ignore-this: 666d61fa9c69f78846987e0ccea750d4
darcs-hash:fee0fc60a6fd66102d3a2b73bf980b393e1349c6
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 09 Feb 2016 22:01:19 -0800 |
parents | a328cc370b22 |
children | c1287ab87add |
comparison
equal
deleted
inserted
replaced
1053:e8648fdff873 | 1054:bbaf0576f653 |
---|---|
15 with PSHB, that their graph has changed. | 15 with PSHB, that their graph has changed. |
16 """ | 16 """ |
17 | 17 |
18 | 18 |
19 from twisted.internet import reactor, task | 19 from twisted.internet import reactor, task |
20 from twisted.internet.defer import inlineCallbacks, gatherResults | |
20 from twisted.python.filepath import FilePath | 21 from twisted.python.filepath import FilePath |
21 import time, traceback, sys, json, logging, urllib | 22 import time, traceback, sys, json, logging, urllib |
22 from rdflib import Graph, ConjunctiveGraph | 23 from rdflib import Graph, ConjunctiveGraph |
23 from rdflib import Namespace, URIRef, Literal, RDF | 24 from rdflib import Namespace, URIRef, Literal, RDF |
24 from rdflib.parser import StringInputSource | 25 from rdflib.parser import StringInputSource |
83 self._fileGraph.parse(fp.open(), format="n3") | 84 self._fileGraph.parse(fp.open(), format="n3") |
84 self._combinedGraph = None | 85 self._combinedGraph = None |
85 | 86 |
86 self.onChange(self) | 87 self.onChange(self) |
87 | 88 |
89 @inlineCallbacks | |
88 def updateRemoteData(self): | 90 def updateRemoteData(self): |
89 """ | 91 """ |
90 read all remote graphs (which are themselves enumerated within | 92 read all remote graphs (which are themselves enumerated within |
91 the file data) | 93 the file data) |
92 """ | 94 """ |
95 t1 = time.time() | |
93 log.debug("read remote graphs") | 96 log.debug("read remote graphs") |
94 g = ConjunctiveGraph() | 97 g = ConjunctiveGraph() |
95 for source in self._fileGraph.objects(ROOM['reasoning'], | 98 |
96 ROOM['source']): | 99 @inlineCallbacks |
100 def fetchOne(source): | |
97 try: | 101 try: |
98 # this part could be parallelized | 102 # this part could be parallelized |
99 fetchTime = addTrig(g, source) | 103 fetchTime = yield addTrig(g, source) |
100 except Exception, e: | 104 except Exception, e: |
101 log.error(" adding source %s: %s", source, e) | 105 log.error(" can't add source %s: %s", source, e) |
102 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) | 106 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) |
103 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) | 107 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) |
104 else: | 108 else: |
105 g.add((URIRef(source), ROOM['graphLoadMs'], | 109 g.add((URIRef(source), ROOM['graphLoadMs'], |
106 Literal(round(fetchTime * 1000, 1)))) | 110 Literal(round(fetchTime * 1000, 1)))) |
111 | |
112 fetchDone = [] | |
113 for source in self._fileGraph.objects(ROOM['reasoning'], | |
114 ROOM['source']): | |
115 fetchDone.append(fetchOne(source)) | |
116 yield gatherResults(fetchDone, consumeErrors=True) | |
117 log.debug("loaded all in %.1f ms", 1000 * (time.time() - t1)) | |
118 | |
107 prevGraph = self._remoteGraph | 119 prevGraph = self._remoteGraph |
108 self._remoteGraph = g | 120 self._remoteGraph = g |
109 self._combinedGraph = None | 121 self._combinedGraph = None |
110 if (prevGraph is None or | 122 if (prevGraph is None or |
111 not graphEqual(g, prevGraph, ignorePredicates=[ | 123 not graphEqual(g, prevGraph, ignorePredicates=[ |
112 ROOM.signalStrength, | 124 ROOM['signalStrength'], |
113 ROOM.graphLoadSecs, | |
114 # perhaps anything with a number-datatype for its | 125 # perhaps anything with a number-datatype for its |
115 # object should be filtered out, and you have to make | 126 # object should be filtered out, and you have to make |
116 # an upstream quantization (e.g. 'temp high'/'temp | 127 # an upstream quantization (e.g. 'temp high'/'temp |
117 # low') if you want to do reasoning on the difference | 128 # low') if you want to do reasoning on the difference |
118 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), | 129 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), |
119 URIRef("http://bigasterisk.com/map#lastSeenAgo"), | 130 URIRef("http://bigasterisk.com/map#lastSeenAgo"), |
120 URIRef("http://projects.bigasterisk.com/room/usingPower"), | 131 ROOM['usingPower'], |
121 URIRef("http://projects.bigasterisk.com/room/idleTimeMinutes"), | 132 ROOM['idleTimeMinutes'], |
122 URIRef("http://projects.bigasterisk.com/room/idleTimeMs"), | 133 ROOM['idleTimeMs'], |
123 ROOM.history, | 134 ROOM['graphLoadMs'], |
135 ROOM['localTimeToSecond'], | |
136 ROOM['history'], | |
137 ROOM['temperatureF'], | |
138 ROOM['connectedAgo'], | |
124 ])): | 139 ])): |
125 log.debug(" remote graph changed") | 140 log.debug(" remote graph changed") |
126 self.onChange(self) | 141 self.onChange(self) |
127 else: | 142 else: |
128 log.debug(" remote graph is unchanged") | 143 log.debug(" remote graph has no changes to trigger rules") |
129 | 144 |
130 def addOneShot(self, g): | 145 def addOneShot(self, g): |
131 """ | 146 """ |
132 add this graph to the total, call onChange, and then revert | 147 add this graph to the total, call onChange, and then revert |
133 the addition of this graph | 148 the addition of this graph |
178 self.rulesN3 = open('rules.n3').read() # for web display | 193 self.rulesN3 = open('rules.n3').read() # for web display |
179 self.ruleStore = N3RuleStore() | 194 self.ruleStore = N3RuleStore() |
180 self.ruleGraph = Graph(self.ruleStore) | 195 self.ruleGraph = Graph(self.ruleStore) |
181 self.ruleGraph.parse('rules.n3', format='n3') # for inference | 196 self.ruleGraph.parse('rules.n3', format='n3') # for inference |
182 | 197 |
198 @inlineCallbacks | |
183 def poll(self): | 199 def poll(self): |
184 try: | 200 try: |
185 self.inputGraph.updateRemoteData() | 201 yield self.inputGraph.updateRemoteData() |
186 self.lastPollTime = time.time() | 202 self.lastPollTime = time.time() |
187 except Exception, e: | 203 except Exception, e: |
188 log.error(traceback.format_exc()) | 204 log.error(traceback.format_exc()) |
189 self.lastError = str(e) | 205 self.lastError = str(e) |
190 | 206 |
252 return | 268 return |
253 self.set_header("Content-Type", "text/html") | 269 self.set_header("Content-Type", "text/html") |
254 self.write(open('index.html').read()) | 270 self.write(open('index.html').read()) |
255 | 271 |
256 class ImmediateUpdate(cyclone.web.RequestHandler): | 272 class ImmediateUpdate(cyclone.web.RequestHandler): |
273 @inlineCallbacks | |
257 def put(self): | 274 def put(self): |
258 """ | 275 """ |
259 request an immediate load of the remote graphs; the thing we | 276 request an immediate load of the remote graphs; the thing we |
260 do in the background anyway. No payload. | 277 do in the background anyway. No payload. |
261 | 278 |
266 in very quickly | 283 in very quickly |
267 """ | 284 """ |
268 print self.request.headers | 285 print self.request.headers |
269 log.info("immediateUpdate from %s", | 286 log.info("immediateUpdate from %s", |
270 self.request.headers.get('User-Agent', '?')) | 287 self.request.headers.get('User-Agent', '?')) |
271 r.poll() | 288 yield r.poll() |
272 self.set_status(202) | 289 self.set_status(202) |
273 | 290 |
274 def parseRdf(text, contentType): | 291 def parseRdf(text, contentType): |
275 g = Graph() | 292 g = Graph() |
276 g.parse(StringInputSource(text), format={ | 293 g.parse(StringInputSource(text), format={ |
277 'text/n3': 'n3', | 294 'text/n3': ['n3'], |
278 }[contentType]) | 295 }[contentType]) |
279 return g | 296 return g |
280 | 297 |
281 class OneShot(cyclone.web.RequestHandler): | 298 class OneShot(cyclone.web.RequestHandler): |
282 def post(self): | 299 def post(self): |
385 if __name__ == '__main__': | 402 if __name__ == '__main__': |
386 | 403 |
387 arg = docopt(""" | 404 arg = docopt(""" |
388 Usage: reasoning.py [options] | 405 Usage: reasoning.py [options] |
389 | 406 |
390 -v Verbose | 407 -v Verbose (and slow updates) |
391 """) | 408 """) |
392 | 409 |
393 r = Reasoning() | 410 r = Reasoning() |
394 if arg['-v']: | 411 if arg['-v']: |
395 from colorlog import ColoredFormatter | 412 from colorlog import ColoredFormatter |
410 import twisted.python.log | 427 import twisted.python.log |
411 twisted.python.log.startLogging(sys.stdout) | 428 twisted.python.log.startLogging(sys.stdout) |
412 log.setLevel(logging.DEBUG) | 429 log.setLevel(logging.DEBUG) |
413 outlog.setLevel(logging.DEBUG) | 430 outlog.setLevel(logging.DEBUG) |
414 | 431 |
415 task.LoopingCall(r.poll).start(1.0) | 432 task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10) |
416 reactor.listenTCP(9071, Application(r), interface='::') | 433 reactor.listenTCP(9071, Application(r), interface='::') |
417 reactor.run() | 434 reactor.run() |