Mercurial > code > home > repos > homeauto
annotate service/reasoning/inputgraph.py @ 755:ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
Ignore-this: 263923a8d12db2173017bc9dbfc638ba
author | drewp@bigasterisk.com |
---|---|
date | Thu, 13 Feb 2020 23:00:06 -0800 |
parents | b87b6e9cedb2 |
children | f3f667769aef |
rev | line source |
---|---|
755
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
1 import logging, time |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
2 import weakref |
275 | 3 |
755
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
4 from greplin import scales |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
5 from rdflib import Graph, ConjunctiveGraph |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
6 from rdflib import Namespace, URIRef, RDFS |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
7 from rdflib.parser import StringInputSource |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
8 from rx.subjects import BehaviorSubject |
275 | 9 from twisted.python.filepath import FilePath |
755
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
10 from twisted.internet import reactor |
275 | 11 |
571
53a2664f450a
build/import update for reasoning service
drewp@bigasterisk.com
parents:
463
diff
changeset
|
12 from patchablegraph.patchsource import ReconnectingPatchSource |
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
314
diff
changeset
|
13 from rdfdb.rdflibpatch import patchQuads |
303 | 14 |
275 | 15 log = logging.getLogger('fetch') |
16 | |
17 ROOM = Namespace("http://projects.bigasterisk.com/room/") | |
18 DEV = Namespace("http://projects.bigasterisk.com/device/") | |
19 | |
20 | |
392
79d041273e26
mqtt has two devices now. various older cleanups.
drewp@bigasterisk.com
parents:
351
diff
changeset
|
21 STATS = scales.collection('/web', |
79d041273e26
mqtt has two devices now. various older cleanups.
drewp@bigasterisk.com
parents:
351
diff
changeset
|
22 scales.PmfStat('combineGraph'), |
79d041273e26
mqtt has two devices now. various older cleanups.
drewp@bigasterisk.com
parents:
351
diff
changeset
|
23 ) |
281 | 24 def parseRdf(text, contentType): |
25 g = Graph() | |
26 g.parse(StringInputSource(text), format={ | |
27 'text/n3': 'n3', | |
28 }[contentType]) | |
29 return g | |
30 | |
31 | |
303 | 32 class RemoteData(object): |
33 def __init__(self, onChange): | |
755
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
34 """we won't fire onChange during init""" |
303 | 35 self.onChange = onChange |
36 self.graph = ConjunctiveGraph() | |
755
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
37 reactor.callLater(0, self._finishInit) |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
38 |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
39 def _finishInit(self): |
392
79d041273e26
mqtt has two devices now. various older cleanups.
drewp@bigasterisk.com
parents:
351
diff
changeset
|
40 self.patchSource = ReconnectingPatchSource( |
79d041273e26
mqtt has two devices now. various older cleanups.
drewp@bigasterisk.com
parents:
351
diff
changeset
|
41 URIRef('http://bang:9072/graph/home'), |
79d041273e26
mqtt has two devices now. various older cleanups.
drewp@bigasterisk.com
parents:
351
diff
changeset
|
42 #URIRef('http://frontdoor:10012/graph/events'), |
463
1ceb26846eca
add separate :matchPredicate support. some build and log edits.
drewp@bigasterisk.com
parents:
392
diff
changeset
|
43 self.onPatch, reconnectSecs=10, agent='reasoning') |
303 | 44 |
45 def onPatch(self, p, fullGraph): | |
46 if fullGraph: | |
47 self.graph = ConjunctiveGraph() | |
48 patchQuads(self.graph, | |
49 deleteQuads=p.delQuads, | |
50 addQuads=p.addQuads, | |
51 perfect=True) | |
52 | |
53 ignorePredicates = [ | |
54 ROOM['signalStrength'], | |
55 # perhaps anything with a number-datatype for its | |
56 # object should be filtered out, and you have to make | |
57 # an upstream quantization (e.g. 'temp high'/'temp | |
58 # low') if you want to do reasoning on the difference | |
59 URIRef("http://bigasterisk.com/map#lastSeenAgoSec"), | |
60 URIRef("http://bigasterisk.com/map#lastSeenAgo"), | |
61 ROOM['usingPower'], | |
62 ROOM['idleTimeMinutes'], | |
63 ROOM['idleTimeMs'], | |
64 ROOM['graphLoadMs'], | |
65 ROOM['localTimeToSecond'], | |
66 ROOM['history'], | |
67 ROOM['connectedAgo'], | |
68 RDFS['comment'], | |
69 ] | |
70 ignoreContexts = [ | |
71 URIRef('http://bigasterisk.com/sse_collector/'), | |
72 ] | |
73 for affected in p.addQuads + p.delQuads: | |
74 if (affected[1] not in ignorePredicates and | |
75 affected[3] not in ignoreContexts): | |
76 log.debug(" remote graph changed") | |
77 self.onChange() | |
78 break | |
79 else: | |
80 log.debug(" remote graph has no changes to trigger rules") | |
81 | |
275 | 82 class InputGraph(object): |
303 | 83 def __init__(self, inputDirs, onChange): |
275 | 84 """ |
85 this has one Graph that's made of: | |
86 - all .n3 files from inputDirs (read at startup) | |
87 - all the remote graphs, specified in the file graphs | |
88 | |
89 call updateFileData or updateRemoteData to reread those | |
90 graphs. getGraph to access the combined graph. | |
91 | |
92 onChange(self) is called if the contents of the full graph | |
93 change (in an interesting way) during updateFileData or | |
94 updateRemoteData. Interesting means statements other than the | |
95 ones with the predicates on the boring list. onChange(self, | |
96 oneShot=True) means: don't store the result of this change | |
97 anywhere; it needs to be processed only once | |
98 """ | |
99 self.inputDirs = inputDirs | |
312
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
303
diff
changeset
|
100 self._onChange = onChange |
275 | 101 self._fileGraph = Graph() |
312
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
303
diff
changeset
|
102 self._remoteData = RemoteData(lambda: self.onChangeLocal()) |
275 | 103 self._combinedGraph = None |
104 self._oneShotAdditionGraph = None | |
755
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
105 self._rxValues = weakref.WeakKeyDictionary() |
275 | 106 |
312
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
303
diff
changeset
|
107 def onChangeLocal(self, oneShot=False, oneShotGraph=None): |
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
303
diff
changeset
|
108 self._combinedGraph = None |
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
303
diff
changeset
|
109 self._onChange(self, oneShot=oneShot, oneShotGraph=oneShotGraph) |
755
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
110 for rxv, (subj, pred, default) in self._rxValues.items(): |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
111 self._rxUpdate(subj, pred, default, rxv) |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
112 |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
113 def _rxUpdate(self, subj, pred, default, rxv): |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
114 rxv.on_next(self.getGraph().value(subj, pred, default=default)) |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
115 |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
116 def rxValue(self, subj, pred, default):# -> BehaviorSubject: |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
117 value = BehaviorSubject(default) |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
118 self._rxValues[value] = (subj, pred, default) |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
119 self._rxUpdate(subj, pred, default, value) |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
120 return value |
ffcad6bf9c57
something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot
drewp@bigasterisk.com
parents:
723
diff
changeset
|
121 |
275 | 122 def updateFileData(self): |
123 """ | |
124 make sure we contain the correct data from the files in inputDirs | |
125 """ | |
126 # this sample one is actually only needed for the output, but I don't | |
127 # think I want to have a separate graph for the output | |
128 # handling | |
129 log.debug("read file graphs") | |
130 for fp in FilePath("input").walk(): | |
131 if fp.isdir(): | |
132 continue | |
133 if fp.splitext()[1] != '.n3': | |
134 continue | |
135 log.debug("read %s", fp) | |
136 # todo: if this fails, leave the report in the graph | |
137 self._fileGraph.parse(fp.open(), format="n3") | |
138 self._combinedGraph = None | |
139 | |
312
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
303
diff
changeset
|
140 self.onChangeLocal() |
275 | 141 |
142 def addOneShot(self, g): | |
143 """ | |
144 add this graph to the total, call onChange, and then revert | |
145 the addition of this graph | |
146 """ | |
147 self._oneShotAdditionGraph = g | |
148 self._combinedGraph = None | |
149 try: | |
314 | 150 self.onChangeLocal(oneShot=True, oneShotGraph=g) |
275 | 151 finally: |
152 self._oneShotAdditionGraph = None | |
153 self._combinedGraph = None | |
154 | |
281 | 155 def addOneShotFromString(self, body, contentType): |
156 g = parseRdf(body, contentType) | |
157 if not len(g): | |
158 log.warn("incoming oneshot graph had no statements: %r", body) | |
159 return 0 | |
160 t1 = time.time() | |
161 self.addOneShot(g) | |
162 return time.time() - t1 | |
392
79d041273e26
mqtt has two devices now. various older cleanups.
drewp@bigasterisk.com
parents:
351
diff
changeset
|
163 |
79d041273e26
mqtt has two devices now. various older cleanups.
drewp@bigasterisk.com
parents:
351
diff
changeset
|
164 @STATS.combineGraph.time() |
275 | 165 def getGraph(self): |
166 """rdflib Graph with the file+remote contents of the input graph""" | |
167 # this could be much faster with the combined readonly graph | |
168 # view from rdflib | |
169 if self._combinedGraph is None: | |
170 self._combinedGraph = Graph() | |
171 if self._fileGraph: | |
172 for s in self._fileGraph: | |
173 self._combinedGraph.add(s) | |
303 | 174 for s in self._remoteData.graph: |
175 self._combinedGraph.add(s) | |
275 | 176 if self._oneShotAdditionGraph: |
177 for s in self._oneShotAdditionGraph: | |
178 self._combinedGraph.add(s) | |
179 | |
180 return self._combinedGraph |