comparison service/reasoning/inputgraph.py @ 275:d3733587e749

refactor inputgraph Ignore-this: 9931e669c180d8141a51fb6f7927db0a
author drewp@bigasterisk.com
date Fri, 06 May 2016 15:42:04 -0700
parents
children 9728288c7f2f
comparison
equal deleted inserted replaced
274:de541d0697b8 275:d3733587e749
1 import logging, time
2
3 from rdflib import Graph, ConjunctiveGraph
4 from rdflib import Namespace, URIRef, Literal, RDF
5
6 from twisted.python.filepath import FilePath
7 from twisted.internet.defer import inlineCallbacks, gatherResults
8
9 from rdflibtrig import addTrig
10 from graphop import graphEqual
11
# module logger (note: named 'fetch' rather than after this module)
log = logging.getLogger('fetch')

# RDF namespaces. ROOM supplies the config predicates (room:reasoning,
# room:source) and the load-status terms written by InputGraph below.
ROOM = Namespace("http://projects.bigasterisk.com/room/")
DEV = Namespace("http://projects.bigasterisk.com/device/")
16
17
class InputGraph(object):
    # Predicates whose values churn constantly (signal levels, timers,
    # load times, ...). A new remote graph that differs from the previous
    # one only in statements with these predicates does not trigger
    # onChange.
    _boringPredicates = [
        ROOM['signalStrength'],
        # perhaps anything with a number-datatype for its
        # object should be filtered out, and you have to make
        # an upstream quantization (e.g. 'temp high'/'temp
        # low') if you want to do reasoning on the difference
        URIRef("http://bigasterisk.com/map#lastSeenAgoSec"),
        URIRef("http://bigasterisk.com/map#lastSeenAgo"),
        ROOM['usingPower'],
        ROOM['idleTimeMinutes'],
        ROOM['idleTimeMs'],
        ROOM['graphLoadMs'],
        ROOM['localTimeToSecond'],
        ROOM['history'],
        ROOM['temperatureF'],
        ROOM['connectedAgo'],
    ]

    def __init__(self, inputDirs, onChange, sourceSubstr=None):
        """
        this has one Graph that's made of:
          - all .n3 files from inputDirs, a sequence of directory paths
            (read by updateFileData; the 'input' directory is used when
            inputDirs is empty)
          - all the remote graphs, specified in the file graphs
          - while addOneShot is running, the graph passed to it

        call updateFileData or updateRemoteData to reread those
        graphs. getGraph to access the combined graph.

        onChange(self) is called after every updateFileData, and by
        updateRemoteData only if the contents of the remote graph change
        (in an interesting way). Interesting means statements other than
        the ones with the predicates on the boring list
        (_boringPredicates). onChange(self, oneShot=True, oneShotGraph=g)
        means: don't store the result of this change anywhere; it needs
        to be processed only once

        sourceSubstr filters to only pull from sources containing the
        string (for debugging).
        """
        self.inputDirs = inputDirs
        self.onChange = onChange
        self.sourceSubstr = sourceSubstr
        self._fileGraph = Graph()
        self._remoteGraph = None
        self._combinedGraph = None
        self._oneShotAdditionGraph = None
        self._lastErrLog = {} # source: error text (None once it recovers)

    def updateFileData(self):
        """
        make sure we contain the correct data from the .n3 files in
        inputDirs (or in 'input' if inputDirs is empty), then call
        onChange(self).

        The file graph is rebuilt from scratch on each call, so triples
        from files that were edited or deleted since the last read are
        dropped. If a file fails to parse, the exception propagates and
        the previous file graph is kept unchanged.
        """
        # this sample one is actually only needed for the output, but I don't
        # think I want to have a separate graph for the output
        # handling
        log.debug("read file graphs")
        newGraph = Graph()
        # an empty inputDirs keeps the historical hard-coded 'input' dir
        for inputDir in (self.inputDirs or ["input"]):
            for fp in FilePath(inputDir).walk():
                if fp.isdir():
                    continue
                if fp.splitext()[1] != '.n3':
                    continue
                log.debug("read %s", fp)
                # todo: if this fails, leave the report in the graph
                with fp.open() as f:
                    newGraph.parse(f, format="n3")

        # swap in only after every file parsed successfully
        self._fileGraph = newGraph
        self._combinedGraph = None

        self.onChange(self)

    @inlineCallbacks
    def updateRemoteData(self):
        """
        read all remote graphs (which are themselves enumerated within
        the file data as room:reasoning room:source ?source) into a fresh
        graph, replace the previous remote graph with it, and call
        onChange(self) if it differs from the previous one in any
        statement whose predicate is not in _boringPredicates.

        A source that fails to load is recorded in the new graph as a
        room:FailedGraphLoad with a room:graphLoadError message; a
        successful load records room:graphLoadMs.
        """
        t1 = time.time()
        log.debug("read remote graphs")
        g = ConjunctiveGraph()

        @inlineCallbacks
        def fetchOne(source):
            try:
                fetchTime = yield addTrig(g, source, timeout=5)
            except Exception as e:
                errText = str(e)
                # log only when the error for this source changes, so a
                # persistently failing source doesn't flood the log
                if self._lastErrLog.get(source) != errText:
                    log.error(" can't add source %s: %s", source, errText)
                    self._lastErrLog[source] = errText
                g.add((URIRef(source), ROOM['graphLoadError'],
                       Literal(errText)))
                g.add((URIRef(source), RDF.type, ROOM['FailedGraphLoad']))
            else:
                if self._lastErrLog.get(source):
                    log.warning(" source %s is back", source)
                self._lastErrLog[source] = None
                g.add((URIRef(source), ROOM['graphLoadMs'],
                       Literal(round(fetchTime * 1000, 1))))

        fetchDone = []
        filtered = 0
        for source in self._fileGraph.objects(ROOM['reasoning'],
                                              ROOM['source']):
            if self.sourceSubstr and self.sourceSubstr not in source:
                filtered += 1
                continue
            fetchDone.append(fetchOne(source))
        yield gatherResults(fetchDone, consumeErrors=True)
        log.debug("loaded %s (skipping %s) in %.1f ms", len(fetchDone),
                  filtered, 1000 * (time.time() - t1))

        prevGraph = self._remoteGraph
        self._remoteGraph = g
        self._combinedGraph = None
        if (prevGraph is None or
                not graphEqual(g, prevGraph,
                               ignorePredicates=self._boringPredicates)):
            log.debug(" remote graph changed")
            self.onChange(self)
        else:
            log.debug(" remote graph has no changes to trigger rules")

    def addOneShot(self, g):
        """
        add this graph to the total, call
        onChange(self, oneShot=True, oneShotGraph=g), and then revert
        the addition of this graph (even if onChange raises)
        """
        self._oneShotAdditionGraph = g
        self._combinedGraph = None
        try:
            self.onChange(self, oneShot=True, oneShotGraph=g)
        finally:
            self._oneShotAdditionGraph = None
            self._combinedGraph = None

    def getGraph(self):
        """
        rdflib Graph with the file+remote contents of the input graph,
        plus the one-shot graph while addOneShot is running. The result
        is cached until one of the component graphs is replaced.
        """
        # this could be much faster with the combined readonly graph
        # view from rdflib
        if self._combinedGraph is None:
            combined = Graph()
            for part in (self._fileGraph, self._remoteGraph,
                         self._oneShotAdditionGraph):
                # skips None as well as empty graphs
                if part:
                    for stmt in part:
                        combined.add(stmt)
            # cache only a fully built graph
            self._combinedGraph = combined

        return self._combinedGraph
153 for s in self._fileGraph:
154 self._combinedGraph.add(s)
155 if self._remoteGraph:
156 for s in self._remoteGraph:
157 self._combinedGraph.add(s)
158 if self._oneShotAdditionGraph:
159 for s in self._oneShotAdditionGraph:
160 self._combinedGraph.add(s)
161
162 return self._combinedGraph