Mercurial > code > home > repos > homeauto
comparison service/reasoning/inputgraph.py @ 275:d3733587e749
refactor inputgraph
Ignore-this: 9931e669c180d8141a51fb6f7927db0a
author | drewp@bigasterisk.com |
---|---|
date | Fri, 06 May 2016 15:42:04 -0700 |
parents | |
children | 9728288c7f2f |
comparison
equal
deleted
inserted
replaced
274:de541d0697b8 | 275:d3733587e749 |
---|---|
1 import logging, time | |
2 | |
3 from rdflib import Graph, ConjunctiveGraph | |
4 from rdflib import Namespace, URIRef, Literal, RDF | |
5 | |
6 from twisted.python.filepath import FilePath | |
7 from twisted.internet.defer import inlineCallbacks, gatherResults | |
8 | |
9 from rdflibtrig import addTrig | |
10 from graphop import graphEqual | |
11 | |
# Module-wide logger; the channel name is 'fetch'.
log = logging.getLogger('fetch')

# RDF namespaces used to build the predicate/class URIs in this module.
ROOM = Namespace("http://projects.bigasterisk.com/room/")
DEV = Namespace("http://projects.bigasterisk.com/device/")
16 | |
17 | |
class InputGraph(object):
    """
    Combined view over the local .n3 file graph, the fetched remote
    graphs, and an optional temporary "one shot" graph.
    """

    # Predicates ignored when deciding whether a freshly fetched remote
    # graph differs from the previous one. A change that only touches
    # these predicates does not trigger onChange.
    #
    # perhaps anything with a number-datatype for its object should be
    # filtered out, and you have to make an upstream quantization
    # (e.g. 'temp high'/'temp low') if you want to do reasoning on the
    # difference
    _BORING_PREDICATES = (
        ROOM['signalStrength'],
        URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
        URIRef("http://bigasterisk.com/map#lastSeenAgo"),
        ROOM['usingPower'],
        ROOM['idleTimeMinutes'],
        ROOM['idleTimeMs'],
        ROOM['graphLoadMs'],
        ROOM['localTimeToSecond'],
        ROOM['history'],
        ROOM['temperatureF'],
        ROOM['connectedAgo'],
    )

    def __init__(self, inputDirs, onChange, sourceSubstr=None):
        """
        this has one Graph that's made of:
          - all .n3 files found by updateFileData
          - all the remote graphs, which are listed in the file graph as
            the objects of (ROOM['reasoning'], ROOM['source'])
          - the graph passed to addOneShot, only while that call runs

        call updateFileData or updateRemoteData to reread those
        graphs. getGraph to access the combined graph.

        onChange(self) is called at the end of every updateFileData,
        and by updateRemoteData when the remote graph changed in an
        interesting way. Interesting means statements other than the
        ones with the predicates on the boring list.
        onChange(self, oneShot=True, oneShotGraph=g) is what addOneShot
        calls; it means: don't store the result of this change
        anywhere; it needs to be processed only once

        sourceSubstr filters to only pull from sources containing the
        string (for debugging).
        """
        # NOTE(review): inputDirs is stored but nothing in this class
        # reads it; updateFileData walks the literal directory "input".
        self.inputDirs = inputDirs
        self.onChange = onChange
        self.sourceSubstr = sourceSubstr
        self._fileGraph = Graph()
        self._remoteGraph = None
        # cache for getGraph; reset to None whenever a part changes
        self._combinedGraph = None
        self._oneShotAdditionGraph = None
        self._lastErrLog = {}  # source: error

    def updateFileData(self):
        """
        make sure we contain the correct data from the .n3 files under
        the "input" directory, then call onChange(self).

        The files are parsed into a new Graph which replaces the old
        file graph only after every file has parsed, so statements that
        were removed from the files do not linger, and a parse error
        leaves the previous file graph in place.
        """
        log.debug("read file graphs")
        fresh = Graph()
        for fp in FilePath("input").walk():
            if fp.isdir():
                continue
            if fp.splitext()[1] != '.n3':
                continue
            log.debug("read %s", fp)
            # todo: if this fails, leave the report in the graph
            f = fp.open()
            try:
                fresh.parse(f, format="n3")
            finally:
                # parse() was handed an already-open file, so close it
                # here instead of leaking the handle
                f.close()
        self._fileGraph = fresh
        self._combinedGraph = None

        self.onChange(self)

    @inlineCallbacks
    def updateRemoteData(self):
        """
        read all remote graphs (which are themselves enumerated within
        the file data) into a new ConjunctiveGraph, make it the current
        remote graph, and call onChange(self) if it differs from the
        previous one in anything other than the boring predicates.

        A source that fails to load is not fatal: the failure is
        recorded in the new graph as graphLoadError/FailedGraphLoad
        statements about that source.
        """
        t1 = time.time()
        log.debug("read remote graphs")
        g = ConjunctiveGraph()

        @inlineCallbacks
        def fetchOne(source):
            try:
                fetchTime = yield addTrig(g, source, timeout=5)
            except Exception as e:
                msg = str(e)
                # only log when the error text for this source changes,
                # so a source that stays down doesn't spam the log
                if self._lastErrLog.get(source) != msg:
                    log.error(" can't add source %s: %s", source, msg)
                    self._lastErrLog[source] = msg
                g.add((URIRef(source), ROOM['graphLoadError'],
                       Literal(msg)))
                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
            else:
                if self._lastErrLog.get(source):
                    log.warning(" source %s is back", source)
                    self._lastErrLog[source] = None
                g.add((URIRef(source), ROOM['graphLoadMs'],
                       Literal(round(fetchTime * 1000, 1))))

        fetchDone = []
        filtered = 0
        for source in self._fileGraph.objects(ROOM['reasoning'],
                                              ROOM['source']):
            if self.sourceSubstr and self.sourceSubstr not in source:
                filtered += 1
                continue
            fetchDone.append(fetchOne(source))
        yield gatherResults(fetchDone, consumeErrors=True)
        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
                  filtered, 1000 * (time.time() - t1))

        prevGraph = self._remoteGraph
        self._remoteGraph = g
        self._combinedGraph = None
        # the first fetch (no previous graph) always counts as a change
        if (prevGraph is None or
            not graphEqual(g, prevGraph,
                           ignorePredicates=list(self._BORING_PREDICATES))):
            log.debug(" remote graph changed")
            self.onChange(self)
        else:
            log.debug(" remote graph has no changes to trigger rules")

    def addOneShot(self, g):
        """
        add this graph to the total, call
        onChange(self, oneShot=True, oneShotGraph=g), and then revert
        the addition of this graph (even if onChange raises)
        """
        self._oneShotAdditionGraph = g
        self._combinedGraph = None
        try:
            self.onChange(self, oneShot=True, oneShotGraph=g)
        finally:
            self._oneShotAdditionGraph = None
            self._combinedGraph = None

    def getGraph(self):
        """
        rdflib Graph with the file+remote (+ one-shot, while inside
        addOneShot) contents of the input graph. The result is cached
        until one of the parts is updated.
        """
        # this could be much faster with the combined readonly graph
        # view from rdflib
        if self._combinedGraph is None:
            combined = Graph()
            parts = (self._fileGraph, self._remoteGraph,
                     self._oneShotAdditionGraph)
            for part in parts:
                # skips parts that are None or contain no statements
                if part:
                    for stmt in part:
                        combined.add(stmt)
            # cache only after a complete copy, so a failure part-way
            # through never leaves a half-built graph in the cache
            self._combinedGraph = combined

        return self._combinedGraph