comparison service/reasoning/reasoning.py @ 1054:bbaf0576f653

fetch all source graphs in parallel Ignore-this: 666d61fa9c69f78846987e0ccea750d4 darcs-hash:fee0fc60a6fd66102d3a2b73bf980b393e1349c6
author drewp <drewp@bigasterisk.com>
date Tue, 09 Feb 2016 22:01:19 -0800
parents a328cc370b22
children c1287ab87add
comparison
equal deleted inserted replaced
1053:e8648fdff873 1054:bbaf0576f653
15 with PSHB, that their graph has changed. 15 with PSHB, that their graph has changed.
16 """ 16 """
17 17
18 18
19 from twisted.internet import reactor, task 19 from twisted.internet import reactor, task
20 from twisted.internet.defer import inlineCallbacks, gatherResults
20 from twisted.python.filepath import FilePath 21 from twisted.python.filepath import FilePath
21 import time, traceback, sys, json, logging, urllib 22 import time, traceback, sys, json, logging, urllib
22 from rdflib import Graph, ConjunctiveGraph 23 from rdflib import Graph, ConjunctiveGraph
23 from rdflib import Namespace, URIRef, Literal, RDF 24 from rdflib import Namespace, URIRef, Literal, RDF
24 from rdflib.parser import StringInputSource 25 from rdflib.parser import StringInputSource
83 self._fileGraph.parse(fp.open(), format="n3") 84 self._fileGraph.parse(fp.open(), format="n3")
84 self._combinedGraph = None 85 self._combinedGraph = None
85 86
86 self.onChange(self) 87 self.onChange(self)
87 88
89 @inlineCallbacks
88 def updateRemoteData(self): 90 def updateRemoteData(self):
89 """ 91 """
90 read all remote graphs (which are themselves enumerated within 92 read all remote graphs (which are themselves enumerated within
91 the file data) 93 the file data)
92 """ 94 """
95 t1 = time.time()
93 log.debug("read remote graphs") 96 log.debug("read remote graphs")
94 g = ConjunctiveGraph() 97 g = ConjunctiveGraph()
95 for source in self._fileGraph.objects(ROOM['reasoning'], 98
96 ROOM['source']): 99 @inlineCallbacks
100 def fetchOne(source):
97 try: 101 try:
98 # this part could be parallelized 102 # this part could be parallelized
99 fetchTime = addTrig(g, source) 103 fetchTime = yield addTrig(g, source)
100 except Exception, e: 104 except Exception, e:
101 log.error(" adding source %s: %s", source, e) 105 log.error(" can't add source %s: %s", source, e)
102 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) 106 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e))))
103 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) 107 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
104 else: 108 else:
105 g.add((URIRef(source), ROOM['graphLoadMs'], 109 g.add((URIRef(source), ROOM['graphLoadMs'],
106 Literal(round(fetchTime * 1000, 1)))) 110 Literal(round(fetchTime * 1000, 1))))
111
112 fetchDone = []
113 for source in self._fileGraph.objects(ROOM['reasoning'],
114 ROOM['source']):
115 fetchDone.append(fetchOne(source))
116 yield gatherResults(fetchDone, consumeErrors=True)
117 log.debug("loaded all in %.1f ms", 1000 * (time.time() - t1))
118
107 prevGraph = self._remoteGraph 119 prevGraph = self._remoteGraph
108 self._remoteGraph = g 120 self._remoteGraph = g
109 self._combinedGraph = None 121 self._combinedGraph = None
110 if (prevGraph is None or 122 if (prevGraph is None or
111 not graphEqual(g, prevGraph, ignorePredicates=[ 123 not graphEqual(g, prevGraph, ignorePredicates=[
112 ROOM.signalStrength, 124 ROOM['signalStrength'],
113 ROOM.graphLoadSecs,
114 # perhaps anything with a number-datatype for its 125 # perhaps anything with a number-datatype for its
115 # object should be filtered out, and you have to make 126 # object should be filtered out, and you have to make
116 # an upstream quantization (e.g. 'temp high'/'temp 127 # an upstream quantization (e.g. 'temp high'/'temp
117 # low') if you want to do reasoning on the difference 128 # low') if you want to do reasoning on the difference
118 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), 129 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
119 URIRef("http://bigasterisk.com/map#lastSeenAgo"), 130 URIRef("http://bigasterisk.com/map#lastSeenAgo"),
120 URIRef("http://projects.bigasterisk.com/room/usingPower"), 131 ROOM['usingPower'],
121 URIRef("http://projects.bigasterisk.com/room/idleTimeMinutes"), 132 ROOM['idleTimeMinutes'],
122 URIRef("http://projects.bigasterisk.com/room/idleTimeMs"), 133 ROOM['idleTimeMs'],
123 ROOM.history, 134 ROOM['graphLoadMs'],
135 ROOM['localTimeToSecond'],
136 ROOM['history'],
137 ROOM['temperatureF'],
138 ROOM['connectedAgo'],
124 ])): 139 ])):
125 log.debug(" remote graph changed") 140 log.debug(" remote graph changed")
126 self.onChange(self) 141 self.onChange(self)
127 else: 142 else:
128 log.debug(" remote graph is unchanged") 143 log.debug(" remote graph has no changes to trigger rules")
129 144
130 def addOneShot(self, g): 145 def addOneShot(self, g):
131 """ 146 """
132 add this graph to the total, call onChange, and then revert 147 add this graph to the total, call onChange, and then revert
133 the addition of this graph 148 the addition of this graph
178 self.rulesN3 = open('rules.n3').read() # for web display 193 self.rulesN3 = open('rules.n3').read() # for web display
179 self.ruleStore = N3RuleStore() 194 self.ruleStore = N3RuleStore()
180 self.ruleGraph = Graph(self.ruleStore) 195 self.ruleGraph = Graph(self.ruleStore)
181 self.ruleGraph.parse('rules.n3', format='n3') # for inference 196 self.ruleGraph.parse('rules.n3', format='n3') # for inference
182 197
198 @inlineCallbacks
183 def poll(self): 199 def poll(self):
184 try: 200 try:
185 self.inputGraph.updateRemoteData() 201 yield self.inputGraph.updateRemoteData()
186 self.lastPollTime = time.time() 202 self.lastPollTime = time.time()
187 except Exception, e: 203 except Exception, e:
188 log.error(traceback.format_exc()) 204 log.error(traceback.format_exc())
189 self.lastError = str(e) 205 self.lastError = str(e)
190 206
252 return 268 return
253 self.set_header("Content-Type", "text/html") 269 self.set_header("Content-Type", "text/html")
254 self.write(open('index.html').read()) 270 self.write(open('index.html').read())
255 271
256 class ImmediateUpdate(cyclone.web.RequestHandler): 272 class ImmediateUpdate(cyclone.web.RequestHandler):
273 @inlineCallbacks
257 def put(self): 274 def put(self):
258 """ 275 """
259 request an immediate load of the remote graphs; the thing we 276 request an immediate load of the remote graphs; the thing we
260 do in the background anyway. No payload. 277 do in the background anyway. No payload.
261 278
266 in very quickly 283 in very quickly
267 """ 284 """
268 print self.request.headers 285 print self.request.headers
269 log.info("immediateUpdate from %s", 286 log.info("immediateUpdate from %s",
270 self.request.headers.get('User-Agent', '?')) 287 self.request.headers.get('User-Agent', '?'))
271 r.poll() 288 yield r.poll()
272 self.set_status(202) 289 self.set_status(202)
273 290
274 def parseRdf(text, contentType): 291 def parseRdf(text, contentType):
275 g = Graph() 292 g = Graph()
276 g.parse(StringInputSource(text), format={ 293 g.parse(StringInputSource(text), format={
277 'text/n3': 'n3', 294 'text/n3': ['n3'],
278 }[contentType]) 295 }[contentType])
279 return g 296 return g
280 297
281 class OneShot(cyclone.web.RequestHandler): 298 class OneShot(cyclone.web.RequestHandler):
282 def post(self): 299 def post(self):
385 if __name__ == '__main__': 402 if __name__ == '__main__':
386 403
387 arg = docopt(""" 404 arg = docopt("""
388 Usage: reasoning.py [options] 405 Usage: reasoning.py [options]
389 406
390 -v Verbose 407 -v Verbose (and slow updates)
391 """) 408 """)
392 409
393 r = Reasoning() 410 r = Reasoning()
394 if arg['-v']: 411 if arg['-v']:
395 from colorlog import ColoredFormatter 412 from colorlog import ColoredFormatter
410 import twisted.python.log 427 import twisted.python.log
411 twisted.python.log.startLogging(sys.stdout) 428 twisted.python.log.startLogging(sys.stdout)
412 log.setLevel(logging.DEBUG) 429 log.setLevel(logging.DEBUG)
413 outlog.setLevel(logging.DEBUG) 430 outlog.setLevel(logging.DEBUG)
414 431
415 task.LoopingCall(r.poll).start(1.0) 432 task.LoopingCall(r.poll).start(1.0 if not arg['-v'] else 10)
416 reactor.listenTCP(9071, Application(r), interface='::') 433 reactor.listenTCP(9071, Application(r), interface='::')
417 reactor.run() 434 reactor.run()