comparison service/reasoning/reasoning.py @ 1070:1630e50d9842

sourceSubstr feature, untested Ignore-this: a7e5e11bef7fceffc2d2687ce9edcf6e darcs-hash:7d673210347008767dd40c7b48be32cf03e84372
author drewp <drewp@bigasterisk.com>
date Thu, 14 Apr 2016 00:11:12 -0700
parents c1961da4180a
children d3733587e749
comparison
equal deleted inserted replaced
1069:c5b14fe5c387 1070:1630e50d9842
44 ROOM = Namespace("http://projects.bigasterisk.com/room/") 44 ROOM = Namespace("http://projects.bigasterisk.com/room/")
45 DEV = Namespace("http://projects.bigasterisk.com/device/") 45 DEV = Namespace("http://projects.bigasterisk.com/device/")
46 46
47 47
48 class InputGraph(object): 48 class InputGraph(object):
49 def __init__(self, inputDirs, onChange): 49 def __init__(self, inputDirs, onChange, sourceSubstr=None):
50 """ 50 """
51 this has one Graph that's made of: 51 this has one Graph that's made of:
52 - all .n3 files from inputDirs (read at startup) 52 - all .n3 files from inputDirs (read at startup)
53 - all the remote graphs, specified in the file graphs 53 - all the remote graphs, specified in the file graphs
54 54
59 change (in an interesting way) during updateFileData or 59 change (in an interesting way) during updateFileData or
60 updateRemoteData. Interesting means statements other than the 60 updateRemoteData. Interesting means statements other than the
61 ones with the predicates on the boring list. onChange(self, 61 ones with the predicates on the boring list. onChange(self,
62 oneShot=True) means: don't store the result of this change 62 oneShot=True) means: don't store the result of this change
63 anywhere; it needs to be processed only once 63 anywhere; it needs to be processed only once
64
65 sourceSubstr filters to only pull from sources containing the
66 string (for debugging).
64 """ 67 """
65 self.inputDirs = inputDirs 68 self.inputDirs = inputDirs
66 self.onChange = onChange 69 self.onChange = onChange
70 self.sourceSubstr = sourceSubstr
67 self._fileGraph = Graph() 71 self._fileGraph = Graph()
68 self._remoteGraph = None 72 self._remoteGraph = None
69 self._combinedGraph = None 73 self._combinedGraph = None
70 self._oneShotAdditionGraph = None 74 self._oneShotAdditionGraph = None
75 self._lastErrLog = {} # source: error
71 76
72 def updateFileData(self): 77 def updateFileData(self):
73 """ 78 """
74 make sure we contain the correct data from the files in inputDirs 79 make sure we contain the correct data from the files in inputDirs
75 """ 80 """
100 g = ConjunctiveGraph() 105 g = ConjunctiveGraph()
101 106
102 @inlineCallbacks 107 @inlineCallbacks
103 def fetchOne(source): 108 def fetchOne(source):
104 try: 109 try:
105 # this part could be parallelized 110 fetchTime = yield addTrig(g, source, timeout=5)
106 fetchTime = yield addTrig(g, source)
107 except Exception, e: 111 except Exception, e:
108 log.error(" can't add source %s: %s", source, e) 112 e = str(e)
109 g.add((URIRef(source), ROOM['graphLoadError'], Literal(str(e)))) 113 if self._lastErrLog.get(source) != e:
114 log.error(" can't add source %s: %s", source, e)
115 self._lastErrLog[source] = e
116 g.add((URIRef(source), ROOM['graphLoadError'], Literal(e)))
110 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) 117 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
111 else: 118 else:
119 if self._lastErrLog.get(source):
120 log.warning(" source %s is back", source)
121 self._lastErrLog[source] = None
112 g.add((URIRef(source), ROOM['graphLoadMs'], 122 g.add((URIRef(source), ROOM['graphLoadMs'],
113 Literal(round(fetchTime * 1000, 1)))) 123 Literal(round(fetchTime * 1000, 1))))
114 124
115 fetchDone = [] 125 fetchDone = []
126 filtered = 0
116 for source in self._fileGraph.objects(ROOM['reasoning'], 127 for source in self._fileGraph.objects(ROOM['reasoning'],
117 ROOM['source']): 128 ROOM['source']):
129 if self.sourceSubstr and self.sourceSubstr not in source:
130 filtered += 1
131 continue
118 fetchDone.append(fetchOne(source)) 132 fetchDone.append(fetchOne(source))
119 yield gatherResults(fetchDone, consumeErrors=True) 133 yield gatherResults(fetchDone, consumeErrors=True)
120 log.debug("loaded all in %.1f ms", 1000 * (time.time() - t1)) 134 log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
135 filtered, 1000 * (time.time() - t1))
121 136
122 prevGraph = self._remoteGraph 137 prevGraph = self._remoteGraph
123 self._remoteGraph = g 138 self._remoteGraph = g
124 self._combinedGraph = None 139 self._combinedGraph = None
125 if (prevGraph is None or 140 if (prevGraph is None or
412 if __name__ == '__main__': 427 if __name__ == '__main__':
413 428
414 arg = docopt(""" 429 arg = docopt("""
415 Usage: reasoning.py [options] 430 Usage: reasoning.py [options]
416 431
417 -v Verbose (and slow updates) 432 -v Verbose (and slow updates)
433 --source=<substr> Limit sources to those with this string.
418 """) 434 """)
419 435
420 r = Reasoning() 436 r = Reasoning()
421 if arg['-v']: 437 if arg['-v']:
422 from colorlog import ColoredFormatter 438 from colorlog import ColoredFormatter