Mercurial > code > home > repos > collector
annotate collector.py @ 12:032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
author | drewp@bigasterisk.com |
---|---|
date | Fri, 25 Nov 2022 20:58:08 -0800 |
parents | 36471461685f |
children | bfd95926be6e |
rev | line source |
---|---|
0 | 1 """ |
2 requesting /graph/foo returns an SSE patch stream that's the | |
3 result of fetching multiple other SSE patch streams. The result stream | |
4 may include new statements injected by this service. | |
5 | |
6 Future: | |
7 - filter out unneeded stmts from the sources | |
8 - give a time resolution and concatenate any patches that come faster than that res | |
9 """ | |
10 import json | |
11 import logging | |
12 import time | |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
9
diff
changeset
|
13 from typing import Dict, List, Optional, Set, Union |
0 | 14 |
15 import cyclone.sse | |
16 import cyclone.web | |
17 from docopt import docopt | |
9 | 18 from patchablegraph.patchablegraph import jsonFromPatch |
0 | 19 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource |
6 | 20 from prometheus_client import Summary |
0 | 21 from prometheus_client.exposition import generate_latest |
22 from prometheus_client.registry import REGISTRY | |
23 from rdfdb.patch import Patch | |
6 | 24 from rdflib import Namespace, URIRef |
0 | 25 from standardservice.logsetup import enableTwistedLog, log |
26 from twisted.internet import defer, reactor | |
27 | |
28 from collector_config import config | |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
9
diff
changeset
|
29 from merge import SourceUri, ActiveStatements, LocalStatements |
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
9
diff
changeset
|
30 from patchsink import PatchSink |
0 | 31 |
9 | 32 import cyclone.sse |
def py3_sendEvent(self, message, event=None, eid=None, retry=None):
    """
    Bytes-safe SSE event writer (py3 replacement for
    cyclone.sse.SSEHandler.sendEvent).

    message may be a dict (json-encoded), str (utf-8 encoded), or bytes.
    event/eid/retry, when given, are likewise normalized to bytes before
    being %-formatted into the frame: bytes %-formatting raises TypeError
    on str/int operands (PEP 461), which the original only guarded
    against for `message`.
    """

    def _b(v):
        # normalize any field value to bytes for b"%s" formatting
        if isinstance(v, bytes):
            return v
        return str(v).encode("utf-8")

    if isinstance(message, dict):
        message = cyclone.sse.escape.json_encode(message)
    if isinstance(message, str):
        message = message.encode("utf-8")
    assert isinstance(message, bytes)
    if eid:
        self.transport.write(b"id: %s\n" % _b(eid))
    if event:
        self.transport.write(b"event: %s\n" % _b(event))
    if retry:
        self.transport.write(b"retry: %s\n" % _b(retry))
    self.transport.write(b"data: %s\n\n" % message)

# Monkeypatch: make every cyclone SSEHandler use the bytes-safe
# py3_sendEvent defined above (cyclone's stock implementation is what
# this function replaces; PatchSink handlers inherit the patched method).
cyclone.sse.SSEHandler.sendEvent = py3_sendEvent

# Vocabulary namespace for the status statements this service injects
# (ROOM['connect'], ROOM['fullGraphReceived'], ROOM['patchesReceived']).
ROOM = Namespace("http://projects.bigasterisk.com/room/")
# Pseudo-source uri representing this collector itself; it is appended to
# every handler's source list but never connected to as a client.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

# Prometheus timers around the main entry points.
GET_STATE_CALLS = Summary("get_state_calls", 'calls')
ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')

59 | |
class Metrics(cyclone.web.RequestHandler):
    """Prometheus scrape endpoint: dumps the default registry as text."""

    def get(self):
        body = generate_latest(REGISTRY)
        self.add_header('content-type', 'text/plain')
        self.write(body)

66 | |
67 | |
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """

    def __init__(self):
        # Upstream patch sources keyed by source uri (COLLECTOR is not listed).
        self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {}
        # Currently-connected downstream SSE requesters.
        self.handlers: Set[PatchSink] = set()
        # Statement table: which sources assert each stmt, which handlers know it.
        self.statements: ActiveStatements = ActiveStatements()

        # Statements this service itself asserts (per-source connection state);
        # they flow back in through _onPatch with source=COLLECTOR.
        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        """Debug snapshot of clients, handlers, and the statement table."""
        return {
            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']),
            'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
        """
        Return the configured source uris for this handler's stream id,
        plus COLLECTOR.

        Raises ValueError unless exactly one configured stream matches.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]

    @ON_PATCH_CALLS.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
        """
        Merge one patch from `source` into the statement table, then push
        sync patches out to all handlers. fullGraph=True means p.addQuads
        is the source's complete current graph, not an increment.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            # Record, as a local statement, what we last received from this source.
            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])

    @SEND_UPDATE_PATCH_CALLS.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSink] = None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements

        With handler=None, every connected handler is considered. Sends
        are rate-limited per handler (0.9s, or 5s for Raspbian
        user-agents).
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}
        # reduce loops here- prepare all patches at once
        for h in selected:
            period = .9
            if 'Raspbian' in h.request.headers.get('user-agent', ''):
                period = 5
            if h.lastPatchSentTime > now - period:
                # NOTE(review): a rate-limited handler is skipped, not
                # rescheduled — it only catches up on the next incoming
                # patch. Confirm that is acceptable.
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p), event=b'patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def addSseHandler(self, handler: PatchSink):
        """Register a new SSE requester; connect any sources it needs."""
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                # source=source binds the loop variable at definition time.
                self.clients[source] = ReconnectingPatchSource(source,
                                                               listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
                                                               reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSink):
        """Drop a requester; stop any sources no remaining handler needs."""
        log.info('removeSseHandler %r', handler)
        self.statements.discardHandler(handler)
        for source in self._sourcesForHandler(handler):
            for otherHandler in self.handlers:
                if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)):
                    # still in use
                    break
            else:
                # for/else: no other handler needs this source
                self._stopClient(source)

        self.handlers.remove(handler)

    def _stopClient(self, url: SourceUri):
        """Disconnect one source and retract its statements (no-op for COLLECTOR)."""
        if url == COLLECTOR:
            return

        self.clients[url].stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
        if url in self.clients:
            del self.clients[url]

        self.cleanup()

    def cleanup(self):
        """
        despite the attempts above, we still get useless rows in the table
        sometimes
        """
        # Garbage-collect statements asserted by no source and known only
        # to handlers that are no longer connected.
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)
205 | |
206 | |
class State(cyclone.web.RequestHandler):
    """Debug endpoint: dump the shared GraphClients state as JSON."""

    @GET_STATE_CALLS.time()
    def get(self) -> None:
        try:
            payload = {'graphClients': self.settings.graphClients.state()}
            # anything json can't serialize is rendered as a placeholder
            body = json.dumps(payload, indent=2, default=lambda obj: '<unserializable>')
            log.info(body)
            self.write(body)
        except Exception:
            import traceback
            traceback.print_exc()
            raise
220 | |
221 | |
class GraphList(cyclone.web.RequestHandler):
    """List the configured graph streams as JSON."""

    def get(self) -> None:
        streams = config['streams']
        self.write(json.dumps(streams))
226 | |
227 | |
if __name__ == '__main__':
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v           Verbose
    -i           Info level only
    """)

    # NOTE(review): this debug setup is unconditionally enabled ('if True:')
    # — confirm whether it was meant to be gated on a flag. Also '-i' is
    # parsed above but never consulted below.
    if True:
        enableTwistedLog()
        log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
        defer.setDebugging(True)

    # One shared GraphClients instance, handed to all request handlers via
    # cyclone Application settings (read as self.settings.graphClients).
    graphClients = GraphClients()

    reactor.listenTCP(
        9072,
        cyclone.web.Application(  #
            handlers=[
                (r'/state', State),
                (r'/graph/', GraphList),
                (r'/graph/(.+)', PatchSink),
                (r'/metrics', Metrics),
            ], graphClients=graphClients),
        interface='::')
    reactor.run()