comparison lib/patchablegraph/patchsource.py @ 514:495f573af4f4

make patchablegraph release Ignore-this: f55c9a56b052797ff23a80630714b51a
author drewp@bigasterisk.com
date Mon, 22 Apr 2019 23:29:19 -0700
parents lib/patchsource.py@b5abd4fc65a4
children 2d13254fdd44
comparison
equal deleted inserted replaced
513:7a7002c95d09 514:495f573af4f4
1 import logging
2 import traceback
3 from rdflib import ConjunctiveGraph
4 from rdflib.parser import StringInputSource
5 from twisted.internet import reactor, defer
6
7 from patchablegraph import patchFromJson
8 from rdfdb.patch import Patch
9 from twisted_sse_demo.eventsource import EventSource
10
# Logger used by the patch-source classes below for connect/disconnect and
# per-patch diagnostics.
log = logging.getLogger(name='fetch')
12
class PatchSource(object):
    """Wrap EventSource so it emits Patch objects and has an explicit stop method.

    Listeners added with addPatchListener are called once per Patch built
    from the stream's 'fullGraph' (JSON-LD) and 'patch' events.

    Callers must call stop() before dropping the last reference; __del__
    complains otherwise.
    """
    def __init__(self, url, agent):
        """
        url: the event-stream URL. Anything whose str() is the URL text
          (a plain str or an rdflib URIRef) is accepted.
        agent: passed to EventSource as userAgent.
        """
        self.url = str(url)

        # Add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost.
        # Each Deferred fires at most once.
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        # note: fullGraphReceived isn't guaranteed- the stream could
        # start with patches
        self._fullGraphReceived = False
        # Encode the str form we already computed. The previous
        # url.toPython() call only worked when url was an rdflib term and
        # raised AttributeError for a plain str.
        self._eventSource = EventSource(self.url.encode('utf8'),
                                        userAgent=agent)

        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
        self._eventSource.addEventListener(b'patch', self._onPatch)
        self._eventSource.onerror(self._onError)
        self._eventSource.onConnectionLost = self._onDisconnect

    def state(self):
        """Return a small dict describing this source, for status pages."""
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
        }

    def addPatchListener(self, func):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """Stop reading the stream and drop the EventSource."""
        log.info('stop read from %s', self.url)
        try:
            self._eventSource.protocol.stopProducing()  # needed?
        except AttributeError:
            # no protocol yet, or we were already stopped
            # (_eventSource is None)
            pass
        self._eventSource = None

    @staticmethod
    def _fireOnce(d, arg):
        """Fire Deferred d with arg, unless it has already fired."""
        # A Deferred may only be called once; a second callback() raises
        # AlreadyCalledError.
        if not d.called:
            d.callback(arg)

    def _onDisconnect(self, reason):
        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
        # skip this if we're doing a stop?
        # _onError may already have fired connectionLost for this same
        # outage, so only fire it if it hasn't been fired yet.
        self._fireOnce(self.connectionLost, None)

    def _onError(self, msg):
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        if not self._fullGraphReceived:
            self._fireOnce(self.connectionFailed, msg)
        else:
            self._fireOnce(self.connectionLost, msg)

    def _onFullGraph(self, message):
        """Parse a JSON-LD 'fullGraph' event and send it as an add-only Patch."""
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._fullGraphReceived = True

    def _onPatch(self, message):
        """Decode a 'patch' event with patchFromJson and send it to listeners."""
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def _sendPatch(self, p, fullGraph):
        log.debug('PatchSource %s received patch %s (fullGraph=%s)',
                  self.url, p.shortSummary(), fullGraph)
        # Iterate over a snapshot so a listener that adds a listener doesn't
        # break iteration with "set changed size during iteration".
        for lis in list(self._listeners):
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        # getattr: if __init__ raised before _eventSource was assigned, there
        # is nothing to complain about (and no AttributeError to report).
        if getattr(self, '_eventSource', None):
            # Exceptions raised in __del__ don't propagate; Python reports
            # them on stderr as "Exception ignored in ...".
            raise ValueError("PatchSource wasn't stopped before del")
96
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener, reconnectSecs=60, agent='unset'):
        # type: (str, Any, Any, str) -> None
        self.url = url
        self._stopped = False
        self._listener = listener
        self.reconnectSecs = reconnectSecs
        self.agent = agent
        # reactor DelayedCall for the next reconnect, or None if none pending
        self._pendingReconnect = None
        self._ps = None
        self._reconnect()

    def _reconnect(self):
        """Replace the current PatchSource with a fresh connection."""
        self._pendingReconnect = None
        if self._stopped:
            return
        if self._ps is not None:
            # PatchSource requires stop() before it's discarded (its __del__
            # complains otherwise).
            self._ps.stop()
        ps = PatchSource(self.url, agent=self.agent)
        ps.addPatchListener(self._onPatch)
        # Pass the source along so late notifications from a source we've
        # already replaced can be recognized and ignored.
        ps.connectionFailed.addCallback(self._onConnectionFailed, ps)
        ps.connectionLost.addCallback(self._onConnectionLost, ps)
        self._ps = ps

    def _onPatch(self, p, fullGraph):
        self._listener(p, fullGraph=fullGraph)

    def state(self):
        return {
            'reconnectedPatchSource': self._ps.state(),
        }

    def stop(self):
        """Stop the current source and cancel any scheduled reconnect."""
        self._stopped = True
        if (self._pendingReconnect is not None and
                self._pendingReconnect.active()):
            self._pendingReconnect.cancel()
        self._pendingReconnect = None
        self._ps.stop()

    def _scheduleReconnect(self, source=None):
        """Schedule one reconnect after reconnectSecs, if appropriate."""
        if self._stopped or self._pendingReconnect is not None:
            # Already stopped, or failed+lost both fired for one outage.
            return
        if source is not None and source is not self._ps:
            # Stale notification from a source we've already replaced.
            return
        self._pendingReconnect = reactor.callLater(self.reconnectSecs,
                                                   self._reconnect)

    def _onConnectionFailed(self, arg, source=None):
        self._scheduleReconnect(source)

    def _onConnectionLost(self, arg, source=None):
        self._scheduleReconnect(source)
129
130 def stop(self):
131 self._stopped = True
132 self._ps.stop()
133
134 def _onConnectionFailed(self, arg):
135 reactor.callLater(self.reconnectSecs, self._reconnect)
136
137 def _onConnectionLost(self, arg):
138 reactor.callLater(self.reconnectSecs, self._reconnect)
139