Mercurial > code > home > repos > homeauto
changeset 302:46c5fae89823
factor out patchsource
Ignore-this: a9757cc53b914cb8be1f880a6504336f
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Aug 2016 23:43:03 -0700 |
parents | 29f593aee67b |
children | 66fe7a93753d |
files | service/reasoning/patchsource.py service/reasoning/sse_collector.py |
diffstat | 2 files changed, 137 insertions(+), 122 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/patchsource.py Sun Aug 28 23:43:03 2016 -0700 @@ -0,0 +1,130 @@ +import sys +import traceback +from twisted.internet import reactor, defer +from twisted_sse_demo.eventsource import EventSource +from rdflib import ConjunctiveGraph +from rdflib.parser import StringInputSource + +sys.path.append("../../lib") +from logsetup import log +from patchablegraph import patchFromJson + +sys.path.append("/my/proj/light9") +from light9.rdfdb.patch import Patch + + +class PatchSource(object): + """wrap EventSource so it emits Patch objects and has an explicit stop method.""" + def __init__(self, url): + self.url = url + + # add callbacks to these to learn if we failed to connect + # (approximately) or if the ccnnection was unexpectedly lost + self.connectionFailed = defer.Deferred() + self.connectionLost = defer.Deferred() + + self._listeners = set() + log.info('start read from %s', url) + self._fullGraphReceived = False + self._eventSource = EventSource(url.toPython().encode('utf8')) + self._eventSource.protocol.delimiter = '\n' + + self._eventSource.addEventListener('fullGraph', self._onFullGraph) + self._eventSource.addEventListener('patch', self._onPatch) + self._eventSource.onerror(self._onError) + + origSet = self._eventSource.protocol.setFinishedDeferred + def sfd(d): + origSet(d) + d.addCallback(self._onDisconnect) + self._eventSource.protocol.setFinishedDeferred = sfd + + def addPatchListener(self, func): + """ + func(patch, fullGraph=[true if the patch is the initial fullgraph]) + """ + self._listeners.add(func) + + def stop(self): + log.info('stop read from %s', self.url) + try: + self._eventSource.protocol.stopProducing() # needed? + except AttributeError: + pass + self._eventSource = None + + def _onDisconnect(self, a): + log.debug('PatchSource._onDisconnect from %s', self.url) + # skip this if we're doing a stop? + self.connectionLost.callback(None) + + def _onError(self, msg): + log.debug('PatchSource._onError from %s %r', self.url, msg) + if not self._fullGraphReceived: + self.connectionFailed.callback(msg) + else: + self.connectionLost.callback(msg) + + def _onFullGraph(self, message): + try: + g = ConjunctiveGraph() + g.parse(StringInputSource(message), format='json-ld') + p = Patch(addGraph=g) + self._sendPatch(p, fullGraph=True) + except: + log.error(traceback.format_exc()) + raise + self._fullGraphReceived = True + + def _onPatch(self, message): + try: + p = patchFromJson(message) + self._sendPatch(p, fullGraph=False) + except: + log.error(traceback.format_exc()) + raise + + def _sendPatch(self, p, fullGraph): + log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph) + for lis in self._listeners: + lis(p, fullGraph=fullGraph) + + def __del__(self): + if self._eventSource: + raise ValueError + +class ReconnectingPatchSource(object): + """ + PatchSource api, but auto-reconnects internally and takes listener + at init time to not miss any patches. You'll get another + fullGraph=True patch if we have to reconnect. + + todo: generate connection stmts in here + """ + def __init__(self, url, listener): + self.url = url + self._stopped = False + self._listener = listener + self._reconnect() + + def _reconnect(self): + if self._stopped: + return + self._ps = PatchSource(self.url) + self._ps.addPatchListener(self._onPatch) + self._ps.connectionFailed.addCallback(self._onConnectionFailed) + self._ps.connectionLost.addCallback(self._onConnectionLost) + + def _onPatch(self, p, fullGraph): + self._listener(p, fullGraph=fullGraph) + + def stop(self): + self._stopped = True + self._ps.stop() + + def _onConnectionFailed(self, arg): + reactor.callLater(60, self._reconnect) + + def _onConnectionLost(self, arg): + reactor.callLater(60, self._reconnect) +
--- a/service/reasoning/sse_collector.py Sun Aug 28 18:11:34 2016 -0700 +++ b/service/reasoning/sse_collector.py Sun Aug 28 23:43:03 2016 -0700 @@ -22,140 +22,25 @@ from crochet import no_setup no_setup() -import sys, logging, traceback, json, collections -from twisted.internet import reactor, defer +import sys, logging, collections +from twisted.internet import reactor import cyclone.web, cyclone.sse -from rdflib import ConjunctiveGraph, URIRef, Namespace -from rdflib.parser import StringInputSource +from rdflib import URIRef, Namespace from docopt import docopt -from twisted_sse_demo.eventsource import EventSource sys.path.append("../../lib") from logsetup import log -from patchablegraph import jsonFromPatch, PatchableGraph, patchFromJson +from patchablegraph import jsonFromPatch sys.path.append("/my/proj/light9") from light9.rdfdb.patch import Patch +from patchsource import ReconnectingPatchSource + ROOM = Namespace("http://projects.bigasterisk.com/room/") COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/') -class PatchSource(object): - """wrap EventSource so it emits Patch objects and has an explicit stop method.""" - def __init__(self, url): - self.url = url - - # add callbacks to these to learn if we failed to connect - # (approximately) or if the ccnnection was unexpectedly lost - self.connectionFailed = defer.Deferred() - self.connectionLost = defer.Deferred() - - self._listeners = set() - log.info('start read from %s', url) - self._fullGraphReceived = False - self._eventSource = EventSource(url.toPython().encode('utf8')) - self._eventSource.protocol.delimiter = '\n' - - self._eventSource.addEventListener('fullGraph', self._onFullGraph) - self._eventSource.addEventListener('patch', self._onPatch) - self._eventSource.onerror(self._onError) - - origSet = self._eventSource.protocol.setFinishedDeferred - def sfd(d): - origSet(d) - d.addCallback(self._onDisconnect) - self._eventSource.protocol.setFinishedDeferred = sfd - - def addPatchListener(self, func): - """ - func(patch, fullGraph=[true if the patch is the initial fullgraph]) - """ - self._listeners.add(func) - - def stop(self): - log.info('stop read from %s', self.url) - try: - self._eventSource.protocol.stopProducing() # needed? - except AttributeError: - pass - self._eventSource = None - - def _onDisconnect(self, a): - log.debug('PatchSource._onDisconnect from %s', self.url) - # skip this if we're doing a stop? - self.connectionLost.callback(None) - - def _onError(self, msg): - log.debug('PatchSource._onError from %s %r', self.url, msg) - if not self._fullGraphReceived: - self.connectionFailed.callback(msg) - else: - self.connectionLost.callback(msg) - - def _onFullGraph(self, message): - try: - g = ConjunctiveGraph() - g.parse(StringInputSource(message), format='json-ld') - p = Patch(addGraph=g) - self._sendPatch(p, fullGraph=True) - except: - log.error(traceback.format_exc()) - raise - self._fullGraphReceived = True - - def _onPatch(self, message): - try: - p = patchFromJson(message) - self._sendPatch(p, fullGraph=False) - except: - log.error(traceback.format_exc()) - raise - - def _sendPatch(self, p, fullGraph): - log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph) - for lis in self._listeners: - lis(p, fullGraph=fullGraph) - - def __del__(self): - if self._eventSource: - raise ValueError - -class ReconnectingPatchSource(object): - """ - PatchSource api, but auto-reconnects internally and takes listener - at init time to not miss any patches. You'll get another - fullGraph=True patch if we have to reconnect. - - todo: generate connection stmts in here - """ - def __init__(self, url, listener): - self.url = url - self._stopped = False - self._listener = listener - self._reconnect() - - def _reconnect(self): - if self._stopped: - return - self._ps = PatchSource(self.url) - self._ps.addPatchListener(self._onPatch) - self._ps.connectionFailed.addCallback(self._onConnectionFailed) - self._ps.connectionLost.addCallback(self._onConnectionLost) - - def _onPatch(self, p, fullGraph): - self._listener(p, fullGraph=fullGraph) - - def stop(self): - self._stopped = True - self._ps.stop() - - def _onConnectionFailed(self, arg): - reactor.callLater(1, self._reconnect) - - def _onConnectionLost(self, arg): - reactor.callLater(1, self._reconnect) - class LocalStatements(object): """ functions that make statements originating from sse_collector itself @@ -367,7 +252,7 @@ for source in self._sourcesForHandler(handler): if source not in self.clients and source != COLLECTOR: self._localStatements.setSourceState(source, ROOM['connect']) - ps = self.clients[source] = ReconnectingPatchSource( + self.clients[source] = ReconnectingPatchSource( source, listener=lambda p, fullGraph, source=source: self._onPatch( source, p, fullGraph)) self._sendUpdatePatch(handler)