Mercurial > code > home > repos > homeauto
annotate service/reasoning/inputgraph.py @ 1237:c8ac652d37b7
switch to uart
Ignore-this: b5285d12100bc182b970ea392f48e259
darcs-hash:c8280607c51cb0da02361600f2fa3d62be66b83b
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 09 Apr 2019 09:05:42 -0700 |
parents | d8acab2b01f5 |
children | 1ceb26846eca |
rev | line source |
---|---|
1108
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
1 import logging, time, sys |
1080 | 2 |
3 from rdflib import Graph, ConjunctiveGraph | |
1108
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
4 from rdflib import Namespace, URIRef, Literal, RDF, RDFS |
1086 | 5 from rdflib.parser import StringInputSource |
1080 | 6 |
7 from twisted.python.filepath import FilePath | |
8 from twisted.internet.defer import inlineCallbacks, gatherResults | |
9 | |
10 from rdflibtrig import addTrig | |
11 from graphop import graphEqual | |
1197
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
12 from greplin import scales |
1080 | 13 |
1108
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
14 from patchsource import ReconnectingPatchSource |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
15 |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
1119
diff
changeset
|
16 sys.path.append("/my/proj/rdfdb") |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
1119
diff
changeset
|
17 from rdfdb.rdflibpatch import patchQuads |
1108
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
18 |
1080 | 19 log = logging.getLogger('fetch') |
20 | |
21 ROOM = Namespace("http://projects.bigasterisk.com/room/") | |
22 DEV = Namespace("http://projects.bigasterisk.com/device/") | |
23 | |
24 | |
1197
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
25 STATS = scales.collection('/web', |
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
26 scales.PmfStat('combineGraph'), |
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
27 ) |
1086 | 28 def parseRdf(text, contentType): |
29 g = Graph() | |
30 g.parse(StringInputSource(text), format={ | |
31 'text/n3': 'n3', | |
32 }[contentType]) | |
33 return g | |
34 | |
35 | |
1108
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
36 class RemoteData(object): |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
37 def __init__(self, onChange): |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
38 self.onChange = onChange |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
39 self.graph = ConjunctiveGraph() |
1197
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
40 self.patchSource = ReconnectingPatchSource( |
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
41 URIRef('http://bang:9072/graph/home'), |
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
42 #URIRef('http://frontdoor:10012/graph/events'), |
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
43 self.onPatch, reconnectSecs=10) |
1108
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
44 |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
45 def onPatch(self, p, fullGraph): |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
46 if fullGraph: |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
47 self.graph = ConjunctiveGraph() |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
48 patchQuads(self.graph, |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
49 deleteQuads=p.delQuads, |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
50 addQuads=p.addQuads, |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
51 perfect=True) |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
52 |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
53 ignorePredicates = [ |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
54 ROOM['signalStrength'], |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
55 # perhaps anything with a number-datatype for its |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
56 # object should be filtered out, and you have to make |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
57 # an upstream quantization (e.g. 'temp high'/'temp |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
58 # low') if you want to do reasoning on the difference |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
59 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
60 URIRef("http://bigasterisk.com/map#lastSeenAgo"), |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
61 ROOM['usingPower'], |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
62 ROOM['idleTimeMinutes'], |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
63 ROOM['idleTimeMs'], |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
64 ROOM['graphLoadMs'], |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
65 ROOM['localTimeToSecond'], |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
66 ROOM['history'], |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
67 ROOM['connectedAgo'], |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
68 RDFS['comment'], |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
69 ] |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
70 ignoreContexts = [ |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
71 URIRef('http://bigasterisk.com/sse_collector/'), |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
72 ] |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
73 for affected in p.addQuads + p.delQuads: |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
74 if (affected[1] not in ignorePredicates and |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
75 affected[3] not in ignoreContexts): |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
76 log.debug(" remote graph changed") |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
77 self.onChange() |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
78 break |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
79 else: |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
80 log.debug(" remote graph has no changes to trigger rules") |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
81 |
1080 | 82 class InputGraph(object): |
1108
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
83 def __init__(self, inputDirs, onChange): |
1080 | 84 """ |
85 this has one Graph that's made of: | |
86 - all .n3 files from inputDirs (read at startup) | |
87 - all the remote graphs, specified in the file graphs | |
88 | |
89 call updateFileData or updateRemoteData to reread those | |
90 graphs. getGraph to access the combined graph. | |
91 | |
92 onChange(self) is called if the contents of the full graph | |
93 change (in an interesting way) during updateFileData or | |
94 updateRemoteData. Interesting means statements other than the | |
95 ones with the predicates on the boring list. onChange(self, | |
96 oneShot=True) means: don't store the result of this change | |
97 anywhere; it needs to be processed only once | |
98 """ | |
99 self.inputDirs = inputDirs | |
1117
d5687ba23279
fix input graph web display by dirtying combinedGraph better.
drewp <drewp@bigasterisk.com>
parents:
1108
diff
changeset
|
100 self._onChange = onChange |
1080 | 101 self._fileGraph = Graph() |
1117
d5687ba23279
fix input graph web display by dirtying combinedGraph better.
drewp <drewp@bigasterisk.com>
parents:
1108
diff
changeset
|
102 self._remoteData = RemoteData(lambda: self.onChangeLocal()) |
1080 | 103 self._combinedGraph = None |
104 self._oneShotAdditionGraph = None | |
105 | |
1117
d5687ba23279
fix input graph web display by dirtying combinedGraph better.
drewp <drewp@bigasterisk.com>
parents:
1108
diff
changeset
|
106 def onChangeLocal(self, oneShot=False, oneShotGraph=None): |
d5687ba23279
fix input graph web display by dirtying combinedGraph better.
drewp <drewp@bigasterisk.com>
parents:
1108
diff
changeset
|
107 self._combinedGraph = None |
d5687ba23279
fix input graph web display by dirtying combinedGraph better.
drewp <drewp@bigasterisk.com>
parents:
1108
diff
changeset
|
108 self._onChange(self, oneShot=oneShot, oneShotGraph=oneShotGraph) |
d5687ba23279
fix input graph web display by dirtying combinedGraph better.
drewp <drewp@bigasterisk.com>
parents:
1108
diff
changeset
|
109 |
1080 | 110 def updateFileData(self): |
111 """ | |
112 make sure we contain the correct data from the files in inputDirs | |
113 """ | |
114 # this sample one is actually only needed for the output, but I don't | |
115 # think I want to have a separate graph for the output | |
116 # handling | |
117 log.debug("read file graphs") | |
118 for fp in FilePath("input").walk(): | |
119 if fp.isdir(): | |
120 continue | |
121 if fp.splitext()[1] != '.n3': | |
122 continue | |
123 log.debug("read %s", fp) | |
124 # todo: if this fails, leave the report in the graph | |
125 self._fileGraph.parse(fp.open(), format="n3") | |
126 self._combinedGraph = None | |
127 | |
1117
d5687ba23279
fix input graph web display by dirtying combinedGraph better.
drewp <drewp@bigasterisk.com>
parents:
1108
diff
changeset
|
128 self.onChangeLocal() |
1080 | 129 |
130 def addOneShot(self, g): | |
131 """ | |
132 add this graph to the total, call onChange, and then revert | |
133 the addition of this graph | |
134 """ | |
135 self._oneShotAdditionGraph = g | |
136 self._combinedGraph = None | |
137 try: | |
1119 | 138 self.onChangeLocal(oneShot=True, oneShotGraph=g) |
1080 | 139 finally: |
140 self._oneShotAdditionGraph = None | |
141 self._combinedGraph = None | |
142 | |
1086 | 143 def addOneShotFromString(self, body, contentType): |
144 g = parseRdf(body, contentType) | |
145 if not len(g): | |
146 log.warn("incoming oneshot graph had no statements: %r", body) | |
147 return 0 | |
148 t1 = time.time() | |
149 self.addOneShot(g) | |
150 return time.time() - t1 | |
1197
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
151 |
d8acab2b01f5
mqtt has two devices now. various older cleanups.
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
152 @STATS.combineGraph.time() |
1080 | 153 def getGraph(self): |
154 """rdflib Graph with the file+remote contents of the input graph""" | |
155 # this could be much faster with the combined readonly graph | |
156 # view from rdflib | |
157 if self._combinedGraph is None: | |
158 self._combinedGraph = Graph() | |
159 if self._fileGraph: | |
160 for s in self._fileGraph: | |
161 self._combinedGraph.add(s) | |
1108
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
162 for s in self._remoteData.graph: |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1086
diff
changeset
|
163 self._combinedGraph.add(s) |
1080 | 164 if self._oneShotAdditionGraph: |
165 for s in self._oneShotAdditionGraph: | |
166 self._combinedGraph.add(s) | |
167 | |
168 return self._combinedGraph |