Mercurial > code > home > repos > homeauto
changeset 298:8d89da1915df
sse_collector now kind of gets concurrent requests right
Ignore-this: e1a104d9ae81473b86fc12fbb8ac097b
author | drewp@bigasterisk.com |
---|---|
date | Fri, 19 Aug 2016 22:37:01 -0700 |
parents | 6cde6131f2c0 |
children | 5084a1f719c9 |
files | lib/patchablegraph.py service/reasoning/sse_collector.py service/reasoning/twisted_sse_demo/eventsource.py |
diffstat | 3 files changed, 111 insertions(+), 39 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/patchablegraph.py Fri Aug 19 10:59:39 2016 -0700 +++ b/lib/patchablegraph.py Fri Aug 19 22:37:01 2016 -0700 @@ -26,7 +26,9 @@ from light9.rdfdb.grapheditapi import GraphEditApi from rdflib import ConjunctiveGraph from light9.rdfdb.rdflibpatch import patchQuads +from light9.rdfdb.patch import Patch from rdflib_jsonld.serializer import from_rdf +from rdflib.parser import StringInputSource from cycloneerr import PrettyErrorHandler log = logging.getLogger('patchablegraph') @@ -51,11 +53,21 @@ #g.store.add((s,p,o), c) # no effect on nquad output return g -def patchAsJson(p): +def jsonFromPatch(p): return json.dumps({'patch': { 'adds': from_rdf(_graphFromQuads2(p.addQuads)), 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), }}) +patchAsJson = jsonFromPatch # deprecated name + + +def patchFromJson(j): + body = json.loads(j)['patch'] + a = ConjunctiveGraph() + a.parse(StringInputSource(json.dumps(body['adds'])), format='json-ld') + d = ConjunctiveGraph() + d.parse(StringInputSource(json.dumps(body['deletes'])), format='json-ld') + return Patch(addGraph=a, delGraph=d) def graphAsJson(g): # This is not the same as g.serialize(format='json-ld')! That
--- a/service/reasoning/sse_collector.py Fri Aug 19 10:59:39 2016 -0700 +++ b/service/reasoning/sse_collector.py Fri Aug 19 22:37:01 2016 -0700 @@ -5,7 +5,7 @@ Future: - filter out unneeded stmts from the sources -- give a time resolution and concatenate patches faster than that res +- give a time resolution and concatenate any patches that come faster than that res """ config = { @@ -22,7 +22,7 @@ from crochet import no_setup no_setup() -import sys, logging, weakref, traceback, json +import sys, logging, traceback, json, collections from twisted.internet import reactor import cyclone.web, cyclone.sse from rdflib import ConjunctiveGraph @@ -33,24 +33,16 @@ sys.path.append("../../lib") from logsetup import log -from patchablegraph import patchAsJson +from patchablegraph import jsonFromPatch, PatchableGraph, patchFromJson sys.path.append("/my/proj/light9") from light9.rdfdb.patch import Patch -def patchFromJson(j): - body = json.loads(j)['patch'] - a = ConjunctiveGraph() - a.parse(StringInputSource(json.dumps(body['adds'])), format='json-ld') - d = ConjunctiveGraph() - d.parse(StringInputSource(json.dumps(body['deletes'])), format='json-ld') - return Patch(addGraph=a, delGraph=d) - class PatchSource(object): - """wrap EventSource so it emits Patch objects and has an explicit stop method""" + """wrap EventSource so it emits Patch objects and has an explicit stop method.""" def __init__(self, url): self.url = url - self._listeners = set()#weakref.WeakSet() + self._listeners = set() log.info('start read from %s', url) self._eventSource = EventSource(url) self._eventSource.protocol.delimiter = '\n' @@ -65,17 +57,19 @@ p = Patch(addGraph=g) self._sendPatch(p) except: - traceback.print_exc() + log.error(traceback.format_exc()) + raise def _onMessage(self, message): try: p = patchFromJson(message) self._sendPatch(p) except: - traceback.print_exc() + log.error(traceback.format_exc()) + raise def _sendPatch(self, p): - log.info('output patch to %s listeners', p, len(self._listeners)) + log.debug('PatchSource received patch %s', p.shortSummary()) for lis in self._listeners: lis(p) @@ -84,7 +78,10 @@ def stop(self): log.info('stop read from %s', self.url) - self._eventSource.protocol.stopProducing() #? + try: + self._eventSource.protocol.stopProducing() #? + except AttributeError: + pass self._eventSource = None def __del__(self): @@ -92,49 +89,113 @@ raise ValueError class GraphClient(object): - """A listener of some EventSources that sends patches to one of our clients.""" + """A listener of some PatchSources that emits patches to a cyclone SSEHandler.""" - def __init__(self, handler, patchSources): + def __init__(self, handler): self.handler = handler - for ps in patchSources: - ps.addPatchListener(self.onPatch) + # The graph that the requester knows. + # + # Note that often, 2 requests for the same streamId would have + # the same graph contents in this attribute and ought to share + # it. But, that's a little harder to write, and if clients + # want different throttling rates or have stalled different + # amounts, their currentGraph contents might drift apart + # temporarily. + self._currentGraph = PatchableGraph() + self._currentGraph.addObserver(self._sendPatch) - def onPatch(self, p): - self.handler.sendEvent(message=patchAsJson(p), event='patch') + def addPatchSource(self, ps): + """Connect this object to a PatchSource whose patches should get applied to our output graph""" + # this is never getting released, so we'll keep sending until + # no one wants the source anymore. + ps.addPatchListener(self._onPatch) + + def _onPatch(self, p): + self._currentGraph.patch(p) + + def _sendPatch(self, jsonPatch): + self.handler.sendEvent(message=jsonPatch, event='patch') class GraphClients(object): - """All the active EventClient objects""" + """ + All the active GraphClient objects + + To handle all the overlapping-statement cases, we store a set of + true statements along with the sources that are currently + asserting them and the requesters who currently know them. As + statements come and go, we make patches to send to requesters. + + todo: reconnect patchsources that go down and deal with their graph diffs + """ def __init__(self): - self.clients = {} # url: EventClient - self.listeners = {} # url: [GraphClient] - + self.clients = {} # url: PatchSource + self.handlers = set() # handler + self.listeners = {} # url: [handler] (handler may appear under multiple urls) + self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` + def addSseHandler(self, handler, streamId): log.info('addSseHandler %r %r', handler, streamId) matches = [s for s in config['streams'] if s['id'] == streamId] if len(matches) != 1: raise ValueError("%s matches for %r" % (len(matches), streamId)) - ecs = [] + + self.handlers.add(handler) for source in matches[0]['sources']: if source not in self.clients: - self.clients[source] = PatchSource(source) - ecs.append(self.clients[source]) + ps = self.clients[source] = PatchSource(source) + ps.addPatchListener(lambda p, source=source: self._onPatch(source, p)) + self.listeners.setdefault(source, []).append(handler) + self._sendUpdatePatch(handler) + + def _onPatch(self, source, p): + + for stmt in p.addQuads: + sourceUrls, handlers = self.statements[stmt] + if source in sourceUrls: + raise ValueError("%s added stmt that it already had: %s" % (source, stmt)) + sourceUrls.add(source) + for stmt in p.delQuads: + sourceUrls, handlers = self.statements[stmt] + if source not in sourceUrls: + raise ValueError("%s deleting stmt that it didn't have: %s" % (source, stmt)) + sourceUrls.remove(source) + + for h in self.handlers: + self._sendUpdatePatch(h) - self.listeners.setdefault(source, []).append(GraphClient(handler, ecs)) - print self.__dict__ + def _sendUpdatePatch(self, handler): + """send a patch event out this handler to bring it up to date with self.statements""" + adds = [] + dels = [] + statementsToClear = [] + for stmt, (sources, handlers) in self.statements.iteritems(): + if sources and (handler not in handlers): + adds.append(stmt) + handlers.add(handler) + if not sources and (handler in handlers): + dels.append(stmt) + handlers.remove(handler) + statementsToClear.append(stmt) + # todo: cleanup statementsToClear + p = Patch(addQuads=adds, delQuads=dels) + if not p.isNoop(): + log.debug("send patch %s to %s", p.shortSummary(), handler) + handler.sendEvent(message=jsonFromPatch(p), event='patch') def removeSseHandler(self, handler): log.info('removeSseHandler %r', handler) - for url, graphClients in self.listeners.items(): + for url, handlers in self.listeners.items(): keep = [] - for gc in graphClients: - if gc.handler != handler: - keep.append(gc) - graphClients[:] = keep + for h in handlers: + if h != handler: + keep.append(h) + handlers[:] = keep if not keep: self.clients[url].stop() del self.clients[url] del self.listeners[url] + self.handlers.remove(handler) class SomeGraph(cyclone.sse.SSEHandler): def __init__(self, application, request):
--- a/service/reasoning/twisted_sse_demo/eventsource.py Fri Aug 19 10:59:39 2016 -0700 +++ b/service/reasoning/twisted_sse_demo/eventsource.py Fri Aug 19 22:37:01 2016 -0700 @@ -39,7 +39,6 @@ d.addCallback(self.cbRequest) def cbRequest(self, response): - print 'cbRequest', response.code if response.code != 200: self.callErrorHandler("non 200 response received: %d" % response.code)