comparison service/reasoning/inputgraph.py @ 1555:606bb60a5e5c

something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot Ignore-this: 263923a8d12db2173017bc9dbfc638ba darcs-hash:cedac5a49c9cbd0b8c34c80efa9282853fda2cc5
author drewp <drewp@bigasterisk.com>
date Thu, 13 Feb 2020 23:00:06 -0800
parents 13b7e4de3824
children f3f667769aef
comparison
equal deleted inserted replaced
1554:bb765a6bf09a 1555:606bb60a5e5c
1 import logging, time, sys 1 import logging, time
2 import weakref
2 3
4 from greplin import scales
3 from rdflib import Graph, ConjunctiveGraph 5 from rdflib import Graph, ConjunctiveGraph
4 from rdflib import Namespace, URIRef, Literal, RDF, RDFS 6 from rdflib import Namespace, URIRef, RDFS
5 from rdflib.parser import StringInputSource 7 from rdflib.parser import StringInputSource
6 8 from rx.subjects import BehaviorSubject
7 from twisted.python.filepath import FilePath 9 from twisted.python.filepath import FilePath
8 from twisted.internet.defer import inlineCallbacks, gatherResults 10 from twisted.internet import reactor
9
10 from rdflibtrig import addTrig
11 from graphop import graphEqual
12 from greplin import scales
13 11
14 from patchablegraph.patchsource import ReconnectingPatchSource 12 from patchablegraph.patchsource import ReconnectingPatchSource
15 from rdfdb.rdflibpatch import patchQuads 13 from rdfdb.rdflibpatch import patchQuads
16 14
17 log = logging.getLogger('fetch') 15 log = logging.getLogger('fetch')
31 return g 29 return g
32 30
33 31
34 class RemoteData(object): 32 class RemoteData(object):
35 def __init__(self, onChange): 33 def __init__(self, onChange):
34 """we won't fire onChange during init"""
36 self.onChange = onChange 35 self.onChange = onChange
37 self.graph = ConjunctiveGraph() 36 self.graph = ConjunctiveGraph()
37 reactor.callLater(0, self._finishInit)
38
39 def _finishInit(self):
38 self.patchSource = ReconnectingPatchSource( 40 self.patchSource = ReconnectingPatchSource(
39 URIRef('http://bang:9072/graph/home'), 41 URIRef('http://bang:9072/graph/home'),
40 #URIRef('http://frontdoor:10012/graph/events'), 42 #URIRef('http://frontdoor:10012/graph/events'),
41 self.onPatch, reconnectSecs=10, agent='reasoning') 43 self.onPatch, reconnectSecs=10, agent='reasoning')
42 44
98 self._onChange = onChange 100 self._onChange = onChange
99 self._fileGraph = Graph() 101 self._fileGraph = Graph()
100 self._remoteData = RemoteData(lambda: self.onChangeLocal()) 102 self._remoteData = RemoteData(lambda: self.onChangeLocal())
101 self._combinedGraph = None 103 self._combinedGraph = None
102 self._oneShotAdditionGraph = None 104 self._oneShotAdditionGraph = None
105 self._rxValues = weakref.WeakKeyDictionary()
103 106
104 def onChangeLocal(self, oneShot=False, oneShotGraph=None): 107 def onChangeLocal(self, oneShot=False, oneShotGraph=None):
105 self._combinedGraph = None 108 self._combinedGraph = None
106 self._onChange(self, oneShot=oneShot, oneShotGraph=oneShotGraph) 109 self._onChange(self, oneShot=oneShot, oneShotGraph=oneShotGraph)
107 110 for rxv, (subj, pred, default) in self._rxValues.items():
111 self._rxUpdate(subj, pred, default, rxv)
112
113 def _rxUpdate(self, subj, pred, default, rxv):
114 rxv.on_next(self.getGraph().value(subj, pred, default=default))
115
116 def rxValue(self, subj, pred, default):# -> BehaviorSubject:
117 value = BehaviorSubject(default)
118 self._rxValues[value] = (subj, pred, default)
119 self._rxUpdate(subj, pred, default, value)
120 return value
121
108 def updateFileData(self): 122 def updateFileData(self):
109 """ 123 """
110 make sure we contain the correct data from the files in inputDirs 124 make sure we contain the correct data from the files in inputDirs
111 """ 125 """
112 # this sample one is actually only needed for the output, but I don't 126 # this sample one is actually only needed for the output, but I don't