Mercurial > code > home > repos > homeauto
annotate lib/patchsource.py @ 1229:02e4b84821d5
talk to store graph, second button for holding unlocked, etc
Ignore-this: c2ae7d756e743c26e5e01d99772899bd
darcs-hash:a0750d0bbc4dc7c0f65f63f3e7342b35a175141b
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Thu, 04 Apr 2019 02:16:22 -0700 |
parents | ee168d55524a |
children | b50a13ef20ba |
rev | line source |
---|---|
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
1 import sys, logging |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
2 import traceback |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
3 from twisted.internet import reactor, defer |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
4 from twisted_sse_demo.eventsource import EventSource |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
5 from rdflib import ConjunctiveGraph |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
6 from rdflib.parser import StringInputSource |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
7 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
8 sys.path.append("../../lib") |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
9 from patchablegraph import patchFromJson |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
10 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
11 sys.path.append("/my/proj/rdfdb") |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
12 from rdfdb.patch import Patch |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
13 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
14 log = logging.getLogger('fetch') |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
15 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
16 class PatchSource(object): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
17 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
18 def __init__(self, url): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
19 self.url = url |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
20 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
21 # add callbacks to these to learn if we failed to connect |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
22 # (approximately) or if the ccnnection was unexpectedly lost |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
23 self.connectionFailed = defer.Deferred() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
24 self.connectionLost = defer.Deferred() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
25 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
26 self._listeners = set() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
27 log.info('start read from %s', url) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
28 # note: fullGraphReceived isn't guaranteed- the stream could |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
29 # start with patches |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
30 self._fullGraphReceived = False |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
31 self._eventSource = EventSource(url.toPython().encode('utf8')) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
32 self._eventSource.protocol.delimiter = '\n' |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
33 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
34 self._eventSource.addEventListener('fullGraph', self._onFullGraph) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
35 self._eventSource.addEventListener('patch', self._onPatch) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
36 self._eventSource.onerror(self._onError) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
37 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
38 origSet = self._eventSource.protocol.setFinishedDeferred |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
39 def sfd(d): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
40 origSet(d) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
41 d.addCallback(self._onDisconnect) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
42 self._eventSource.protocol.setFinishedDeferred = sfd |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
43 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
44 def stats(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
45 return { |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
46 'url': self.url, |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
47 'fullGraphReceived': self._fullGraphReceived, |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
48 } |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
49 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
50 def addPatchListener(self, func): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
51 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
52 func(patch, fullGraph=[true if the patch is the initial fullgraph]) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
53 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
54 self._listeners.add(func) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
55 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
56 def stop(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
57 log.info('stop read from %s', self.url) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
58 try: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
59 self._eventSource.protocol.stopProducing() # needed? |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
60 except AttributeError: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
61 pass |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
62 self._eventSource = None |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
63 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
64 def _onDisconnect(self, a): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
65 log.debug('PatchSource._onDisconnect from %s', self.url) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
66 # skip this if we're doing a stop? |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
67 self.connectionLost.callback(None) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
68 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
69 def _onError(self, msg): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
70 log.debug('PatchSource._onError from %s %r', self.url, msg) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
71 if not self._fullGraphReceived: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
72 self.connectionFailed.callback(msg) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
73 else: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
74 self.connectionLost.callback(msg) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
75 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
76 def _onFullGraph(self, message): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
77 try: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
78 g = ConjunctiveGraph() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
79 g.parse(StringInputSource(message), format='json-ld') |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
80 p = Patch(addGraph=g) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
81 self._sendPatch(p, fullGraph=True) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
82 except: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
83 log.error(traceback.format_exc()) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
84 raise |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
85 self._fullGraphReceived = True |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
86 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
87 def _onPatch(self, message): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
88 try: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
89 p = patchFromJson(message) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
90 self._sendPatch(p, fullGraph=False) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
91 except: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
92 log.error(traceback.format_exc()) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
93 raise |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
94 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
95 def _sendPatch(self, p, fullGraph): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
96 log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
97 for lis in self._listeners: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
98 lis(p, fullGraph=fullGraph) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
99 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
100 def __del__(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
101 if self._eventSource: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
102 raise ValueError |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
103 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
104 class ReconnectingPatchSource(object): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
105 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
106 PatchSource api, but auto-reconnects internally and takes listener |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
107 at init time to not miss any patches. You'll get another |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
108 fullGraph=True patch if we have to reconnect. |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
109 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
110 todo: generate connection stmts in here |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
111 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
112 def __init__(self, url, listener): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
113 self.url = url |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
114 self._stopped = False |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
115 self._listener = listener |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
116 self._reconnect() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
117 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
118 def _reconnect(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
119 if self._stopped: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
120 return |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
121 self._ps = PatchSource(self.url) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
122 self._ps.addPatchListener(self._onPatch) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
123 self._ps.connectionFailed.addCallback(self._onConnectionFailed) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
124 self._ps.connectionLost.addCallback(self._onConnectionLost) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
125 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
126 def _onPatch(self, p, fullGraph): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
127 self._listener(p, fullGraph=fullGraph) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
128 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
129 def stats(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
130 return { |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
131 'reconnectedPatchSource': self._ps.stats(), |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
132 } |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
133 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
134 def stop(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
135 self._stopped = True |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
136 self._ps.stop() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
137 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
138 def _onConnectionFailed(self, arg): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
139 reactor.callLater(60, self._reconnect) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
140 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
141 def _onConnectionLost(self, arg): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
142 reactor.callLater(60, self._reconnect) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
143 |