comparison service/reasoning/inputgraph.py @ 1108:8caf62030955

reasoning uses sse_collector Ignore-this: 5b3787bd354b9bc82968c76ba262b725 darcs-hash:5ab617abe4bdfcdc8ad94e4272beb2cf548bb896
author drewp <drewp@bigasterisk.com>
date Mon, 29 Aug 2016 00:27:46 -0700
parents 6ab5238fc049
children 170dc9b1e789
comparison
equal deleted inserted replaced
1107:e68f6e5712c6 1108:8caf62030955
1 import logging, time 1 import logging, time, sys
2 2
3 from rdflib import Graph, ConjunctiveGraph 3 from rdflib import Graph, ConjunctiveGraph
4 from rdflib import Namespace, URIRef, Literal, RDF 4 from rdflib import Namespace, URIRef, Literal, RDF, RDFS
5 from rdflib.parser import StringInputSource 5 from rdflib.parser import StringInputSource
6 6
7 from twisted.python.filepath import FilePath 7 from twisted.python.filepath import FilePath
8 from twisted.internet.defer import inlineCallbacks, gatherResults 8 from twisted.internet.defer import inlineCallbacks, gatherResults
9 9
10 from rdflibtrig import addTrig 10 from rdflibtrig import addTrig
11 from graphop import graphEqual 11 from graphop import graphEqual
12
13 from patchsource import ReconnectingPatchSource
14
15 sys.path.append("/my/proj/light9")
16 from light9.rdfdb.rdflibpatch import patchQuads
12 17
13 log = logging.getLogger('fetch') 18 log = logging.getLogger('fetch')
14 19
15 ROOM = Namespace("http://projects.bigasterisk.com/room/") 20 ROOM = Namespace("http://projects.bigasterisk.com/room/")
16 DEV = Namespace("http://projects.bigasterisk.com/device/") 21 DEV = Namespace("http://projects.bigasterisk.com/device/")
22 'text/n3': 'n3', 27 'text/n3': 'n3',
23 }[contentType]) 28 }[contentType])
24 return g 29 return g
25 30
26 31
class RemoteData(object):
    """
    Mirror of a remote graph that is kept current by a patch stream.

    A ReconnectingPatchSource delivers patches to onPatch, which
    applies them to self.graph. The zero-argument onChange callback
    is invoked only when a patch touches at least one quad that is
    neither on the boring-predicate list nor in an ignored context.
    """

    # Statements with these predicates never trigger onChange.
    #
    # perhaps anything with a number-datatype for its object should
    # be filtered out, and you have to make an upstream quantization
    # (e.g. 'temp high'/'temp low') if you want to do reasoning on
    # the difference
    IGNORE_PREDICATES = frozenset([
        ROOM['signalStrength'],
        URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
        URIRef("http://bigasterisk.com/map#lastSeenAgo"),
        ROOM['usingPower'],
        ROOM['idleTimeMinutes'],
        ROOM['idleTimeMs'],
        ROOM['graphLoadMs'],
        ROOM['localTimeToSecond'],
        ROOM['history'],
        ROOM['temperatureF'],
        ROOM['connectedAgo'],
        RDFS['comment'],
    ])

    # Statements in these contexts never trigger onChange.
    IGNORE_CONTEXTS = frozenset([
        URIRef('http://bigasterisk.com/sse_collector/'),
    ])

    def __init__(self, onChange):
        """
        onChange: callable taking no arguments; called after a patch
        that contains at least one interesting quad has been applied.
        """
        self.onChange = onChange
        # self.graph must exist before the patch source is created,
        # since onPatch reads and replaces it.
        self.graph = ConjunctiveGraph()
        self.patchSource = ReconnectingPatchSource(
            URIRef('http://bang:9072/graph/home'), self.onPatch)

    def _isInteresting(self, quad):
        """
        True if this (s, p, o, context) quad should trigger onChange.
        """
        return (quad[1] not in self.IGNORE_PREDICATES and
                quad[3] not in self.IGNORE_CONTEXTS)

    def onPatch(self, p, fullGraph):
        """
        Apply patch p (an object with addQuads and delQuads) to
        self.graph, then call onChange if any of its quads is
        interesting.

        When fullGraph is true, the current graph is discarded first
        and the patch is applied to an empty one.
        """
        if fullGraph:
            # NOTE(review): statements that were in the old graph but
            # are absent from the new full graph presumably do not
            # appear in p.delQuads, so losing them would not trigger
            # onChange by itself -- confirm against the patch source.
            self.graph = ConjunctiveGraph()
        patchQuads(self.graph,
                   deleteQuads=p.delQuads,
                   addQuads=p.addQuads,
                   perfect=True)

        # any() stops at the first interesting quad, like the
        # original loop's break.
        if any(self._isInteresting(quad)
               for quad in p.addQuads + p.delQuads):
            log.debug(" remote graph changed")
            self.onChange()
        else:
            log.debug(" remote graph has no changes to trigger rules")
75
27 class InputGraph(object): 76 class InputGraph(object):
28 def __init__(self, inputDirs, onChange, sourceSubstr=None): 77 def __init__(self, inputDirs, onChange):
29 """ 78 """
30 this has one Graph that's made of: 79 this has one Graph that's made of:
31 - all .n3 files from inputDirs (read at startup) 80 - all .n3 files from inputDirs (read at startup)
32 - all the remote graphs, specified in the file graphs 81 - all the remote graphs, specified in the file graphs
33 82
38 change (in an interesting way) during updateFileData or 87 change (in an interesting way) during updateFileData or
39 updateRemoteData. Interesting means statements other than the 88 updateRemoteData. Interesting means statements other than the
40 ones with the predicates on the boring list. onChange(self, 89 ones with the predicates on the boring list. onChange(self,
41 oneShot=True) means: don't store the result of this change 90 oneShot=True) means: don't store the result of this change
42 anywhere; it needs to be processed only once 91 anywhere; it needs to be processed only once
43
44 sourceSubstr filters to only pull from sources containing the
45 string (for debugging).
46 """ 92 """
47 self.inputDirs = inputDirs 93 self.inputDirs = inputDirs
48 self.onChange = onChange 94 self.onChange = onChange
49 self.sourceSubstr = sourceSubstr
50 self._fileGraph = Graph() 95 self._fileGraph = Graph()
51 self._remoteGraph = None 96 self._remoteData = RemoteData(lambda: self.onChange(self))
52 self._combinedGraph = None 97 self._combinedGraph = None
53 self._oneShotAdditionGraph = None 98 self._oneShotAdditionGraph = None
54 self._lastErrLog = {} # source: error
55 99
56 def updateFileData(self): 100 def updateFileData(self):
57 """ 101 """
58 make sure we contain the correct data from the files in inputDirs 102 make sure we contain the correct data from the files in inputDirs
59 """ 103 """
70 # todo: if this fails, leave the report in the graph 114 # todo: if this fails, leave the report in the graph
71 self._fileGraph.parse(fp.open(), format="n3") 115 self._fileGraph.parse(fp.open(), format="n3")
72 self._combinedGraph = None 116 self._combinedGraph = None
73 117
74 self.onChange(self) 118 self.onChange(self)
75
76 @inlineCallbacks
77 def updateRemoteData(self):
78 """
79 read all remote graphs (which are themselves enumerated within
80 the file data)
81 """
82 t1 = time.time()
83 log.debug("read remote graphs")
84 g = ConjunctiveGraph()
85
86 @inlineCallbacks
87 def fetchOne(source):
88 try:
89 fetchTime = yield addTrig(g, source, timeout=5)
90 except Exception, e:
91 e = str(e)
92 if self._lastErrLog.get(source) != e:
93 log.error(" can't add source %s: %s", source, e)
94 self._lastErrLog[source] = e
95 g.add((URIRef(source), ROOM['graphLoadError'], Literal(e)))
96 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
97 else:
98 if self._lastErrLog.get(source):
99 log.warning(" source %s is back", source)
100 self._lastErrLog[source] = None
101 g.add((URIRef(source), ROOM['graphLoadMs'],
102 Literal(round(fetchTime * 1000, 1))))
103
104 fetchDone = []
105 filtered = 0
106 for source in self._fileGraph.objects(ROOM['reasoning'],
107 ROOM['source']):
108 if self.sourceSubstr and self.sourceSubstr not in source:
109 filtered += 1
110 continue
111 fetchDone.append(fetchOne(source))
112 yield gatherResults(fetchDone, consumeErrors=True)
113 log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
114 filtered, 1000 * (time.time() - t1))
115
116 prevGraph = self._remoteGraph
117 self._remoteGraph = g
118 self._combinedGraph = None
119 if (prevGraph is None or
120 not graphEqual(g, prevGraph, ignorePredicates=[
121 ROOM['signalStrength'],
122 # perhaps anything with a number-datatype for its
123 # object should be filtered out, and you have to make
124 # an upstream quantization (e.g. 'temp high'/'temp
125 # low') if you want to do reasoning on the difference
126 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
127 URIRef("http://bigasterisk.com/map#lastSeenAgo"),
128 ROOM['usingPower'],
129 ROOM['idleTimeMinutes'],
130 ROOM['idleTimeMs'],
131 ROOM['graphLoadMs'],
132 ROOM['localTimeToSecond'],
133 ROOM['history'],
134 ROOM['temperatureF'],
135 ROOM['connectedAgo'],
136 ])):
137 log.debug(" remote graph changed")
138 self.onChange(self)
139 else:
140 log.debug(" remote graph has no changes to trigger rules")
141 119
142 def addOneShot(self, g): 120 def addOneShot(self, g):
143 """ 121 """
144 add this graph to the total, call onChange, and then revert 122 add this graph to the total, call onChange, and then revert
145 the addition of this graph 123 the addition of this graph
168 if self._combinedGraph is None: 146 if self._combinedGraph is None:
169 self._combinedGraph = Graph() 147 self._combinedGraph = Graph()
170 if self._fileGraph: 148 if self._fileGraph:
171 for s in self._fileGraph: 149 for s in self._fileGraph:
172 self._combinedGraph.add(s) 150 self._combinedGraph.add(s)
173 if self._remoteGraph: 151 for s in self._remoteData.graph:
174 for s in self._remoteGraph: 152 self._combinedGraph.add(s)
175 self._combinedGraph.add(s)
176 if self._oneShotAdditionGraph: 153 if self._oneShotAdditionGraph:
177 for s in self._oneShotAdditionGraph: 154 for s in self._oneShotAdditionGraph:
178 self._combinedGraph.add(s) 155 self._combinedGraph.add(s)
179 156
180 return self._combinedGraph 157 return self._combinedGraph