Mercurial > code > home > repos > homeauto
comparison service/reasoning/patchsource.py @ 1107:e68f6e5712c6
factor out patchsource
Ignore-this: a9757cc53b914cb8be1f880a6504336f
darcs-hash:2c139d1f9deb03463a86b4f18e57bacf38ae3392
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sun, 28 Aug 2016 23:43:03 -0700 |
parents | |
children | 6aad04b34231 |
comparison
equal
deleted
inserted
replaced
1106:fe53ca09febc | 1107:e68f6e5712c6 |
---|---|
1 import sys | |
2 import traceback | |
3 from twisted.internet import reactor, defer | |
4 from twisted_sse_demo.eventsource import EventSource | |
5 from rdflib import ConjunctiveGraph | |
6 from rdflib.parser import StringInputSource | |
7 | |
8 sys.path.append("../../lib") | |
9 from logsetup import log | |
10 from patchablegraph import patchFromJson | |
11 | |
12 sys.path.append("/my/proj/light9") | |
13 from light9.rdfdb.patch import Patch | |
14 | |
15 | |
class PatchSource(object):
    """Wrap EventSource so it emits Patch objects and has an explicit stop method.

    Every 'fullGraph' and 'patch' event read from the url is converted
    to a Patch and passed to each listener registered with
    addPatchListener.

    Public attributes:
      url: the value passed to the constructor.
      connectionFailed: Deferred fired at most once, with the error
        message, when an error arrives before any full graph was
        received.
      connectionLost: Deferred fired at most once, either with None when
        the protocol's finished deferred fires or with the error message
        when an error arrives after a full graph was received.
    """
    def __init__(self, url):
        """
        url: address of the event stream. It must provide toPython()
          (presumably an rdflib URIRef -- confirm against callers).
        """
        self.url = url

        # add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        self._fullGraphReceived = False
        self._eventSource = EventSource(url.toPython().encode('utf8'))
        self._eventSource.protocol.delimiter = '\n'

        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
        self._eventSource.addEventListener('patch', self._onPatch)
        self._eventSource.onerror(self._onError)

        # Wrap the protocol's setFinishedDeferred so that whichever
        # deferred gets installed also reports to _onDisconnect.
        origSet = self._eventSource.protocol.setFinishedDeferred
        def sfd(d):
            origSet(d)
            d.addCallback(self._onDisconnect)
        self._eventSource.protocol.setFinishedDeferred = sfd

    def addPatchListener(self, func):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """Stop reading from the url and drop the EventSource.

        Safe to call more than once: after the first call _eventSource
        is None, and the resulting AttributeError is ignored.
        """
        log.info('stop read from %s', self.url)
        try:
            self._eventSource.protocol.stopProducing()  # needed?
        except AttributeError:
            pass
        self._eventSource = None

    @staticmethod
    def _fireOnce(d, result):
        """Fire Deferred d with result unless it has already fired.

        A Deferred can only be called back once; a second callback()
        raises AlreadyCalledError. Error and disconnect notifications
        can both arrive for the same connection, so only the first one
        is reported.
        """
        if not d.called:
            d.callback(result)

    def _onDisconnect(self, a):
        """Callback on the protocol's finished deferred; reports connectionLost."""
        log.debug('PatchSource._onDisconnect from %s', self.url)
        # skip this if we're doing a stop?
        self._fireOnce(self.connectionLost, None)

    def _onError(self, msg):
        """EventSource error handler.

        Reports connectionFailed if no full graph has been received yet,
        otherwise connectionLost.
        """
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        if not self._fullGraphReceived:
            self._fireOnce(self.connectionFailed, msg)
        else:
            self._fireOnce(self.connectionLost, msg)

    def _onFullGraph(self, message):
        """Handle a 'fullGraph' event: parse the json-ld message and
        send it to the listeners as an all-adds Patch with fullGraph=True.

        Failures are logged with their traceback and re-raised; in that
        case _fullGraphReceived is left unchanged.
        """
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._fullGraphReceived = True

    def _onPatch(self, message):
        """Handle a 'patch' event: decode the message with patchFromJson
        and send it to the listeners with fullGraph=False.

        Failures are logged with their traceback and re-raised.
        """
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def _sendPatch(self, p, fullGraph):
        """Call every registered listener as lis(p, fullGraph=fullGraph)."""
        log.debug('PatchSource %s received patch %s (fullGraph=%s)',
                  self.url, p.shortSummary(), fullGraph)
        for lis in self._listeners:
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        # Complain if the object is collected without stop() having been
        # called. getattr: __init__ may have raised before _eventSource
        # was assigned, and that must not turn into an AttributeError here.
        if getattr(self, '_eventSource', None):
            raise ValueError('PatchSource for %s was dropped without stop()' %
                             (getattr(self, 'url', None),))
95 | |
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    At most one reconnect is pending at a time, and the PatchSource
    being replaced is stopped first, so the listener is only ever fed
    by a single connection.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener, reconnectSecs=60):
        """
        url: passed unchanged to each PatchSource that gets created.
        listener: called as listener(patch, fullGraph=bool) for every
          patch from the current connection.
        reconnectSecs: delay between noticing a failed/lost connection
          and opening the next one.
        """
        self.url = url
        self._stopped = False
        self._listener = listener
        self._reconnectSecs = reconnectSecs
        self._ps = None  # the current PatchSource, if any
        self._pendingReconnect = None  # delayed call for the next _reconnect
        self._reconnect()

    def _reconnect(self):
        """Replace the current PatchSource with a fresh one (no-op once stopped)."""
        self._pendingReconnect = None
        if self._stopped:
            return
        try:
            # Otherwise the old source is dropped unstopped and keeps
            # its EventSource.
            self._releaseSource()
        except Exception:
            # a failure to tear down the old source must not end the
            # reconnect loop
            log.error(traceback.format_exc())
        ps = self._ps = PatchSource(self.url)
        ps.addPatchListener(self._onPatch)
        # Pass ps along so notifications from a source that has since
        # been replaced can be recognized and ignored.
        ps.connectionFailed.addCallback(self._onConnectionFailed, ps)
        ps.connectionLost.addCallback(self._onConnectionLost, ps)

    def _releaseSource(self):
        """Stop and forget the current PatchSource, if there is one."""
        ps, self._ps = self._ps, None
        if ps is not None:
            ps.stop()

    def _scheduleReconnect(self, ps):
        """Arrange one _reconnect call, reconnectSecs from now.

        Does nothing if we are stopped, if a reconnect is already
        pending (connectionFailed and connectionLost may both fire for
        the same connection), or if ps is no longer the current source.
        """
        if self._stopped or self._pendingReconnect is not None:
            return
        if ps is not None and ps is not self._ps:
            return
        self._pendingReconnect = reactor.callLater(self._reconnectSecs,
                                                   self._reconnect)

    def _onPatch(self, p, fullGraph):
        """Forward a patch from the current PatchSource to the listener."""
        self._listener(p, fullGraph=fullGraph)

    def stop(self):
        """Stop the current connection and cancel any pending reconnect."""
        self._stopped = True
        pending, self._pendingReconnect = self._pendingReconnect, None
        if pending is not None and pending.active():
            pending.cancel()
        self._releaseSource()

    def _onConnectionFailed(self, arg, ps=None):
        """connectionFailed callback of PatchSource ps."""
        self._scheduleReconnect(ps)

    def _onConnectionLost(self, arg, ps=None):
        """connectionLost callback of PatchSource ps."""
        self._scheduleReconnect(ps)
130 |