1080
|
1 import logging, time
|
|
2
|
|
3 from rdflib import Graph, ConjunctiveGraph
|
|
4 from rdflib import Namespace, URIRef, Literal, RDF
|
|
5
|
|
6 from twisted.python.filepath import FilePath
|
|
7 from twisted.internet.defer import inlineCallbacks, gatherResults
|
|
8
|
|
9 from rdflibtrig import addTrig
|
|
10 from graphop import graphEqual
|
|
11
|
|
# Module-wide logger; everything in this file logs under the 'fetch' name.
log = logging.getLogger("fetch")

# RDF vocabularies used to build predicate/class URIs, e.g. ROOM['source'].
ROOM = Namespace("http://projects.bigasterisk.com/room/")
# DEV is not referenced in the visible part of this file; it is kept because
# other modules may import it from here.
DEV = Namespace("http://projects.bigasterisk.com/device/")
|
|
16
|
|
17
|
|
class InputGraph(object):
    """
    Combined input graph built from local .n3 files plus remote graphs.

    Three pieces are merged by getGraph():
      - the file graph, filled by updateFileData()
      - the remote graph, filled by updateRemoteData()
      - an optional temporary graph that only exists during addOneShot()
    """

    # Predicates ignored when deciding whether a new remote graph differs
    # from the previous one, so that constantly-changing readings do not
    # trigger onChange on every poll.
    #
    # perhaps anything with a number-datatype for its object should be
    # filtered out, and you have to make an upstream quantization
    # (e.g. 'temp high'/'temp low') if you want to do reasoning on the
    # difference
    _boringPredicates = [
        ROOM['signalStrength'],
        URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
        URIRef("http://bigasterisk.com/map#lastSeenAgo"),
        ROOM['usingPower'],
        ROOM['idleTimeMinutes'],
        ROOM['idleTimeMs'],
        ROOM['graphLoadMs'],
        ROOM['localTimeToSecond'],
        ROOM['history'],
        ROOM['temperatureF'],
        ROOM['connectedAgo'],
    ]

    def __init__(self, inputDirs, onChange, sourceSubstr=None):
        """
        this has one Graph that's made of:
          - all .n3 files found under the "input" directory
          - all the remote graphs, specified in the file graphs

        call updateFileData or updateRemoteData to reread those
        graphs. getGraph to access the combined graph.

        inputDirs is stored on the instance. NOTE: updateFileData does
        not consult it yet; it always walks the literal "input"
        directory.

        onChange(self) is called after every updateFileData, and after
        updateRemoteData if the remote graph changed in an interesting
        way. Interesting means statements other than the ones with the
        predicates on the boring list. addOneShot calls
        onChange(self, oneShot=True, oneShotGraph=g), which means:
        don't store the result of this change anywhere; it needs to be
        processed only once.

        sourceSubstr filters to only pull from sources containing the
        string (for debugging).
        """
        self.inputDirs = inputDirs
        self.onChange = onChange
        self.sourceSubstr = sourceSubstr
        self._fileGraph = Graph()
        self._remoteGraph = None
        self._combinedGraph = None  # cache for getGraph; None means stale
        self._oneShotAdditionGraph = None
        self._lastErrLog = {}  # source: error

    def updateFileData(self):
        """
        make sure we contain the correct data from the .n3 files under
        the "input" directory, then call onChange(self).

        The files are parsed into a brand-new graph which replaces the
        previous file graph only after every file has parsed. Parsing
        into the old graph would keep statements that were deleted
        from the files and would accumulate a new copy of every blank
        node on each re-read.
        """
        # this sample one is actually only needed for the output, but I don't
        # think I want to have a separate graph for the output
        # handling
        log.debug("read file graphs")
        freshGraph = Graph()
        for fp in FilePath("input").walk():
            if fp.isdir():
                continue
            if fp.splitext()[1] != '.n3':
                continue
            log.debug("read %s", fp)
            # todo: if this fails, leave the report in the graph
            n3File = fp.open()
            try:
                freshGraph.parse(n3File, format="n3")
            finally:
                # don't rely on the parser to release the file handle
                n3File.close()

        self._fileGraph = freshGraph
        self._combinedGraph = None

        self.onChange(self)

    @inlineCallbacks
    def updateRemoteData(self):
        """
        read all remote graphs (which are themselves enumerated within
        the file data as objects of ROOM['reasoning'] ROOM['source'])

        Sources are fetched via addTrig into one new ConjunctiveGraph.
        A source that fails to load is described in that graph with
        ROOM['graphLoadError'] / ROOM['FailedGraphLoad'] statements
        instead of aborting the whole update. onChange(self) is called
        only when the new graph differs from the previous one in some
        predicate that is not on the boring list.
        """
        t1 = time.time()
        log.debug("read remote graphs")
        g = ConjunctiveGraph()

        @inlineCallbacks
        def fetchOne(source):
            try:
                fetchTime = yield addTrig(g, source, timeout=5)
            except Exception as exc:
                msg = str(exc)
                # only log when the error text changes, so a source that
                # stays down doesn't flood the log on every poll
                if self._lastErrLog.get(source) != msg:
                    log.error(" can't add source %s: %s", source, msg)
                    self._lastErrLog[source] = msg
                g.add((URIRef(source), ROOM['graphLoadError'], Literal(msg)))
                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
            else:
                if self._lastErrLog.get(source):
                    log.warning(" source %s is back", source)
                    self._lastErrLog[source] = None
                g.add((URIRef(source), ROOM['graphLoadMs'],
                       Literal(round(fetchTime * 1000, 1))))

        fetchDone = []
        filtered = 0
        for source in self._fileGraph.objects(ROOM['reasoning'],
                                              ROOM['source']):
            if self.sourceSubstr and self.sourceSubstr not in source:
                filtered += 1
                continue
            fetchDone.append(fetchOne(source))
        yield gatherResults(fetchDone, consumeErrors=True)
        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
                  filtered, 1000 * (time.time() - t1))

        prevGraph = self._remoteGraph
        self._remoteGraph = g
        self._combinedGraph = None
        if (prevGraph is None or
                not graphEqual(g, prevGraph,
                               ignorePredicates=self._boringPredicates)):
            log.debug(" remote graph changed")
            self.onChange(self)
        else:
            log.debug(" remote graph has no changes to trigger rules")

    def addOneShot(self, g):
        """
        add this graph to the total, call onChange, and then revert
        the addition of this graph
        """
        self._oneShotAdditionGraph = g
        self._combinedGraph = None
        try:
            self.onChange(self, oneShot=True, oneShotGraph=g)
        finally:
            # revert even if onChange raised, so the temporary statements
            # never leak into later getGraph() results
            self._oneShotAdditionGraph = None
            self._combinedGraph = None

    def getGraph(self):
        """rdflib Graph with the file+remote contents of the input graph

        While addOneShot is running, the one-shot graph's statements are
        included too. The result is cached until one of the update
        methods or addOneShot invalidates it.
        """
        # this could be much faster with the combined readonly graph
        # view from rdflib
        if self._combinedGraph is None:
            combined = Graph()
            parts = (self._fileGraph, self._remoteGraph,
                     self._oneShotAdditionGraph)
            for part in parts:
                # skips both unset (None) and empty graphs
                if part:
                    for stmt in part:
                        combined.add(stmt)
            self._combinedGraph = combined

        return self._combinedGraph
|