comparison lib/patchsource.py @ 351:7716b1810d6c

reasoning & collector move into docker images Ignore-this: 67e97d307eba96791cbe77e57c57ad57
author drewp@bigasterisk.com
date Mon, 03 Sep 2018 00:45:34 -0700
parents service/reasoning/patchsource.py@170dc9b1e789
children fcd2c026f51e
comparison
equal deleted inserted replaced
350:a380561fd8a8 351:7716b1810d6c
1 import sys, logging
2 import traceback
3 from twisted.internet import reactor, defer
4 from twisted_sse_demo.eventsource import EventSource
5 from rdflib import ConjunctiveGraph
6 from rdflib.parser import StringInputSource
7
8 sys.path.append("../../lib")
9 from patchablegraph import patchFromJson
10
11 sys.path.append("/my/proj/rdfdb")
12 from rdfdb.patch import Patch
13
14 log = logging.getLogger('fetch')
15
class PatchSource(object):
    """Wrap EventSource so it emits Patch objects and has an explicit stop method.

    Every 'fullGraph' and 'patch' event received from the stream is
    turned into a Patch and passed to the listeners registered with
    addPatchListener.
    """

    # Class-level default so stop() and __del__ are safe even if
    # __init__ raised before the EventSource was created.
    _eventSource = None

    def __init__(self, url):
        """
        url: object whose toPython() returns the stream URL as text
        (presumably an rdflib URIRef -- confirm against callers).
        """
        self.url = url

        # add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost.
        # Each of them is fired at most once.
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        # note: fullGraphReceived isn't guaranteed- the stream could
        # start with patches
        self._fullGraphReceived = False
        self._eventSource = EventSource(url.toPython().encode('utf8'))
        self._eventSource.protocol.delimiter = '\n'

        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
        self._eventSource.addEventListener('patch', self._onPatch)
        self._eventSource.onerror(self._onError)

        # Wrap the protocol's setFinishedDeferred so that whatever
        # deferred it is given also calls our _onDisconnect.
        origSet = self._eventSource.protocol.setFinishedDeferred
        def sfd(d):
            origSet(d)
            d.addCallback(self._onDisconnect)
        self._eventSource.protocol.setFinishedDeferred = sfd

    def stats(self):
        """Return a dict with the url and whether a full graph has arrived."""
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
        }

    def addPatchListener(self, func):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """Stop reading from the stream. Calling this again is a no-op."""
        log.info('stop read from %s', self.url)
        eventSource, self._eventSource = self._eventSource, None
        if eventSource is None:
            return  # already stopped
        try:
            eventSource.protocol.stopProducing() # needed?
        except AttributeError:
            # best effort; presumably the protocol has nothing to
            # stop yet (e.g. never connected) -- TODO confirm
            pass

    @staticmethod
    def _fireOnce(d, result):
        """Fire Deferred `d` with `result` unless it has already fired.

        An error followed by a disconnect (or several errors) would
        otherwise call the same Deferred twice, which raises
        AlreadyCalledError.
        """
        if not d.called:
            d.callback(result)

    def _onDisconnect(self, a):
        """Callback on the protocol's finished deferred: report connectionLost."""
        log.debug('PatchSource._onDisconnect from %s', self.url)
        # skip this if we're doing a stop?
        self._fireOnce(self.connectionLost, None)

    def _onError(self, msg):
        """EventSource error handler.

        Before any full graph has been received this is reported as
        connectionFailed; afterwards as connectionLost.
        """
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        if not self._fullGraphReceived:
            self._fireOnce(self.connectionFailed, msg)
        else:
            self._fireOnce(self.connectionLost, msg)

    def _onFullGraph(self, message):
        """Handle a 'fullGraph' event: parse the json-ld body and send it
        to the listeners as an all-additions Patch with fullGraph=True.
        """
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except Exception:
            log.error(traceback.format_exc())
            raise
        # only set once the listeners have accepted the graph
        self._fullGraphReceived = True

    def _onPatch(self, message):
        """Handle a 'patch' event: decode it with patchFromJson and send
        it to the listeners with fullGraph=False.
        """
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def _sendPatch(self, p, fullGraph):
        """Call every registered listener with the patch."""
        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph)
        # iterate a snapshot so a listener may register another
        # listener without breaking the loop
        for lis in list(self._listeners):
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        # An exception raised in __del__ is ignored by the interpreter
        # and only printed to stderr, so report the missing stop()
        # through the log, where the url can be included.
        if self._eventSource is not None:
            log.error('PatchSource for %s was dropped without calling stop()', self.url)
103
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener, reconnectSecs=60):
        """
        url: passed to each PatchSource we create.
        listener: called as listener(patch, fullGraph=bool) for every patch.
        reconnectSecs: delay between a failed/lost connection and the
          next connection attempt.
        """
        self.url = url
        self._stopped = False
        self._listener = listener
        self._reconnectSecs = reconnectSecs
        self._ps = None
        # the pending reactor.callLater for the next _reconnect, if any
        self._reconnectCall = None
        self._reconnect()

    def _reconnect(self):
        """Replace the current PatchSource (if any) with a fresh one."""
        self._reconnectCall = None
        if self._stopped:
            return
        old = self._ps
        if old is not None:
            # release the dead source instead of just dropping it
            old.stop()
        # assign self._ps before adding callbacks: an already-fired
        # deferred runs the callback immediately, and the handlers
        # compare their source against self._ps.
        ps = self._ps = PatchSource(self.url)
        ps.addPatchListener(self._onPatch)
        ps.connectionFailed.addCallback(self._onConnectionFailed, ps)
        ps.connectionLost.addCallback(self._onConnectionLost, ps)

    def _onPatch(self, p, fullGraph):
        """Forward a patch from the current PatchSource to our listener."""
        self._listener(p, fullGraph=fullGraph)

    def stats(self):
        """Return the stats of the current PatchSource, wrapped in a dict."""
        return {
            'reconnectedPatchSource': self._ps.stats(),
        }

    def stop(self):
        """Stop the current PatchSource and cancel any pending reconnect."""
        self._stopped = True
        pending, self._reconnectCall = self._reconnectCall, None
        if pending is not None and pending.active():
            pending.cancel()
        if self._ps is not None:
            self._ps.stop()

    def _scheduleReconnect(self, source):
        """Arrange one _reconnect call after the retry delay.

        Does nothing if we are stopped, if `source` is a PatchSource we
        have already replaced, or if a reconnect is already pending. A
        single source can report both connectionFailed and
        connectionLost, which must not produce two new connections.
        """
        if self._stopped:
            return
        if source is not None and source is not self._ps:
            return
        pending = self._reconnectCall
        if pending is not None and pending.active():
            return
        self._reconnectCall = reactor.callLater(self._reconnectSecs, self._reconnect)

    def _onConnectionFailed(self, arg, source=None):
        """connectionFailed callback; `source` is the reporting PatchSource."""
        self._scheduleReconnect(source)

    def _onConnectionLost(self, arg, source=None):
        """connectionLost callback; `source` is the reporting PatchSource."""
        self._scheduleReconnect(source)
143