Mercurial > code > home > repos > homeauto
comparison service/reasoning/inputgraph.py @ 1108:8caf62030955
reasoning uses sse_collector
Ignore-this: 5b3787bd354b9bc82968c76ba262b725
darcs-hash:5ab617abe4bdfcdc8ad94e4272beb2cf548bb896
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Mon, 29 Aug 2016 00:27:46 -0700 |
parents | 6ab5238fc049 |
children | 170dc9b1e789 |
comparison legend:
- equal
- deleted
- inserted
- replaced
1107:e68f6e5712c6 | 1108:8caf62030955 |
---|---|
1 import logging, time | 1 import logging, time, sys |
2 | 2 |
3 from rdflib import Graph, ConjunctiveGraph | 3 from rdflib import Graph, ConjunctiveGraph |
4 from rdflib import Namespace, URIRef, Literal, RDF | 4 from rdflib import Namespace, URIRef, Literal, RDF, RDFS |
5 from rdflib.parser import StringInputSource | 5 from rdflib.parser import StringInputSource |
6 | 6 |
7 from twisted.python.filepath import FilePath | 7 from twisted.python.filepath import FilePath |
8 from twisted.internet.defer import inlineCallbacks, gatherResults | 8 from twisted.internet.defer import inlineCallbacks, gatherResults |
9 | 9 |
10 from rdflibtrig import addTrig | 10 from rdflibtrig import addTrig |
11 from graphop import graphEqual | 11 from graphop import graphEqual |
12 | |
13 from patchsource import ReconnectingPatchSource | |
14 | |
15 sys.path.append("/my/proj/light9") | |
16 from light9.rdfdb.rdflibpatch import patchQuads | |
12 | 17 |
13 log = logging.getLogger('fetch') | 18 log = logging.getLogger('fetch') |
14 | 19 |
15 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 20 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
16 DEV = Namespace("http://projects.bigasterisk.com/device/") | 21 DEV = Namespace("http://projects.bigasterisk.com/device/") |
22 'text/n3': 'n3', | 27 'text/n3': 'n3', |
23 }[contentType]) | 28 }[contentType]) |
24 return g | 29 return g |
25 | 30 |
26 | 31 |
32 class RemoteData(object): | |
33 def __init__(self, onChange): | |
34 self.onChange = onChange | |
35 self.graph = ConjunctiveGraph() | |
36 self.patchSource = ReconnectingPatchSource(URIRef('http://bang:9072/graph/home'), self.onPatch) | |
37 | |
38 def onPatch(self, p, fullGraph): | |
39 if fullGraph: | |
40 self.graph = ConjunctiveGraph() | |
41 patchQuads(self.graph, | |
42 deleteQuads=p.delQuads, | |
43 addQuads=p.addQuads, | |
44 perfect=True) | |
45 | |
46 ignorePredicates = [ | |
47 ROOM['signalStrength'], | |
48 # perhaps anything with a number-datatype for its | |
49 # object should be filtered out, and you have to make | |
50 # an upstream quantization (e.g. 'temp high'/'temp | |
51 # low') if you want to do reasoning on the difference | |
52 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), | |
53 URIRef("http://bigasterisk.com/map#lastSeenAgo"), | |
54 ROOM['usingPower'], | |
55 ROOM['idleTimeMinutes'], | |
56 ROOM['idleTimeMs'], | |
57 ROOM['graphLoadMs'], | |
58 ROOM['localTimeToSecond'], | |
59 ROOM['history'], | |
60 ROOM['temperatureF'], | |
61 ROOM['connectedAgo'], | |
62 RDFS['comment'], | |
63 ] | |
64 ignoreContexts = [ | |
65 URIRef('http://bigasterisk.com/sse_collector/'), | |
66 ] | |
67 for affected in p.addQuads + p.delQuads: | |
68 if (affected[1] not in ignorePredicates and | |
69 affected[3] not in ignoreContexts): | |
70 log.debug(" remote graph changed") | |
71 self.onChange() | |
72 break | |
73 else: | |
74 log.debug(" remote graph has no changes to trigger rules") | |
75 | |
27 class InputGraph(object): | 76 class InputGraph(object): |
28 def __init__(self, inputDirs, onChange, sourceSubstr=None): | 77 def __init__(self, inputDirs, onChange): |
29 """ | 78 """ |
30 this has one Graph that's made of: | 79 this has one Graph that's made of: |
31 - all .n3 files from inputDirs (read at startup) | 80 - all .n3 files from inputDirs (read at startup) |
32 - all the remote graphs, specified in the file graphs | 81 - all the remote graphs, specified in the file graphs |
33 | 82 |
38 change (in an interesting way) during updateFileData or | 87 change (in an interesting way) during updateFileData or |
39 updateRemoteData. Interesting means statements other than the | 88 updateRemoteData. Interesting means statements other than the |
40 ones with the predicates on the boring list. onChange(self, | 89 ones with the predicates on the boring list. onChange(self, |
41 oneShot=True) means: don't store the result of this change | 90 oneShot=True) means: don't store the result of this change |
42 anywhere; it needs to be processed only once | 91 anywhere; it needs to be processed only once |
43 | |
44 sourceSubstr filters to only pull from sources containing the | |
45 string (for debugging). | |
46 """ | 92 """ |
47 self.inputDirs = inputDirs | 93 self.inputDirs = inputDirs |
48 self.onChange = onChange | 94 self.onChange = onChange |
49 self.sourceSubstr = sourceSubstr | |
50 self._fileGraph = Graph() | 95 self._fileGraph = Graph() |
51 self._remoteGraph = None | 96 self._remoteData = RemoteData(lambda: self.onChange(self)) |
52 self._combinedGraph = None | 97 self._combinedGraph = None |
53 self._oneShotAdditionGraph = None | 98 self._oneShotAdditionGraph = None |
54 self._lastErrLog = {} # source: error | |
55 | 99 |
56 def updateFileData(self): | 100 def updateFileData(self): |
57 """ | 101 """ |
58 make sure we contain the correct data from the files in inputDirs | 102 make sure we contain the correct data from the files in inputDirs |
59 """ | 103 """ |
70 # todo: if this fails, leave the report in the graph | 114 # todo: if this fails, leave the report in the graph |
71 self._fileGraph.parse(fp.open(), format="n3") | 115 self._fileGraph.parse(fp.open(), format="n3") |
72 self._combinedGraph = None | 116 self._combinedGraph = None |
73 | 117 |
74 self.onChange(self) | 118 self.onChange(self) |
75 | |
76 @inlineCallbacks | |
77 def updateRemoteData(self): | |
78 """ | |
79 read all remote graphs (which are themselves enumerated within | |
80 the file data) | |
81 """ | |
82 t1 = time.time() | |
83 log.debug("read remote graphs") | |
84 g = ConjunctiveGraph() | |
85 | |
86 @inlineCallbacks | |
87 def fetchOne(source): | |
88 try: | |
89 fetchTime = yield addTrig(g, source, timeout=5) | |
90 except Exception, e: | |
91 e = str(e) | |
92 if self._lastErrLog.get(source) != e: | |
93 log.error(" can't add source %s: %s", source, e) | |
94 self._lastErrLog[source] = e | |
95 g.add((URIRef(source), ROOM['graphLoadError'], Literal(e))) | |
96 g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad'])) | |
97 else: | |
98 if self._lastErrLog.get(source): | |
99 log.warning(" source %s is back", source) | |
100 self._lastErrLog[source] = None | |
101 g.add((URIRef(source), ROOM['graphLoadMs'], | |
102 Literal(round(fetchTime * 1000, 1)))) | |
103 | |
104 fetchDone = [] | |
105 filtered = 0 | |
106 for source in self._fileGraph.objects(ROOM['reasoning'], | |
107 ROOM['source']): | |
108 if self.sourceSubstr and self.sourceSubstr not in source: | |
109 filtered += 1 | |
110 continue | |
111 fetchDone.append(fetchOne(source)) | |
112 yield gatherResults(fetchDone, consumeErrors=True) | |
113 log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone), | |
114 filtered, 1000 * (time.time() - t1)) | |
115 | |
116 prevGraph = self._remoteGraph | |
117 self._remoteGraph = g | |
118 self._combinedGraph = None | |
119 if (prevGraph is None or | |
120 not graphEqual(g, prevGraph, ignorePredicates=[ | |
121 ROOM['signalStrength'], | |
122 # perhaps anything with a number-datatype for its | |
123 # object should be filtered out, and you have to make | |
124 # an upstream quantization (e.g. 'temp high'/'temp | |
125 # low') if you want to do reasoning on the difference | |
126 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), | |
127 URIRef("http://bigasterisk.com/map#lastSeenAgo"), | |
128 ROOM['usingPower'], | |
129 ROOM['idleTimeMinutes'], | |
130 ROOM['idleTimeMs'], | |
131 ROOM['graphLoadMs'], | |
132 ROOM['localTimeToSecond'], | |
133 ROOM['history'], | |
134 ROOM['temperatureF'], | |
135 ROOM['connectedAgo'], | |
136 ])): | |
137 log.debug(" remote graph changed") | |
138 self.onChange(self) | |
139 else: | |
140 log.debug(" remote graph has no changes to trigger rules") | |
141 | 119 |
142 def addOneShot(self, g): | 120 def addOneShot(self, g): |
143 """ | 121 """ |
144 add this graph to the total, call onChange, and then revert | 122 add this graph to the total, call onChange, and then revert |
145 the addition of this graph | 123 the addition of this graph |
168 if self._combinedGraph is None: | 146 if self._combinedGraph is None: |
169 self._combinedGraph = Graph() | 147 self._combinedGraph = Graph() |
170 if self._fileGraph: | 148 if self._fileGraph: |
171 for s in self._fileGraph: | 149 for s in self._fileGraph: |
172 self._combinedGraph.add(s) | 150 self._combinedGraph.add(s) |
173 if self._remoteGraph: | 151 for s in self._remoteData.graph: |
174 for s in self._remoteGraph: | 152 self._combinedGraph.add(s) |
175 self._combinedGraph.add(s) | |
176 if self._oneShotAdditionGraph: | 153 if self._oneShotAdditionGraph: |
177 for s in self._oneShotAdditionGraph: | 154 for s in self._oneShotAdditionGraph: |
178 self._combinedGraph.add(s) | 155 self._combinedGraph.add(s) |
179 | 156 |
180 return self._combinedGraph | 157 return self._combinedGraph |