# HG changeset patch # User drewp # Date 1581663606 28800 # Node ID 606bb60a5e5c884d09954c37cb0b671351304004 # Parent bb765a6bf09af2fd597d9d93b736a1ab65c7ae0d something with rx on inputgraph, i forget. also cleanup imports and logging of oneshot Ignore-this: 263923a8d12db2173017bc9dbfc638ba darcs-hash:cedac5a49c9cbd0b8c34c80efa9282853fda2cc5 diff -r bb765a6bf09a -r 606bb60a5e5c service/reasoning/inputgraph.py --- a/service/reasoning/inputgraph.py Thu Feb 13 22:58:56 2020 -0800 +++ b/service/reasoning/inputgraph.py Thu Feb 13 23:00:06 2020 -0800 @@ -1,15 +1,13 @@ -import logging, time, sys - -from rdflib import Graph, ConjunctiveGraph -from rdflib import Namespace, URIRef, Literal, RDF, RDFS -from rdflib.parser import StringInputSource +import logging, time +import weakref +from greplin import scales +from rdflib import Graph, ConjunctiveGraph +from rdflib import Namespace, URIRef, RDFS +from rdflib.parser import StringInputSource +from rx.subjects import BehaviorSubject from twisted.python.filepath import FilePath -from twisted.internet.defer import inlineCallbacks, gatherResults - -from rdflibtrig import addTrig -from graphop import graphEqual -from greplin import scales +from twisted.internet import reactor from patchablegraph.patchsource import ReconnectingPatchSource from rdfdb.rdflibpatch import patchQuads @@ -33,8 +31,12 @@ class RemoteData(object): def __init__(self, onChange): + """we won't fire onChange during init""" self.onChange = onChange self.graph = ConjunctiveGraph() + reactor.callLater(0, self._finishInit) + + def _finishInit(self): self.patchSource = ReconnectingPatchSource( URIRef('http://bang:9072/graph/home'), #URIRef('http://frontdoor:10012/graph/events'), @@ -100,11 +102,23 @@ self._remoteData = RemoteData(lambda: self.onChangeLocal()) self._combinedGraph = None self._oneShotAdditionGraph = None + self._rxValues = weakref.WeakKeyDictionary() def onChangeLocal(self, oneShot=False, oneShotGraph=None): self._combinedGraph = None self._onChange(self, oneShot=oneShot, oneShotGraph=oneShotGraph) - + for rxv, (subj, pred, default) in self._rxValues.items(): + self._rxUpdate(subj, pred, default, rxv) + + def _rxUpdate(self, subj, pred, default, rxv): + rxv.on_next(self.getGraph().value(subj, pred, default=default)) + + def rxValue(self, subj, pred, default):# -> BehaviorSubject: + value = BehaviorSubject(default) + self._rxValues[value] = (subj, pred, default) + self._rxUpdate(subj, pred, default, value) + return value + def updateFileData(self): """ make sure we contain the correct data from the files in inputDirs diff -r bb765a6bf09a -r 606bb60a5e5c service/reasoning/reasoning.py --- a/service/reasoning/reasoning.py Thu Feb 13 22:58:56 2020 -0800 +++ b/service/reasoning/reasoning.py Thu Feb 13 23:00:06 2020 -0800 @@ -22,9 +22,8 @@ from colorlog import ColoredFormatter from docopt import docopt -from rdflib import Namespace, Literal, RDF, Graph -from twisted.internet import reactor, task, defer -from twisted.internet.defer import inlineCallbacks +from rdflib import Namespace, Literal, RDF, Graph, URIRef +from twisted.internet import reactor, defer import cyclone.web, cyclone.websocket from greplin import scales @@ -49,17 +48,23 @@ scales.PmfStat('updateRules'), ) +def ntStatement(stmt): + def compact(u): + if isinstance(u, URIRef) and u.startswith(ROOM): + return 'room:' + u[len(ROOM):] + return u.n3() + return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2])) + class Reasoning(object): def __init__(self, mockOutput=False): self.prevGraph = None - self.actions = Actions(sendToLiveClients, mockOutput=mockOutput) - self.rulesN3 = "(not read yet)" self.inferred = Graph() # gets replaced in each graphChanged call self.outputGraph = PatchableGraph() # copy of inferred, for now self.inputGraph = InputGraph([], self.graphChanged) + self.actions = Actions(self.inputGraph, sendToLiveClients, mockOutput=mockOutput) self.inputGraph.updateFileData() @STATS.updateRules.time() @@ -99,7 +104,7 @@ oneShot, len(oneShotGraph) if oneShotGraph is not None else 0) if oneShotGraph: for stmt in oneShotGraph: - log.info(" OS-> %r", stmt) + log.info(" oneshot -> %s", ntStatement(stmt)) t1 = time.time() oldInferred = self.inferred try: @@ -120,7 +125,7 @@ self.inferred += oneShotGraph t3 = time.time() - self.actions.putResults(self.inputGraph.getGraph(), self.inferred) + self.actions.putResults(self.inferred) putResultsTime = time.time() - t3 finally: if oneShot: