Mercurial > code > home > repos > homeauto
annotate lib/patchsource.py @ 449:ef7eba0551f2
collector partial py3+types update. WIP
Ignore-this: 3fe8cc7b09bbfc8bec7f5d6a5e1630b
author | drewp@bigasterisk.com |
---|---|
date | Thu, 18 Apr 2019 22:00:06 -0700 |
parents | 3d51d4b63497 |
children | b5abd4fc65a4 |
rev | line source |
---|---|
312
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
306
diff
changeset
|
1 import sys, logging |
302 | 2 import traceback |
3 from twisted.internet import reactor, defer | |
4 from twisted_sse_demo.eventsource import EventSource | |
5 from rdflib import ConjunctiveGraph | |
6 from rdflib.parser import StringInputSource | |
7 | |
8 sys.path.append("../../lib") | |
9 from patchablegraph import patchFromJson | |
10 | |
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
312
diff
changeset
|
11 sys.path.append("/my/proj/rdfdb") |
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
312
diff
changeset
|
12 from rdfdb.patch import Patch |
302 | 13 |
312
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
306
diff
changeset
|
14 log = logging.getLogger('fetch') |
302 | 15 |
class PatchSource(object):
    """Wrap EventSource so it emits Patch objects and has an explicit stop method.

    Listeners registered with addPatchListener are called for every
    'fullGraph' and 'patch' event. The connectionFailed and
    connectionLost deferreds report connection trouble; each one is
    fired at most once.
    """
    def __init__(self, url):
        """
        Start reading the event stream at `url`.

        url: object with a toPython() method (that is what gets called
          to build the string handed to EventSource).
        """
        self.url = url

        # add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        # note: fullGraphReceived isn't guaranteed- the stream could
        # start with patches
        self._fullGraphReceived = False
        self._eventSource = EventSource(url.toPython().encode('utf8'))
        self._eventSource.protocol.delimiter = b'\n'

        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
        self._eventSource.addEventListener('patch', self._onPatch)
        self._eventSource.onerror(self._onError)

        # Hook the protocol's setFinishedDeferred so the deferred that
        # is handed to it also calls our _onDisconnect.
        origSet = self._eventSource.protocol.setFinishedDeferred
        def sfd(d):
            origSet(d)
            d.addCallback(self._onDisconnect)
        self._eventSource.protocol.setFinishedDeferred = sfd

    def state(self):
        """Return a dict describing this source (url, fullGraphReceived)."""
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
        }

    def addPatchListener(self, func):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """Stop reading and drop the EventSource. Safe to call repeatedly."""
        log.info('stop read from %s', self.url)
        try:
            self._eventSource.protocol.stopProducing() # needed?
        except AttributeError:
            # Also reached on a second stop(), when _eventSource is
            # already None.
            pass
        self._eventSource = None

    def _fireOnce(self, d, result):
        """Fire deferred `d` with `result` unless it has already fired.

        _onError and _onDisconnect can both report the same dead
        connection; a second callback() on the same Deferred would
        raise AlreadyCalledError.
        """
        if d.called:
            log.debug('PatchSource %s: deferred already fired, ignoring %r',
                      self.url, result)
            return
        d.callback(result)

    def _onDisconnect(self, a):
        """Callback on the protocol's finished deferred: report connectionLost."""
        log.debug('PatchSource._onDisconnect from %s', self.url)
        # skip this if we're doing a stop?
        self._fireOnce(self.connectionLost, None)

    def _onError(self, msg):
        """EventSource error handler.

        Before any fullGraph has arrived the error is reported as
        connectionFailed; afterwards as connectionLost.
        """
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        if not self._fullGraphReceived:
            self._fireOnce(self.connectionFailed, msg)
        else:
            self._fireOnce(self.connectionLost, msg)

    def _onFullGraph(self, message):
        """Parse a json-ld 'fullGraph' event and send it as an all-adds Patch."""
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except Exception:
            # Log here too, in case the caller drops the exception.
            log.exception('PatchSource %s failed handling fullGraph', self.url)
            raise
        self._fullGraphReceived = True

    def _onPatch(self, message):
        """Decode a 'patch' event with patchFromJson and send it to listeners."""
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except Exception:
            # Log here too, in case the caller drops the exception.
            log.exception('PatchSource %s failed handling patch', self.url)
            raise

    def _sendPatch(self, p, fullGraph):
        """Call every registered listener with the patch."""
        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph)
        # Iterate a snapshot: a listener may add listeners while we are
        # delivering, which would otherwise raise "Set changed size
        # during iteration".
        for lis in list(self._listeners):
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        # An exception raised from __del__ is only printed and then
        # ignored, so report a missing stop() through the log instead.
        # getattr covers instances whose __init__ failed before
        # _eventSource was assigned.
        if getattr(self, '_eventSource', None):
            log.warning('PatchSource for %s was garbage-collected without stop()',
                        getattr(self, 'url', None))
103 | |
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener, reconnectSecs=60):
        """
        url: passed to every PatchSource we create
        listener: called as listener(patch, fullGraph=bool) for each patch
        reconnectSecs: delay before a new PatchSource is created after
          the current one reports connectionFailed or connectionLost
        """
        self.url = url
        self._stopped = False
        self._listener = listener
        self.reconnectSecs = reconnectSecs
        # DelayedCall of the reconnect that is currently scheduled, or None
        self._reconnectCall = None
        self._reconnect()

    def _reconnect(self):
        """Replace self._ps with a fresh PatchSource, unless we were stopped."""
        # Whatever call brought us here has now run.
        self._reconnectCall = None
        if self._stopped:
            return
        self._ps = PatchSource(self.url)
        self._ps.addPatchListener(self._onPatch)
        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
        self._ps.connectionLost.addCallback(self._onConnectionLost)

    def _onPatch(self, p, fullGraph):
        """Forward a patch from the current PatchSource to our listener."""
        self._listener(p, fullGraph=fullGraph)

    def state(self):
        """Return a dict wrapping the current PatchSource's state()."""
        return {
            'reconnectedPatchSource': self._ps.state(),
        }

    def stop(self):
        """Stop for good: cancel any pending reconnect and stop the source."""
        self._stopped = True
        pending, self._reconnectCall = self._reconnectCall, None
        if pending is not None and pending.active():
            pending.cancel()
        self._ps.stop()

    def _scheduleReconnect(self):
        """Arrange one _reconnect call reconnectSecs from now.

        A PatchSource has two deferreds (connectionFailed and
        connectionLost) that both lead here, so a second request while
        one is still pending is ignored; otherwise two PatchSources
        would be created. Nothing is scheduled after stop().
        """
        if self._stopped:
            return
        pending = self._reconnectCall
        if pending is not None and pending.active():
            return
        self._reconnectCall = reactor.callLater(self.reconnectSecs, self._reconnect)

    def _onConnectionFailed(self, arg):
        """Callback for PatchSource.connectionFailed."""
        self._scheduleReconnect()

    def _onConnectionLost(self, arg):
        """Callback for PatchSource.connectionLost."""
        self._scheduleReconnect()
302 | 144 |