Mercurial > code > home > repos > homeauto
comparison service/reasoning/reasoning.py @ 1070:1630e50d9842
sourceSubstr feature, untested
Ignore-this: a7e5e11bef7fceffc2d2687ce9edcf6e
darcs-hash:7d673210347008767dd40c7b48be32cf03e84372
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Thu, 14 Apr 2016 00:11:12 -0700 |
parents | c1961da4180a |
children | d3733587e749 |
comparison
equal
deleted
inserted
replaced
1069:c5b14fe5c387 | 1070:1630e50d9842 |
---|---|
44 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 44 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
45 DEV = Namespace("http://projects.bigasterisk.com/device/") | 45 DEV = Namespace("http://projects.bigasterisk.com/device/") |
46 | 46 |
47 | 47 |
48 class InputGraph(object): | 48 class InputGraph(object): |
49 def __init__(self, inputDirs, onChange): | 49 def __init__(self, inputDirs, onChange, sourceSubstr=None): |
50 """ | 50 """ |
51 this has one Graph that's made of: | 51 this has one Graph that's made of: |
52 - all .n3 files from inputDirs (read at startup) | 52 - all .n3 files from inputDirs (read at startup) |
53 - all the remote graphs, specified in the file graphs | 53 - all the remote graphs, specified in the file graphs |
54 | 54 |
59 change (in an interesting way) during updateFileData or | 59 change (in an interesting way) during updateFileData or |
60 updateRemoteData. Interesting means statements other than the | 60 updateRemoteData. Interesting means statements other than the |
61 ones with the predicates on the boring list. onChange(self, | 61 ones with the predicates on the boring list. onChange(self, |
62 oneShot=True) means: don't store the result of this change | 62 oneShot=True) means: don't store the result of this change |
63 anywhere; it needs to be processed only once | 63 anywhere; it needs to be processed only once |
64 | |
65 sourceSubstr filters to only pull from sources containing the | |
66 string (for debugging). | |
64 """ | 67 """ |
65 self.inputDirs = inputDirs | 68 self.inputDirs = inputDirs |
66 self.onChange = onChange | 69 self.onChange = onChange |
70 self.sourceSubstr = sourceSubstr | |
67 self._fileGraph = Graph() | 71 self._fileGraph = Graph() |
68 self._remoteGraph = None | 72 self._remoteGraph = None |
69 self._combinedGraph = None | 73 self._combinedGraph = None |
70 self._oneShotAdditionGraph = None | 74 self._oneShotAdditionGraph = None |
75 self._lastErrLog = {} # source: error | |
71 | 76 |
72 def updateFileData(self): | 77 def updateFileData(self): |
73 """ | 78 """ |
74 make sure we contain the correct data from the files in inputDirs | 79 make sure we contain the correct data from the files in inputDirs |
75 """ | 80 """ |
100 g = ConjunctiveGraph() | 105 g = ConjunctiveGraph() |
101 | 106 |
102 @inlineCallbacks | 107 @inlineCallbacks |
103 def fetchOne(source): | 108 def fetchOne(source): |
104 try: | 109 try: |
105 # this part could be parallelized | 110 fetchTime = yield addTrig(g, source, timeout=5) |
106 fetchTime = yield addTrig(g, source) | |
107 except Exception, e: | 111 except Exception, e: |
108 log.error(" can't add source %s: %s", source, e) | 112 e = str(e) |
109 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) | 113 if self._lastErrLog.get(source) != e: |
114 log.error(" can't add source %s: %s", source, e) | |
115 self._lastErrLog[source] = e | |
116 g.add((URIRef(source), ROOM['graphLoadError'], Literal(e))) | |
110 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) | 117 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) |
111 else: | 118 else: |
119 if self._lastErrLog.get(source): | |
120 log.warning(" source %s is back", source) | |
121 self._lastErrLog[source] = None | |
112 g.add((URIRef(source), ROOM['graphLoadMs'], | 122 g.add((URIRef(source), ROOM['graphLoadMs'], |
113 Literal(round(fetchTime * 1000, 1)))) | 123 Literal(round(fetchTime * 1000, 1)))) |
114 | 124 |
115 fetchDone = [] | 125 fetchDone = [] |
126 filtered = 0 | |
116 for source in self._fileGraph.objects(ROOM['reasoning'], | 127 for source in self._fileGraph.objects(ROOM['reasoning'], |
117 ROOM['source']): | 128 ROOM['source']): |
129 if self.sourceSubstr and self.sourceSubstr not in source: | |
130 filtered += 1 | |
131 continue | |
118 fetchDone.append(fetchOne(source)) | 132 fetchDone.append(fetchOne(source)) |
119 yield gatherResults(fetchDone, consumeErrors=True) | 133 yield gatherResults(fetchDone, consumeErrors=True) |
120 log.debug("loaded all in %.1f ms", 1000 * (time.time() - t1)) | 134 log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone), |
135 filtered, 1000 * (time.time() - t1)) | |
121 | 136 |
122 prevGraph = self._remoteGraph | 137 prevGraph = self._remoteGraph |
123 self._remoteGraph = g | 138 self._remoteGraph = g |
124 self._combinedGraph = None | 139 self._combinedGraph = None |
125 if (prevGraph is None or | 140 if (prevGraph is None or |
412 if __name__ == '__main__': | 427 if __name__ == '__main__': |
413 | 428 |
414 arg = docopt(""" | 429 arg = docopt(""" |
415 Usage: reasoning.py [options] | 430 Usage: reasoning.py [options] |
416 | 431 |
417 -v Verbose (and slow updates) | 432 -v Verbose (and slow updates) |
433 --source=<substr> Limit sources to those with this string. | |
418 """) | 434 """) |
419 | 435 |
420 r = Reasoning() | 436 r = Reasoning() |
421 if arg['-v']: | 437 if arg['-v']: |
422 from colorlog import ColoredFormatter | 438 from colorlog import ColoredFormatter |