Mercurial > code > home > repos > collector
annotate collector.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | 032e59be8fe9 |
children |
rev | line source |
---|---|
0 | 1 """ |
2 requesting /graph/foo returns an SSE patch stream that's the | |
3 result of fetching multiple other SSE patch streams. The result stream | |
4 may include new statements injected by this service. | |
5 | |
6 Future: | |
7 - filter out unneeded stmts from the sources | |
8 - give a time resolution and concatenate any patches that come faster than that res | |
9 """ | |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
10 import asyncio |
0 | 11 import json |
12 import logging | |
13 import time | |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
9
diff
changeset
|
14 from typing import Dict, List, Optional, Set, Union |
0 | 15 |
9 | 16 from patchablegraph.patchablegraph import jsonFromPatch |
6 | 17 from prometheus_client import Summary |
0 | 18 from rdfdb.patch import Patch |
6 | 19 from rdflib import Namespace, URIRef |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
20 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
21 from starlette.applications import Starlette |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
22 from starlette.requests import Request |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
23 from starlette.responses import JSONResponse |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
24 from starlette.routing import Route |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
25 from starlette_exporter import PrometheusMiddleware, handle_metrics |
0 | 26 |
27 from collector_config import config | |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
9
diff
changeset
|
28 from merge import SourceUri, ActiveStatements, LocalStatements |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
29 from patchsink import PatchSink, PatchSinkResponse |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
30 from patchsource import PatchSource |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
# Configure logging at DEBUG level for the whole process; `log` is the
# root logger (getLogger() is called with no name).
logging.basicConfig(level=logging.DEBUG)
log=logging.getLogger()
# RDF namespace used below for the per-source connection-state terms
# (ROOM['connect'], ROOM['fullGraphReceived'], ROOM['patchesReceived']).
ROOM = Namespace("http://projects.bigasterisk.com/room/")
# Source identifier that is appended to every stream's source list but is
# never stored in GraphClients.clients. Presumably it stands for statements
# injected by this service itself -- confirm against merge.LocalStatements.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

# Prometheus summaries used as timing decorators (`.time()`) on the state
# handler and on GraphClients._onPatch / GraphClients._sendUpdatePatch.
GET_STATE_CALLS = Summary("get_state_calls", 'calls')
ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')
39 | |
40 | |
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """

    def __init__(self):
        # Upstream patch sources, keyed by source URI.
        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
        # Downstream responses currently registered via addSseHandler.
        self.handlers: Set[PatchSinkResponse] = set()
        self.statements: ActiveStatements = ActiveStatements()

        # Patches produced by LocalStatements are delivered to _onPatch,
        # the same entry point that upstream sources use.
        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        """
        Return a snapshot dict with three keys: 'clients' (each client's
        state(), sorted by 'url'), 'sseHandlers' (each handler's state(),
        sorted by ('streamId', 'created')) and 'statements'.
        """
        return {
            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['url']),
            'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSinkResponse) -> List[SourceUri]:
        """
        Return the source URIs configured for handler.streamId, with
        COLLECTOR appended.

        Raises ValueError unless exactly one entry of config['streams'] has
        an 'id' equal to the handler's streamId.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]

    @ON_PATCH_CALLS.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
        """
        Apply a patch received from `source` to self.statements, then push
        updates to all handlers.

        With fullGraph=True, p.addQuads replaces everything previously
        recorded for that source; otherwise p is applied incrementally.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        # Debug table dump, deliberately disabled by the leading `0 and`.
        if 0 and log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        # Record the source's connection state, except for COLLECTOR itself.
        if source != COLLECTOR:
            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])

    @SEND_UPDATE_PATCH_CALLS.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSinkResponse] = None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements

        With handler=None every registered handler is considered. A handler
        that is not registered is logged as an error and nothing is sent.
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}
        # reduce loops here- prepare all patches at once
        for h in selected:
            # Minimum seconds between patches to one handler; longer for
            # clients whose user agent mentions Raspbian.
            period = .9
            if 'Raspbian' in h.user_agent:
                period = 5
            # NOTE(review): a handler skipped here is not retried by this
            # method; it only catches up on a later call -- confirm that is
            # intended.
            if h.lastPatchSentTime > now - period:
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p), event='patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def addSseHandler(self, handler: PatchSinkResponse):
        """
        Register `handler`, create a PatchSource for each of its sources
        that does not have one yet, and send the handler an initial patch.

        Raises ValueError (from _sourcesForHandler) before registering
        anything if the handler's streamId is not configured exactly once.
        """
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                # `source=source` binds the current loop value into the
                # lambda so each listener reports its own source.
                self.clients[source] = PatchSource(source,
                                                   listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
                                                   reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSinkResponse):
        """
        Unregister `handler` and stop every source that no remaining
        handler uses.

        Raises KeyError if the handler was not registered (set.remove).
        """
        log.info('removeSseHandler %r', handler)
        self.statements.discardHandler(handler)
        for source in self._sourcesForHandler(handler):
            for otherHandler in self.handlers:
                if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)):
                    # still in use
                    break
            else:
                # for/else: no other handler needs this source
                self._stopClient(source)

        self.handlers.remove(handler)

    def _stopClient(self, url: SourceUri):
        """
        Stop and forget the PatchSource for `url`, drop its statements and
        its local source-state, then run cleanup(). COLLECTOR is ignored.

        A url with no entry in self.clients is tolerated: the stop() call
        is skipped but the remaining bookkeeping still runs.
        """
        if url == COLLECTOR:
            return

        # Guarded lookup: an unconditional self.clients[url] would raise
        # KeyError and skip all of the cleanup below.
        client = self.clients.get(url)
        if client is not None:
            client.stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
        self.clients.pop(url, None)

        self.cleanup()

    def cleanup(self):
        """
        despite the attempts above, we still get useless rows in the table
        sometimes

        A row is treated as garbage when it has no sources and none of its
        handlers is still registered in self.handlers.
        """
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)
178 | |
179 | |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
@GET_STATE_CALLS.time()
def State(request: Request) -> JSONResponse:
    """
    Return the GraphClients state snapshot as JSON under 'graphClients'.

    The same payload is also logged at INFO level, pretty-printed, with
    any value json cannot encode shown as '<unserializable>'.
    """
    payload = {'graphClients': request.app.state.graphClients.state()}
    log.info(json.dumps(payload, indent=2, default=lambda obj: '<unserializable>'))
    return JSONResponse(payload)
0 | 186 |
187 | |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
def GraphList(request: Request) -> JSONResponse:
    """Return the configured stream definitions (config['streams']) as JSON."""
    streams = config['streams']
    return JSONResponse(streams)
0 | 190 |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
def main():
    """
    Build and return the Starlette application.

    Routes: /state, /graph/ and /graph/{stream_id}, plus /metrics served by
    starlette_exporter. A single GraphClients instance is stored on
    app.state.graphClients for the request handlers to use.
    """
    clients = GraphClients()

    routeTable = [
        Route('/state', State),
        Route('/graph/', GraphList),
        Route('/graph/{stream_id:str}', PatchSink),
    ]
    application = Starlette(debug=True, routes=routeTable)
    application.state.graphClients = clients

    application.add_middleware(PrometheusMiddleware, app_name='collector')
    application.add_route("/metrics", handle_metrics)
    return application
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
206 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
207 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
# Application object built at import time; presumably this is the name an
# ASGI server is pointed at -- confirm against the deployment command.
app = main()