Mercurial > code > home > repos > homeauto
annotate lib/patchsource.py @ 1299:fa74a9c9f753
rm old golang server that made an rdf graph from inputs on rpi (goraptor & hwio)
Ignore-this: 9631ad748dccd16277c74ee602846add
darcs-hash:223083e3a057713de9856ce548eddd285c8f48fa
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sun, 21 Apr 2019 02:45:35 -0700 |
parents | 47f309d8ba94 |
children |
rev | line source |
---|---|
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
1 import sys, logging |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
2 import traceback |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
3 from twisted.internet import reactor, defer |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
4 from twisted_sse_demo.eventsource import EventSource |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
5 from rdflib import ConjunctiveGraph |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
6 from rdflib.parser import StringInputSource |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
7 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
8 sys.path.append("../../lib") |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
9 from patchablegraph import patchFromJson |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
10 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
11 sys.path.append("/my/proj/rdfdb") |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
12 from rdfdb.patch import Patch |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
13 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
14 log = logging.getLogger('fetch') |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
15 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
16 class PatchSource(object): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
17 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
18 def __init__(self, url, agent): |
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
19 self.url = str(url) |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
20 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
21 # add callbacks to these to learn if we failed to connect |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
22 # (approximately) or if the ccnnection was unexpectedly lost |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
23 self.connectionFailed = defer.Deferred() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
24 self.connectionLost = defer.Deferred() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
25 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
26 self._listeners = set() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
27 log.info('start read from %s', url) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
28 # note: fullGraphReceived isn't guaranteed- the stream could |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
29 # start with patches |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
30 self._fullGraphReceived = False |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
31 self._eventSource = EventSource(url.toPython().encode('utf8'), |
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
32 userAgent=agent) |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
33 |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
34 self._eventSource.addEventListener(b'fullGraph', self._onFullGraph) |
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
35 self._eventSource.addEventListener(b'patch', self._onPatch) |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
36 self._eventSource.onerror(self._onError) |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
37 self._eventSource.onConnectionLost = self._onDisconnect |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
38 |
1242
24c004aac998
stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents:
1232
diff
changeset
|
39 def state(self): |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
40 return { |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
41 'url': self.url, |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
42 'fullGraphReceived': self._fullGraphReceived, |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
43 } |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
44 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
45 def addPatchListener(self, func): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
46 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
47 func(patch, fullGraph=[true if the patch is the initial fullgraph]) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
48 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
49 self._listeners.add(func) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
50 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
51 def stop(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
52 log.info('stop read from %s', self.url) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
53 try: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
54 self._eventSource.protocol.stopProducing() # needed? |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
55 except AttributeError: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
56 pass |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
57 self._eventSource = None |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
58 |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
59 def _onDisconnect(self, reason): |
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
60 log.debug('PatchSource._onDisconnect from %s (%s)', self.url, reason) |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
61 # skip this if we're doing a stop? |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
62 self.connectionLost.callback(None) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
63 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
64 def _onError(self, msg): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
65 log.debug('PatchSource._onError from %s %r', self.url, msg) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
66 if not self._fullGraphReceived: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
67 self.connectionFailed.callback(msg) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
68 else: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
69 self.connectionLost.callback(msg) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
70 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
71 def _onFullGraph(self, message): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
72 try: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
73 g = ConjunctiveGraph() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
74 g.parse(StringInputSource(message), format='json-ld') |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
75 p = Patch(addGraph=g) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
76 self._sendPatch(p, fullGraph=True) |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
77 except Exception: |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
78 log.error(traceback.format_exc()) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
79 raise |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
80 self._fullGraphReceived = True |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
81 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
82 def _onPatch(self, message): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
83 try: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
84 p = patchFromJson(message) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
85 self._sendPatch(p, fullGraph=False) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
86 except: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
87 log.error(traceback.format_exc()) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
88 raise |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
89 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
90 def _sendPatch(self, p, fullGraph): |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
91 log.debug('PatchSource %s received patch %s (fullGraph=%s)', |
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
92 self.url, p.shortSummary(), fullGraph) |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
93 for lis in self._listeners: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
94 lis(p, fullGraph=fullGraph) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
95 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
96 def __del__(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
97 if self._eventSource: |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
98 raise ValueError("PatchSource wasn't stopped before del") |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
99 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
100 class ReconnectingPatchSource(object): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
101 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
102 PatchSource api, but auto-reconnects internally and takes listener |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
103 at init time to not miss any patches. You'll get another |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
104 fullGraph=True patch if we have to reconnect. |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
105 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
106 todo: generate connection stmts in here |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
107 """ |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
108 def __init__(self, url, listener, reconnectSecs=60, agent='unset'): |
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
109 # type: (str, Any, Any, str) |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
110 self.url = url |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
111 self._stopped = False |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
112 self._listener = listener |
1232
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
113 self.reconnectSecs = reconnectSecs |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
114 self.agent = agent |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
115 self._reconnect() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
116 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
117 def _reconnect(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
118 if self._stopped: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
119 return |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
120 self._ps = PatchSource(self.url, agent=self.agent) |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
121 self._ps.addPatchListener(self._onPatch) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
122 self._ps.connectionFailed.addCallback(self._onConnectionFailed) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
123 self._ps.connectionLost.addCallback(self._onConnectionLost) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
124 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
125 def _onPatch(self, p, fullGraph): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
126 self._listener(p, fullGraph=fullGraph) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
127 |
1242
24c004aac998
stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents:
1232
diff
changeset
|
128 def state(self): |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
129 return { |
1242
24c004aac998
stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents:
1232
diff
changeset
|
130 'reconnectedPatchSource': self._ps.state(), |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
131 } |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
132 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
133 def stop(self): |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
134 self._stopped = True |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
135 self._ps.stop() |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
136 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
137 def _onConnectionFailed(self, arg): |
1232
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
138 reactor.callLater(self.reconnectSecs, self._reconnect) |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
139 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
140 def _onConnectionLost(self, arg): |
1232
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1156
diff
changeset
|
141 reactor.callLater(self.reconnectSecs, self._reconnect) |
1285
47f309d8ba94
UA support, some rewrites from twisted_sse_demo work
drewp <drewp@bigasterisk.com>
parents:
1250
diff
changeset
|
142 |