Mercurial > code > home > repos > homeauto
view service/reasoning/sse_collector.py @ 296:233b81cf2712
start sse_collector
Ignore-this: eba53ef3b8b7b34089e018595c41d202
author | drewp@bigasterisk.com |
---|---|
date | Fri, 19 Aug 2016 10:54:38 -0700 |
parents | |
children | 8d89da1915df |
line wrap: on
line source
""" requesting /graph/foo returns an SSE patch stream that's the result of fetching multiple other SSE patch streams. The result stream may include new statements injected by this service. Future: - filter out unneeded stmts from the sources - give a time resolution and concatenate patches faster than that res """ config = { 'streams': [ {'id': 'home', 'sources': [ #'http://bang:9059/graph/events', 'http://plus:9075/graph/events', ] }, ] } from crochet import no_setup no_setup() import sys, logging, weakref, traceback, json from twisted.internet import reactor import cyclone.web, cyclone.sse from rdflib import ConjunctiveGraph from rdflib.parser import StringInputSource from docopt import docopt from twisted_sse_demo.eventsource import EventSource sys.path.append("../../lib") from logsetup import log from patchablegraph import patchAsJson sys.path.append("/my/proj/light9") from light9.rdfdb.patch import Patch def patchFromJson(j): body = json.loads(j)['patch'] a = ConjunctiveGraph() a.parse(StringInputSource(json.dumps(body['adds'])), format='json-ld') d = ConjunctiveGraph() d.parse(StringInputSource(json.dumps(body['deletes'])), format='json-ld') return Patch(addGraph=a, delGraph=d) class PatchSource(object): """wrap EventSource so it emits Patch objects and has an explicit stop method""" def __init__(self, url): self.url = url self._listeners = set()#weakref.WeakSet() log.info('start read from %s', url) self._eventSource = EventSource(url) self._eventSource.protocol.delimiter = '\n' self._eventSource.addEventListener('fullGraph', self._onFullGraph) self._eventSource.addEventListener('patch', self._onMessage) def _onFullGraph(self, message): try: g = ConjunctiveGraph() g.parse(StringInputSource(message), format='json-ld') p = Patch(addGraph=g) self._sendPatch(p) except: traceback.print_exc() def _onMessage(self, message): try: p = patchFromJson(message) self._sendPatch(p) except: traceback.print_exc() def _sendPatch(self, p): log.info('output patch to %s listeners', p, len(self._listeners)) for lis in self._listeners: lis(p) def addPatchListener(self, func): self._listeners.add(func) def stop(self): log.info('stop read from %s', self.url) self._eventSource.protocol.stopProducing() #? self._eventSource = None def __del__(self): if self._eventSource: raise ValueError class GraphClient(object): """A listener of some EventSources that sends patches to one of our clients.""" def __init__(self, handler, patchSources): self.handler = handler for ps in patchSources: ps.addPatchListener(self.onPatch) def onPatch(self, p): self.handler.sendEvent(message=patchAsJson(p), event='patch') class GraphClients(object): """All the active EventClient objects""" def __init__(self): self.clients = {} # url: EventClient self.listeners = {} # url: [GraphClient] def addSseHandler(self, handler, streamId): log.info('addSseHandler %r %r', handler, streamId) matches = [s for s in config['streams'] if s['id'] == streamId] if len(matches) != 1: raise ValueError("%s matches for %r" % (len(matches), streamId)) ecs = [] for source in matches[0]['sources']: if source not in self.clients: self.clients[source] = PatchSource(source) ecs.append(self.clients[source]) self.listeners.setdefault(source, []).append(GraphClient(handler, ecs)) print self.__dict__ def removeSseHandler(self, handler): log.info('removeSseHandler %r', handler) for url, graphClients in self.listeners.items(): keep = [] for gc in graphClients: if gc.handler != handler: keep.append(gc) graphClients[:] = keep if not keep: self.clients[url].stop() del self.clients[url] del self.listeners[url] class SomeGraph(cyclone.sse.SSEHandler): def __init__(self, application, request): cyclone.sse.SSEHandler.__init__(self, application, request) self.id = request.uri[len('/graph/'):] self.graphClients = self.settings.graphClients def bind(self): self.graphClients.addSseHandler(self, self.id) def unbind(self): self.graphClients.removeSseHandler(self) if __name__ == '__main__': arg = docopt(""" Usage: sse_collector.py [options] -v Verbose """) if arg['-v']: import twisted.python.log twisted.python.log.startLogging(sys.stdout) log.setLevel(logging.DEBUG) graphClients = GraphClients() reactor.listenTCP( 9071, cyclone.web.Application( handlers=[ (r'/graph/(.*)', SomeGraph), ], graphClients=graphClients), interface='::') reactor.run()