Mercurial > code > home > repos > collector
comparison collector.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
field | value |
---|---|
author | drewp@bigasterisk.com |
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | 032e59be8fe9 |
children | |
comparison legend:
- equal
- deleted
- inserted
- replaced
12:032e59be8fe9 | 13:bfd95926be6e |
---|---|
5 | 5 |
6 Future: | 6 Future: |
7 - filter out unneeded stmts from the sources | 7 - filter out unneeded stmts from the sources |
8 - give a time resolution and concatenate any patches that come faster than that res | 8 - give a time resolution and concatenate any patches that come faster than that res |
9 """ | 9 """ |
10 import asyncio | |
10 import json | 11 import json |
11 import logging | 12 import logging |
12 import time | 13 import time |
13 from typing import Dict, List, Optional, Set, Union | 14 from typing import Dict, List, Optional, Set, Union |
14 | 15 |
15 import cyclone.sse | |
16 import cyclone.web | |
17 from docopt import docopt | |
18 from patchablegraph.patchablegraph import jsonFromPatch | 16 from patchablegraph.patchablegraph import jsonFromPatch |
19 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource | |
20 from prometheus_client import Summary | 17 from prometheus_client import Summary |
21 from prometheus_client.exposition import generate_latest | |
22 from prometheus_client.registry import REGISTRY | |
23 from rdfdb.patch import Patch | 18 from rdfdb.patch import Patch |
24 from rdflib import Namespace, URIRef | 19 from rdflib import Namespace, URIRef |
25 from standardservice.logsetup import enableTwistedLog, log | 20 |
26 from twisted.internet import defer, reactor | 21 from starlette.applications import Starlette |
22 from starlette.requests import Request | |
23 from starlette.responses import JSONResponse | |
24 from starlette.routing import Route | |
25 from starlette_exporter import PrometheusMiddleware, handle_metrics | |
27 | 26 |
28 from collector_config import config | 27 from collector_config import config |
29 from merge import SourceUri, ActiveStatements, LocalStatements | 28 from merge import SourceUri, ActiveStatements, LocalStatements |
30 from patchsink import PatchSink | 29 from patchsink import PatchSink, PatchSinkResponse |
31 | 30 from patchsource import PatchSource |
32 import cyclone.sse | 31 logging.basicConfig(level=logging.DEBUG) |
33 def py3_sendEvent(self, message, event=None, eid=None, retry=None): | 32 log=logging.getLogger() |
34 | |
35 if isinstance(message, dict): | |
36 message = cyclone.sse.escape.json_encode(message) | |
37 if isinstance(message, str): | |
38 message = message.encode("utf-8") | |
39 assert isinstance(message, bytes) | |
40 if eid: | |
41 self.transport.write(b"id: %s\n" % eid) | |
42 if event: | |
43 self.transport.write(b"event: %s\n" % event) | |
44 if retry: | |
45 self.transport.write(b"retry: %s\n" % retry) | |
46 self.transport.write(b"data: %s\n\n" % message) | |
47 | |
48 | |
49 cyclone.sse.SSEHandler.sendEvent = py3_sendEvent | |
50 | |
51 | |
52 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 33 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
53 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) | 34 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) |
54 | 35 |
55 GET_STATE_CALLS = Summary("get_state_calls", 'calls') | 36 GET_STATE_CALLS = Summary("get_state_calls", 'calls') |
56 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') | 37 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') |
57 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') | 38 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') |
58 | |
59 | |
60 class Metrics(cyclone.web.RequestHandler): | |
61 | |
62 def get(self): | |
63 self.add_header('content-type', 'text/plain') | |
64 self.write(generate_latest(REGISTRY)) | |
65 | |
66 | 39 |
67 | 40 |
68 class GraphClients(object): | 41 class GraphClients(object): |
69 """ | 42 """ |
70 All the active PatchSources and SSEHandlers | 43 All the active PatchSources and SSEHandlers |
74 asserting them and the requesters who currently know them. As | 47 asserting them and the requesters who currently know them. As |
75 statements come and go, we make patches to send to requesters. | 48 statements come and go, we make patches to send to requesters. |
76 """ | 49 """ |
77 | 50 |
78 def __init__(self): | 51 def __init__(self): |
79 self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {} # (COLLECTOR is not listed) | 52 self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed) |
80 self.handlers: Set[PatchSink] = set() | 53 self.handlers: Set[PatchSinkResponse] = set() |
81 self.statements: ActiveStatements = ActiveStatements() | 54 self.statements: ActiveStatements = ActiveStatements() |
82 | 55 |
83 self._localStatements = LocalStatements(self._onPatch) | 56 self._localStatements = LocalStatements(self._onPatch) |
84 | 57 |
85 def state(self) -> Dict: | 58 def state(self) -> Dict: |
86 return { | 59 return { |
87 'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']), | 60 'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['url']), |
88 'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])), | 61 'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])), |
89 'statements': self.statements.state(), | 62 'statements': self.statements.state(), |
90 } | 63 } |
91 | 64 |
92 def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]: | 65 def _sourcesForHandler(self, handler: PatchSinkResponse) -> List[SourceUri]: |
93 streamId = handler.streamId | 66 streamId = handler.streamId |
94 matches = [s for s in config['streams'] if s['id'] == streamId] | 67 matches = [s for s in config['streams'] if s['id'] == streamId] |
95 if len(matches) != 1: | 68 if len(matches) != 1: |
96 raise ValueError("%s matches for %r" % (len(matches), streamId)) | 69 raise ValueError("%s matches for %r" % (len(matches), streamId)) |
97 return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR] | 70 return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR] |
105 else: | 78 else: |
106 self.statements.applySourcePatch(source, p) | 79 self.statements.applySourcePatch(source, p) |
107 | 80 |
108 self._sendUpdatePatch() | 81 self._sendUpdatePatch() |
109 | 82 |
110 if log.isEnabledFor(logging.DEBUG): | 83 if 0 and log.isEnabledFor(logging.DEBUG): |
111 self.statements.pprintTable() | 84 self.statements.pprintTable() |
112 | 85 |
113 if source != COLLECTOR: | 86 if source != COLLECTOR: |
114 self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived']) | 87 self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived']) |
115 | 88 |
116 @SEND_UPDATE_PATCH_CALLS.time() | 89 @SEND_UPDATE_PATCH_CALLS.time() |
117 def _sendUpdatePatch(self, handler: Optional[PatchSink] = None): | 90 def _sendUpdatePatch(self, handler: Optional[PatchSinkResponse] = None): |
118 """ | 91 """ |
119 send a patch event out this handler to bring it up to date with | 92 send a patch event out this handler to bring it up to date with |
120 self.statements | 93 self.statements |
121 """ | 94 """ |
122 now = time.time() | 95 now = time.time() |
127 return | 100 return |
128 selected = {handler} | 101 selected = {handler} |
129 # reduce loops here- prepare all patches at once | 102 # reduce loops here- prepare all patches at once |
130 for h in selected: | 103 for h in selected: |
131 period = .9 | 104 period = .9 |
132 if 'Raspbian' in h.request.headers.get('user-agent', ''): | 105 if 'Raspbian' in h.user_agent: |
133 period = 5 | 106 period = 5 |
134 if h.lastPatchSentTime > now - period: | 107 if h.lastPatchSentTime > now - period: |
135 continue | 108 continue |
136 p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h))) | 109 p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h))) |
137 log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr) | 110 log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr) |
140 # This can be a giant line, which was a problem | 113 # This can be a giant line, which was a problem |
141 # once. Might be nice for this service to try to break | 114 # once. Might be nice for this service to try to break |
142 # it up into multiple sends, although there's no | 115 # it up into multiple sends, although there's no |
143 # guarantee at all since any single stmt could be any | 116 # guarantee at all since any single stmt could be any |
144 # length. | 117 # length. |
145 h.sendEvent(message=jsonFromPatch(p), event=b'patch') | 118 h.sendEvent(message=jsonFromPatch(p), event='patch') |
146 h.lastPatchSentTime = now | 119 h.lastPatchSentTime = now |
147 else: | 120 else: |
148 log.debug('nothing to send to %s', h) | 121 log.debug('nothing to send to %s', h) |
149 | 122 |
150 def addSseHandler(self, handler: PatchSink): | 123 def addSseHandler(self, handler: PatchSinkResponse): |
151 log.info('addSseHandler %r %r', handler, handler.streamId) | 124 log.info('addSseHandler %r %r', handler, handler.streamId) |
152 | 125 |
153 # fail early if id doesn't match | 126 # fail early if id doesn't match |
154 sources = self._sourcesForHandler(handler) | 127 sources = self._sourcesForHandler(handler) |
155 | 128 |
157 | 130 |
158 for source in sources: | 131 for source in sources: |
159 if source not in self.clients and source != COLLECTOR: | 132 if source not in self.clients and source != COLLECTOR: |
160 log.debug('connect to patch source %s', source) | 133 log.debug('connect to patch source %s', source) |
161 self._localStatements.setSourceState(source, ROOM['connect']) | 134 self._localStatements.setSourceState(source, ROOM['connect']) |
162 self.clients[source] = ReconnectingPatchSource(source, | 135 self.clients[source] = PatchSource(source, |
163 listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph), | 136 listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph), |
164 reconnectSecs=10) | 137 reconnectSecs=10) |
165 log.debug('bring new client up to date') | 138 log.debug('bring new client up to date') |
166 | 139 |
167 self._sendUpdatePatch(handler) | 140 self._sendUpdatePatch(handler) |
168 | 141 |
169 def removeSseHandler(self, handler: PatchSink): | 142 def removeSseHandler(self, handler: PatchSinkResponse): |
170 log.info('removeSseHandler %r', handler) | 143 log.info('removeSseHandler %r', handler) |
171 self.statements.discardHandler(handler) | 144 self.statements.discardHandler(handler) |
172 for source in self._sourcesForHandler(handler): | 145 for source in self._sourcesForHandler(handler): |
173 for otherHandler in self.handlers: | 146 for otherHandler in self.handlers: |
174 if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)): | 147 if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)): |
202 for stmt, (sources, handlers) in self.statements.table.items(): | 175 for stmt, (sources, handlers) in self.statements.table.items(): |
203 if not sources and not any(h in self.handlers for h in handlers): | 176 if not sources and not any(h in self.handlers for h in handlers): |
204 garbage.add(stmt) | 177 garbage.add(stmt) |
205 | 178 |
206 | 179 |
207 class State(cyclone.web.RequestHandler): | 180 @GET_STATE_CALLS.time() |
208 | 181 def State(request: Request) -> JSONResponse: |
209 @GET_STATE_CALLS.time() | 182 state = request.app.state.graphClients.state() |
210 def get(self) -> None: | 183 msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>') |
211 try: | 184 log.info(msg) |
212 state = self.settings.graphClients.state() | 185 return JSONResponse({'graphClients': state}) |
213 msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>') | 186 |
214 log.info(msg) | 187 |
215 self.write(msg) | 188 def GraphList(request: Request) -> JSONResponse: |
216 except Exception: | 189 return JSONResponse(config['streams']) |
217 import traceback | 190 |
218 traceback.print_exc() | 191 def main(): |
219 raise | |
220 | |
221 | |
222 class GraphList(cyclone.web.RequestHandler): | |
223 | |
224 def get(self) -> None: | |
225 self.write(json.dumps(config['streams'])) | |
226 | |
227 | |
228 if __name__ == '__main__': | |
229 arg = docopt(""" | |
230 Usage: sse_collector.py [options] | |
231 | |
232 -v Verbose | |
233 -i Info level only | |
234 """) | |
235 | |
236 if True: | |
237 enableTwistedLog() | |
238 log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO) | |
239 defer.setDebugging(True) | |
240 | |
241 graphClients = GraphClients() | 192 graphClients = GraphClients() |
242 | 193 |
243 reactor.listenTCP( | 194 app = Starlette( |
244 9072, | 195 debug=True, |
245 cyclone.web.Application( # | 196 routes=[ |
246 handlers=[ | 197 Route('/state', State), |
247 (r'/state', State), | 198 Route('/graph/', GraphList), |
248 (r'/graph/', GraphList), | 199 Route('/graph/{stream_id:str}', PatchSink), |
249 (r'/graph/(.+)', PatchSink), | 200 ]) |
250 (r'/metrics', Metrics), | 201 app.state.graphClients = graphClients |
251 ], graphClients=graphClients), | 202 |
252 interface='::') | 203 app.add_middleware(PrometheusMiddleware, app_name='collector') |
253 reactor.run() | 204 app.add_route("/metrics", handle_metrics) |
205 return app | |
206 | |
207 | |
208 app = main() |