Mercurial > code > home > repos > collector
annotate patchsink.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | 032e59be8fe9 |
children |
rev | line source |
---|---|
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
1 """sends patches out an SSE response to a collector client who wants a collection of graphs""" |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
2 import time |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
3 from typing import Dict, Any, Optional |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
4 import asyncio |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
5 from starlette.requests import Request |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
6 from sse_starlette.sse import EventSourceResponse |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
7 import logging |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
8 import queue |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
9 log = logging.getLogger('writer') |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
10 |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
11 async def iq(q): |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
12 while True: |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
13 elem = await q.get() |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
14 yield elem |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
15 |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
16 class PatchSinkResponse(EventSourceResponse): |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
17 |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
18 def __init__( |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
19 self, |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
20 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
21 status_code: int = 200, |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
22 headers: Optional[Dict] = None, |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
23 ping: Optional[int] = None, |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
24 user_agent="", |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
25 stream_id: Optional[str] = None, |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
26 ) -> None: |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
27 self.q = asyncio.Queue() |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
28 EventSourceResponse.__init__(self, iq(self.q), status_code, headers, ping=ping) |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
29 self.created = time.time() |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
30 self.user_agent = user_agent |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
31 self.lastPatchSentTime = 0.0 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
32 self.streamId = stream_id |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
33 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
34 def sendEvent(self, message: str, event: str): |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
35 self.q.put_nowait(dict(data=message, event=event)) |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
36 |
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
37 def state(self) -> Dict: |
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
38 return { |
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
39 'created': round(self.created, 2), |
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
40 'ageHours': round((time.time() - self.created) / 3600, 2), |
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
41 'streamId': self.streamId, |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
42 # 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
43 # 'foafAgent': self.request.headers.get('X-Foaf-Agent'), |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
44 # 'userAgent': self.request.headers.get('user-agent'), |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
45 } |
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
46 |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
47 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
48 async def PatchSink(request: Request) -> PatchSinkResponse: |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
49 log.debug(f"PatchSink for {request.path_params['stream_id']=}") |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
50 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
51 ret= PatchSinkResponse( ping=30, user_agent=request.headers['user-agent'], stream_id=request.path_params['stream_id']) |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
52 request.app.state.graphClients.addSseHandler(ret) |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
53 return ret |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
54 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
55 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
56 # class PatchSink_old: |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
57 # _handlerSerial = 0 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
58 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
59 # def __init__(self, application: cyclone.web.Application, request): |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
60 # cyclone.sse.SSEHandler.__init__(self, application, request) |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
61 # self.bound = False |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
62 # self.created = time.time() |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
63 # self.graphClients = self.settings.graphClients |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
64 |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
65 # self._serial = PatchSink._handlerSerial |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
66 # PatchSink._handlerSerial += 1 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
67 # self.lastPatchSentTime: float = 0.0 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
68 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
69 # def __repr__(self) -> str: |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
70 # return '<Handler #%s>' % self._serial |
12
032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
drewp@bigasterisk.com
parents:
diff
changeset
|
71 |
13
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
72 # def bind(self, *args, **kwargs): |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
73 # self.streamId = args[0] |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
74 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
75 # self.graphClients.addSseHandler(self) |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
76 # # If something goes wrong with addSseHandler, I don't want to |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
77 # # try removeSseHandler. |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
78 # self.bound = True |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
79 |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
80 # def unbind(self) -> None: |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
81 # if self.bound: |
bfd95926be6e
initial port to starlette. missing some disconnect & cleanup functionality
drewp@bigasterisk.com
parents:
12
diff
changeset
|
82 # self.graphClients.removeSseHandler(self) |