Mercurial > code > home > repos > homeauto
diff service/reasoning/sse_collector.py @ 300:371af6e92b5e
local state statements and self.statements rewrite
Ignore-this: 731cac6010e37305053061f637c8729f
author | drewp@bigasterisk.com |
---|---|
date | Sat, 20 Aug 2016 23:34:04 -0700 |
parents | 5084a1f719c9 |
children | 29f593aee67b |
line wrap: on
line diff
--- a/service/reasoning/sse_collector.py Fri Aug 19 22:46:33 2016 -0700 +++ b/service/reasoning/sse_collector.py Sat Aug 20 23:34:04 2016 -0700 @@ -25,7 +25,7 @@ import sys, logging, traceback, json, collections from twisted.internet import reactor import cyclone.web, cyclone.sse -from rdflib import ConjunctiveGraph +from rdflib import ConjunctiveGraph, URIRef, Namespace from rdflib.parser import StringInputSource from docopt import docopt @@ -38,13 +38,19 @@ sys.path.append("/my/proj/light9") from light9.rdfdb.patch import Patch +ROOM = Namespace("http://projects.bigasterisk.com/room/") +COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/') + +class ConnectionLost(object): + pass + class PatchSource(object): """wrap EventSource so it emits Patch objects and has an explicit stop method.""" def __init__(self, url): self.url = url self._listeners = set() log.info('start read from %s', url) - self._eventSource = EventSource(url) + self._eventSource = EventSource(url.toPython().encode('utf8')) self._eventSource.protocol.delimiter = '\n' self._eventSource.addEventListener('fullGraph', self._onFullGraph) @@ -55,7 +61,7 @@ g = ConjunctiveGraph() g.parse(StringInputSource(message), format='json-ld') p = Patch(addGraph=g) - self._sendPatch(p) + self._sendPatch(p, fullGraph=True) except: log.error(traceback.format_exc()) raise @@ -63,23 +69,26 @@ def _onMessage(self, message): try: p = patchFromJson(message) - self._sendPatch(p) + self._sendPatch(p, fullGraph=False) except: log.error(traceback.format_exc()) raise - def _sendPatch(self, p): + def _sendPatch(self, p, fullGraph): log.debug('PatchSource received patch %s', p.shortSummary()) for lis in self._listeners: - lis(p) + lis(p, fullGraph=fullGraph) def addPatchListener(self, func): + """ + func(patch or ConnectionLost, fullGraph=[true if the patch is the initial fullgraph]) + """ self._listeners.add(func) def stop(self): log.info('stop read from %s', self.url) try: - self._eventSource.protocol.stopProducing() #? + self._eventSource.protocol.stopProducing() # needed? except AttributeError: pass self._eventSource = None @@ -88,35 +97,42 @@ if self._eventSource: raise ValueError -class GraphClient(object): - """A listener of some PatchSources that emits patches to a cyclone SSEHandler.""" - - def __init__(self, handler): - self.handler = handler - - # The graph that the requester knows. - # - # Note that often, 2 requests for the same streamId would have - # the same graph contents in this attribute and ought to share - # it. But, that's a little harder to write, and if clients - # want different throttling rates or have stalled different - # amounts, their currentGraph contents might drift apart - # temporarily. - self._currentGraph = PatchableGraph() - self._currentGraph.addObserver(self._sendPatch) - - def addPatchSource(self, ps): - """Connect this object to a PatchSource whose patches should get applied to our output graph""" - # this is never getting released, so we'll keep sending until - # no one wants the source anymore. - ps.addPatchListener(self._onPatch) - - def _onPatch(self, p): - self._currentGraph.patch(p) +class LocalStatements(object): + def __init__(self, applyPatch): + self.applyPatch = applyPatch + self._sourceState = {} # source: state URIRef - def _sendPatch(self, jsonPatch): - self.handler.sendEvent(message=jsonPatch, event='patch') - + def setSourceState(self, source, state): + """ + add a patch to the COLLECTOR graph about the state of this + source. state=None to remove the source. + """ + oldState = self._sourceState.get(source, None) + if state == oldState: + return + log.info('source state %s -> %s', source, state) + if oldState is None: + self._sourceState[source] = state + self.applyPatch(COLLECTOR, Patch(addQuads=[ + (COLLECTOR, ROOM['source'], source, COLLECTOR), + (source, ROOM['state'], state, COLLECTOR), + ])) + elif state is None: + del self._sourceState[source] + self.applyPatch(COLLECTOR, Patch(delQuads=[ + (COLLECTOR, ROOM['source'], source, COLLECTOR), + (source, ROOM['state'], oldState, COLLECTOR), + ])) + else: + self._sourceState[source] = state + self.applyPatch(COLLECTOR, Patch( + addQuads=[ + (source, ROOM['state'], state, COLLECTOR), + ], + delQuads=[ + (source, ROOM['state'], oldState, COLLECTOR), + ])) + class GraphClients(object): """ All the active GraphClient objects @@ -132,24 +148,55 @@ self.clients = {} # url: PatchSource self.handlers = set() # handler self.listeners = {} # url: [handler] (handler may appear under multiple urls) + + # This table holds statements asserted by any of our sources + # plus local statements that we introduce (source is + # http://bigasterisk.com/sse_collector/). self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` - - def addSseHandler(self, handler, streamId): - log.info('addSseHandler %r %r', handler, streamId) - matches = [s for s in config['streams'] if s['id'] == streamId] - if len(matches) != 1: - raise ValueError("%s matches for %r" % (len(matches), streamId)) + + self._localStatements = LocalStatements(self._onPatch) - self.handlers.add(handler) - for source in matches[0]['sources']: - if source not in self.clients: - ps = self.clients[source] = PatchSource(source) - ps.addPatchListener(lambda p, source=source: self._onPatch(source, p)) - self.listeners.setdefault(source, []).append(handler) - self._sendUpdatePatch(handler) + def _pprintTable(self): + for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())): + print "%03d. (%s, %s, %s, %s) from %s to %s" % ( + i, + stmt[0].n3(), + stmt[1].n3(), + stmt[2].n3(), + stmt[3].n3(), + ','.join(s.n3() for s in sources), + handlers) + + def _sendUpdatePatch(self, handler): + """send a patch event out this handler to bring it up to date with self.statements""" + p = self._makeSyncPatch(handler) + if not p.isNoop(): + log.debug("send patch %s to %s", p.shortSummary(), handler) + handler.sendEvent(message=jsonFromPatch(p), event='patch') - def _onPatch(self, source, p): + def _makeSyncPatch(self, handler): + # todo: this could run all handlers at once, which is how we use it anyway + adds = [] + dels = [] + statementsToClear = [] + for stmt, (sources, handlers) in self.statements.iteritems(): + relevantToHandler = handler in sum((self.listeners.get(s, []) for s in sources), []) + handlerHasIt = handler in handlers + if relevantToHandler and not handlerHasIt: + adds.append(stmt) + handlers.add(handler) + elif not relevantToHandler and handlerHasIt: + dels.append(stmt) + handlers.remove(handler) + if not handlers: + statementsToClear.append(stmt) + + for stmt in statementsToClear: + del self.statements[stmt] + return Patch(addQuads=adds, delQuads=dels) + + def _onPatch(self, source, p, fullGraph=False): for stmt in p.addQuads: sourceUrls, handlers = self.statements[stmt] if source in sourceUrls: @@ -163,28 +210,43 @@ for h in self.handlers: self._sendUpdatePatch(h) - - def _sendUpdatePatch(self, handler): - """send a patch event out this handler to bring it up to date with self.statements""" - adds = [] - dels = [] - statementsToClear = [] - for stmt, (sources, handlers) in self.statements.iteritems(): - if sources and (handler not in handlers): - adds.append(stmt) - handlers.add(handler) - if not sources and (handler in handlers): - dels.append(stmt) - handlers.remove(handler) - statementsToClear.append(stmt) - # todo: cleanup statementsToClear - p = Patch(addQuads=adds, delQuads=dels) - if not p.isNoop(): - log.debug("send patch %s to %s", p.shortSummary(), handler) - handler.sendEvent(message=jsonFromPatch(p), event='patch') + + if log.isEnabledFor(logging.DEBUG): + self._pprintTable() + + if source != COLLECTOR: + if fullGraph: + self._localStatements.setSourceState(source, ROOM['fullGraphReceived']) + else: + self._localStatements.setSourceState(source, ROOM['patchesReceived']) + + def addSseHandler(self, handler, streamId): + log.info('addSseHandler %r %r', handler, streamId) + matches = [s for s in config['streams'] if s['id'] == streamId] + if len(matches) != 1: + raise ValueError("%s matches for %r" % (len(matches), streamId)) + + self.handlers.add(handler) + for source in map(URIRef, matches[0]['sources']): + if source not in self.clients: + self._localStatements.setSourceState(source, ROOM['connect']) + ps = self.clients[source] = PatchSource(source) + ps.addPatchListener( + lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph)) + self.listeners.setdefault(source, []).append(handler) + self._sendUpdatePatch(handler) def removeSseHandler(self, handler): log.info('removeSseHandler %r', handler) + + statementsToClear = [] + for stmt, (sources, handlers) in self.statements.iteritems(): + handlers.discard(handler) + if not sources and not handlers: + statementsToClear.append(stmt) + for stmt in statementsToClear: + del self.statements[stmt] + for url, handlers in self.listeners.items(): keep = [] for h in handlers: @@ -192,11 +254,22 @@ keep.append(h) handlers[:] = keep if not keep: - self.clients[url].stop() - del self.clients[url] - del self.listeners[url] + self._stopClient(url) self.handlers.remove(handler) - + + def _stopClient(self, url): + self.clients[url].stop() + + for stmt, (sources, handlers) in self.statements.iteritems(): + sources.discard(url) + + self._localStatements.setSourceState(url, None) + del self.clients[url] + del self.listeners[url] + + + + class SomeGraph(cyclone.sse.SSEHandler): def __init__(self, application, request): cyclone.sse.SSEHandler.__init__(self, application, request)