comparison collector.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents 032e59be8fe9
children
comparison
equal deleted inserted replaced
12:032e59be8fe9 13:bfd95926be6e
5 5
6 Future: 6 Future:
7 - filter out unneeded stmts from the sources 7 - filter out unneeded stmts from the sources
8 - give a time resolution and concatenate any patches that come faster than that res 8 - give a time resolution and concatenate any patches that come faster than that res
9 """ 9 """
10 import asyncio
10 import json 11 import json
11 import logging 12 import logging
12 import time 13 import time
13 from typing import Dict, List, Optional, Set, Union 14 from typing import Dict, List, Optional, Set, Union
14 15
15 import cyclone.sse
16 import cyclone.web
17 from docopt import docopt
18 from patchablegraph.patchablegraph import jsonFromPatch 16 from patchablegraph.patchablegraph import jsonFromPatch
19 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
20 from prometheus_client import Summary 17 from prometheus_client import Summary
21 from prometheus_client.exposition import generate_latest
22 from prometheus_client.registry import REGISTRY
23 from rdfdb.patch import Patch 18 from rdfdb.patch import Patch
24 from rdflib import Namespace, URIRef 19 from rdflib import Namespace, URIRef
25 from standardservice.logsetup import enableTwistedLog, log 20
26 from twisted.internet import defer, reactor 21 from starlette.applications import Starlette
22 from starlette.requests import Request
23 from starlette.responses import JSONResponse
24 from starlette.routing import Route
25 from starlette_exporter import PrometheusMiddleware, handle_metrics
27 26
28 from collector_config import config 27 from collector_config import config
29 from merge import SourceUri, ActiveStatements, LocalStatements 28 from merge import SourceUri, ActiveStatements, LocalStatements
30 from patchsink import PatchSink 29 from patchsink import PatchSink, PatchSinkResponse
31 30 from patchsource import PatchSource
32 import cyclone.sse 31 logging.basicConfig(level=logging.DEBUG)
33 def py3_sendEvent(self, message, event=None, eid=None, retry=None): 32 log=logging.getLogger()
34
35 if isinstance(message, dict):
36 message = cyclone.sse.escape.json_encode(message)
37 if isinstance(message, str):
38 message = message.encode("utf-8")
39 assert isinstance(message, bytes)
40 if eid:
41 self.transport.write(b"id: %s\n" % eid)
42 if event:
43 self.transport.write(b"event: %s\n" % event)
44 if retry:
45 self.transport.write(b"retry: %s\n" % retry)
46 self.transport.write(b"data: %s\n\n" % message)
47
48
49 cyclone.sse.SSEHandler.sendEvent = py3_sendEvent
50
51
52 ROOM = Namespace("http://projects.bigasterisk.com/room/") 33 ROOM = Namespace("http://projects.bigasterisk.com/room/")
53 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) 34 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
54 35
55 GET_STATE_CALLS = Summary("get_state_calls", 'calls') 36 GET_STATE_CALLS = Summary("get_state_calls", 'calls')
56 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') 37 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
57 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') 38 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')
58
59
60 class Metrics(cyclone.web.RequestHandler):
61
62 def get(self):
63 self.add_header('content-type', 'text/plain')
64 self.write(generate_latest(REGISTRY))
65
66 39
67 40
68 class GraphClients(object): 41 class GraphClients(object):
69 """ 42 """
70 All the active PatchSources and SSEHandlers 43 All the active PatchSources and SSEHandlers
74 asserting them and the requesters who currently know them. As 47 asserting them and the requesters who currently know them. As
75 statements come and go, we make patches to send to requesters. 48 statements come and go, we make patches to send to requesters.
76 """ 49 """
77 50
78 def __init__(self): 51 def __init__(self):
79 self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {} # (COLLECTOR is not listed) 52 self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed)
80 self.handlers: Set[PatchSink] = set() 53 self.handlers: Set[PatchSinkResponse] = set()
81 self.statements: ActiveStatements = ActiveStatements() 54 self.statements: ActiveStatements = ActiveStatements()
82 55
83 self._localStatements = LocalStatements(self._onPatch) 56 self._localStatements = LocalStatements(self._onPatch)
84 57
85 def state(self) -> Dict: 58 def state(self) -> Dict:
86 return { 59 return {
87 'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']), 60 'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['url']),
88 'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])), 61 'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
89 'statements': self.statements.state(), 62 'statements': self.statements.state(),
90 } 63 }
91 64
92 def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]: 65 def _sourcesForHandler(self, handler: PatchSinkResponse) -> List[SourceUri]:
93 streamId = handler.streamId 66 streamId = handler.streamId
94 matches = [s for s in config['streams'] if s['id'] == streamId] 67 matches = [s for s in config['streams'] if s['id'] == streamId]
95 if len(matches) != 1: 68 if len(matches) != 1:
96 raise ValueError("%s matches for %r" % (len(matches), streamId)) 69 raise ValueError("%s matches for %r" % (len(matches), streamId))
97 return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR] 70 return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]
105 else: 78 else:
106 self.statements.applySourcePatch(source, p) 79 self.statements.applySourcePatch(source, p)
107 80
108 self._sendUpdatePatch() 81 self._sendUpdatePatch()
109 82
110 if log.isEnabledFor(logging.DEBUG): 83 if 0 and log.isEnabledFor(logging.DEBUG):
111 self.statements.pprintTable() 84 self.statements.pprintTable()
112 85
113 if source != COLLECTOR: 86 if source != COLLECTOR:
114 self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived']) 87 self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])
115 88
116 @SEND_UPDATE_PATCH_CALLS.time() 89 @SEND_UPDATE_PATCH_CALLS.time()
117 def _sendUpdatePatch(self, handler: Optional[PatchSink] = None): 90 def _sendUpdatePatch(self, handler: Optional[PatchSinkResponse] = None):
118 """ 91 """
119 send a patch event out this handler to bring it up to date with 92 send a patch event out this handler to bring it up to date with
120 self.statements 93 self.statements
121 """ 94 """
122 now = time.time() 95 now = time.time()
127 return 100 return
128 selected = {handler} 101 selected = {handler}
129 # reduce loops here- prepare all patches at once 102 # reduce loops here- prepare all patches at once
130 for h in selected: 103 for h in selected:
131 period = .9 104 period = .9
132 if 'Raspbian' in h.request.headers.get('user-agent', ''): 105 if 'Raspbian' in h.user_agent:
133 period = 5 106 period = 5
134 if h.lastPatchSentTime > now - period: 107 if h.lastPatchSentTime > now - period:
135 continue 108 continue
136 p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h))) 109 p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
137 log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr) 110 log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
140 # This can be a giant line, which was a problem 113 # This can be a giant line, which was a problem
141 # once. Might be nice for this service to try to break 114 # once. Might be nice for this service to try to break
142 # it up into multiple sends, although there's no 115 # it up into multiple sends, although there's no
143 # guarantee at all since any single stmt could be any 116 # guarantee at all since any single stmt could be any
144 # length. 117 # length.
145 h.sendEvent(message=jsonFromPatch(p), event=b'patch') 118 h.sendEvent(message=jsonFromPatch(p), event='patch')
146 h.lastPatchSentTime = now 119 h.lastPatchSentTime = now
147 else: 120 else:
148 log.debug('nothing to send to %s', h) 121 log.debug('nothing to send to %s', h)
149 122
150 def addSseHandler(self, handler: PatchSink): 123 def addSseHandler(self, handler: PatchSinkResponse):
151 log.info('addSseHandler %r %r', handler, handler.streamId) 124 log.info('addSseHandler %r %r', handler, handler.streamId)
152 125
153 # fail early if id doesn't match 126 # fail early if id doesn't match
154 sources = self._sourcesForHandler(handler) 127 sources = self._sourcesForHandler(handler)
155 128
157 130
158 for source in sources: 131 for source in sources:
159 if source not in self.clients and source != COLLECTOR: 132 if source not in self.clients and source != COLLECTOR:
160 log.debug('connect to patch source %s', source) 133 log.debug('connect to patch source %s', source)
161 self._localStatements.setSourceState(source, ROOM['connect']) 134 self._localStatements.setSourceState(source, ROOM['connect'])
162 self.clients[source] = ReconnectingPatchSource(source, 135 self.clients[source] = PatchSource(source,
163 listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph), 136 listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
164 reconnectSecs=10) 137 reconnectSecs=10)
165 log.debug('bring new client up to date') 138 log.debug('bring new client up to date')
166 139
167 self._sendUpdatePatch(handler) 140 self._sendUpdatePatch(handler)
168 141
169 def removeSseHandler(self, handler: PatchSink): 142 def removeSseHandler(self, handler: PatchSinkResponse):
170 log.info('removeSseHandler %r', handler) 143 log.info('removeSseHandler %r', handler)
171 self.statements.discardHandler(handler) 144 self.statements.discardHandler(handler)
172 for source in self._sourcesForHandler(handler): 145 for source in self._sourcesForHandler(handler):
173 for otherHandler in self.handlers: 146 for otherHandler in self.handlers:
174 if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)): 147 if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)):
202 for stmt, (sources, handlers) in self.statements.table.items(): 175 for stmt, (sources, handlers) in self.statements.table.items():
203 if not sources and not any(h in self.handlers for h in handlers): 176 if not sources and not any(h in self.handlers for h in handlers):
204 garbage.add(stmt) 177 garbage.add(stmt)
205 178
206 179
207 class State(cyclone.web.RequestHandler): 180 @GET_STATE_CALLS.time()
208 181 def State(request: Request) -> JSONResponse:
209 @GET_STATE_CALLS.time() 182 state = request.app.state.graphClients.state()
210 def get(self) -> None: 183 msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>')
211 try: 184 log.info(msg)
212 state = self.settings.graphClients.state() 185 return JSONResponse({'graphClients': state})
213 msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>') 186
214 log.info(msg) 187
215 self.write(msg) 188 def GraphList(request: Request) -> JSONResponse:
216 except Exception: 189 return JSONResponse(config['streams'])
217 import traceback 190
218 traceback.print_exc() 191 def main():
219 raise
220
221
222 class GraphList(cyclone.web.RequestHandler):
223
224 def get(self) -> None:
225 self.write(json.dumps(config['streams']))
226
227
228 if __name__ == '__main__':
229 arg = docopt("""
230 Usage: sse_collector.py [options]
231
232 -v Verbose
233 -i Info level only
234 """)
235
236 if True:
237 enableTwistedLog()
238 log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
239 defer.setDebugging(True)
240
241 graphClients = GraphClients() 192 graphClients = GraphClients()
242 193
243 reactor.listenTCP( 194 app = Starlette(
244 9072, 195 debug=True,
245 cyclone.web.Application( # 196 routes=[
246 handlers=[ 197 Route('/state', State),
247 (r'/state', State), 198 Route('/graph/', GraphList),
248 (r'/graph/', GraphList), 199 Route('/graph/{stream_id:str}', PatchSink),
249 (r'/graph/(.+)', PatchSink), 200 ])
250 (r'/metrics', Metrics), 201 app.state.graphClients = graphClients
251 ], graphClients=graphClients), 202
252 interface='::') 203 app.add_middleware(PrometheusMiddleware, app_name='collector')
253 reactor.run() 204 app.add_route("/metrics", handle_metrics)
205 return app
206
207
208 app = main()