Mercurial > code > home > repos > homeauto
annotate lib/patchsource.py @ 483:a8823c7aab58
door lock rules
Ignore-this: 182c5bf72e51479c94f03e4bb8deeb2a
author | drewp@bigasterisk.com |
---|---|
date | Sun, 21 Apr 2019 00:04:32 -0700 |
parents | b5abd4fc65a4 |
children |
rev | line source |
---|---|
312
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
306
diff
changeset
|
1 import sys, logging |
302 | 2 import traceback |
3 from twisted.internet import reactor, defer | |
4 from twisted_sse_demo.eventsource import EventSource | |
5 from rdflib import ConjunctiveGraph | |
6 from rdflib.parser import StringInputSource | |
7 | |
8 sys.path.append("../../lib") | |
9 from patchablegraph import patchFromJson | |
10 | |
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
312
diff
changeset
|
11 sys.path.append("/my/proj/rdfdb") |
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
312
diff
changeset
|
12 from rdfdb.patch import Patch |
302 | 13 |
312
170dc9b1e789
fix input graph web display by dirtying combinedGraph better.
drewp@bigasterisk.com
parents:
306
diff
changeset
|
14 log = logging.getLogger('fetch') |
302 | 15 |
class PatchSource(object):
    """wrap EventSource so it emits Patch objects and has an explicit stop method.

    Listeners registered with addPatchListener are called as
    ``func(patch, fullGraph=bool)`` for each 'fullGraph' and 'patch'
    event that arrives on the stream.

    Public attributes:
      url              -- str(url) as given to the constructor
      connectionFailed -- Deferred fired (at most once) when an error
                          arrives before any full graph was received
      connectionLost   -- Deferred fired (at most once) on disconnect, or
                          on an error after a full graph was received
    """
    def __init__(self, url, agent):
        """
        url: object providing .toPython() that returns the URL text
             (presumably an rdflib URIRef -- confirm against callers)
        agent: forwarded to EventSource as userAgent
        """
        self.url = str(url)
        # Assigned first so stop() and __del__ are safe even if the
        # EventSource constructor below raises.
        self._eventSource = None

        # add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        # note: fullGraphReceived isn't guaranteed- the stream could
        # start with patches
        self._fullGraphReceived = False
        self._eventSource = EventSource(url.toPython().encode('utf8'),
                                        userAgent=agent)

        self._eventSource.addEventListener(b'fullGraph', self._onFullGraph)
        self._eventSource.addEventListener(b'patch', self._onPatch)
        self._eventSource.onerror(self._onError)
        self._eventSource.onConnectionLost = self._onDisconnect

    def state(self):
        """Return a dict describing this source: its url and whether a
        full graph has been received yet."""
        return {
            'url': self.url,
            'fullGraphReceived': self._fullGraphReceived,
        }

    def addPatchListener(self, func):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """Stop reading and drop the EventSource. Safe to call repeatedly."""
        log.info('stop read from %s', self.url)
        try:
            self._eventSource.protocol.stopProducing()  # needed?
        except AttributeError:
            # no event source (already stopped) or no protocol yet
            pass
        self._eventSource = None

    @staticmethod
    def _fireOnce(d, result):
        """Fire Deferred `d` with `result` unless it has already fired.

        A Deferred may only be called back once; an error followed by a
        disconnect would otherwise raise AlreadyCalledError from inside
        the event handler. Returns True if `d` was fired by this call.
        """
        if d.called:
            return False
        d.callback(result)
        return True

    def _onDisconnect(self, reason):
        """EventSource connection-lost hook: report via connectionLost."""
        log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason)
        # skip this if we're doing a stop?
        self._fireOnce(self.connectionLost, None)

    def _onError(self, msg):
        """EventSource error hook.

        Before the first full graph this counts as a failed connection;
        afterwards it counts as a lost connection.
        """
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        if not self._fullGraphReceived:
            self._fireOnce(self.connectionFailed, msg)
        else:
            self._fireOnce(self.connectionLost, msg)

    def _onFullGraph(self, message):
        """Handle a 'fullGraph' event: parse `message` as JSON-LD and send
        it to listeners as an add-only Patch with fullGraph=True.

        Failures are logged with their traceback and re-raised;
        _fullGraphReceived is only set after a successful send.
        """
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except Exception:
            log.error(traceback.format_exc())
            raise
        self._fullGraphReceived = True

    def _onPatch(self, message):
        """Handle a 'patch' event: decode `message` with patchFromJson and
        send it to listeners with fullGraph=False.

        Failures are logged with their traceback and re-raised.
        """
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def _sendPatch(self, p, fullGraph):
        """Call every registered listener as lis(p, fullGraph=fullGraph)."""
        log.debug('PatchSource %s received patch %s (fullGraph=%s)',
                  self.url, p.shortSummary(), fullGraph)
        # Iterate a snapshot so a listener may register further listeners
        # without breaking the iteration.
        for lis in list(self._listeners):
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        # An exception raised here is not propagated; Python only reports
        # it as ignored. It serves as a loud reminder to call stop().
        # getattr guards instances whose __init__ never got as far as
        # assigning _eventSource.
        if getattr(self, '_eventSource', None):
            raise ValueError("PatchSource wasn't stopped before del")
302 | 99 |
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    At most one reconnect is pending at a time, and the previous
    PatchSource is stopped when its replacement is created.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener, reconnectSecs=60, agent='unset'):
        # type: (str, Any, Any, str) -> None
        """
        url: passed unchanged to PatchSource
        listener: called as listener(patch, fullGraph=bool) for every patch
        reconnectSecs: delay before a new connection attempt after a
                       failed or lost connection
        agent: passed to PatchSource as its agent
        """
        self.url = url
        self._stopped = False
        self._listener = listener
        self.reconnectSecs = reconnectSecs
        self.agent = agent
        self._ps = None
        # delayed call returned by reactor.callLater while a reconnect
        # is pending, else None
        self._reconnectCall = None
        self._reconnect()

    def _reconnect(self):
        """Replace the current PatchSource with a fresh one (no-op once
        stop() has been called)."""
        self._reconnectCall = None
        if self._stopped:
            return
        old = self._ps
        # Install the new source before stopping the old one so any event
        # the old source emits while stopping is recognised as stale.
        ps = self._ps = PatchSource(self.url, agent=self.agent)
        if old is not None:
            old.stop()
        ps.addPatchListener(self._onPatch)
        # The extra argument tells the callback which source fired.
        ps.connectionFailed.addCallback(self._onSourceGone, ps)
        ps.connectionLost.addCallback(self._onSourceGone, ps)

    def _onPatch(self, p, fullGraph):
        """Forward a patch from the current PatchSource to the listener."""
        self._listener(p, fullGraph=fullGraph)

    def state(self):
        """Return a dict wrapping the current PatchSource's state()."""
        return {
            'reconnectedPatchSource': self._ps.state(),
        }

    def stop(self):
        """Stop for good: cancel any pending reconnect and stop the
        current PatchSource."""
        self._stopped = True
        pending = self._reconnectCall
        self._reconnectCall = None
        if pending is not None and pending.active():
            pending.cancel()
        if self._ps is not None:
            self._ps.stop()

    def _scheduleReconnect(self):
        """Arrange one _reconnect() call reconnectSecs from now, unless we
        are stopped or a reconnect is already pending."""
        if self._stopped:
            return
        pending = self._reconnectCall
        if pending is not None and pending.active():
            # connectionFailed and connectionLost can both fire for the
            # same source; one reconnect is enough.
            return
        self._reconnectCall = reactor.callLater(self.reconnectSecs,
                                                self._reconnect)

    def _onSourceGone(self, arg, ps):
        """Deferred callback for either failure signal of source `ps`;
        signals from a source that has since been replaced are ignored."""
        if ps is not self._ps:
            return
        self._scheduleReconnect()

    def _onConnectionFailed(self, arg):
        """The connection attempt failed: retry later."""
        self._scheduleReconnect()

    def _onConnectionLost(self, arg):
        """An established connection was lost: retry later."""
        self._scheduleReconnect()
482
b5abd4fc65a4
UA support, some rewrites from twisted_sse_demo work
drewp@bigasterisk.com
parents:
447
diff
changeset
|
142 |