# HG changeset patch # User drewp@bigasterisk.com # Date 2022-05-31 09:05:47 # Node ID f1df317f7c4c4896d9d5d718c4068db4e87b8103 # Parent 5f0c2e3502835da4db54304de5ec3a90e900fa81 effectSequencer mostly ported to asyncio diff --git a/light9/effect/sequencer/sequencer.py b/light9/effect/sequencer/sequencer.py --- a/light9/effect/sequencer/sequencer.py +++ b/light9/effect/sequencer/sequencer.py @@ -2,7 +2,8 @@ copies from effectloop.py, which this should replace ''' -from louie import dispatcher +import asyncio +from louie import dispatcher,All from rdflib import URIRef from twisted.internet import reactor from twisted.internet import defer @@ -12,7 +13,7 @@ from twisted.python.filepath import File import logging, bisect, time import traceback from decimal import Decimal -from typing import Any, Callable, Dict, List, Tuple, cast, Union +from typing import Any, Callable, Coroutine, Dict, List, Tuple, cast, Union from light9.ascoltami.musictime_client import MusicTime from light9.effect import effecteval @@ -28,6 +29,9 @@ import imp log = logging.getLogger('sequencer') +class StateUpdate(All): + pass + def pyType(n): ret = n.toPython() if isinstance(ret, Decimal): @@ -147,8 +151,9 @@ class Sequencer(object): def __init__(self, graph: SyncedGraph, - sendToCollector: Callable[[DeviceSettings], Deferred], - fps=40): + sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]], + fps=40, + ): self.graph = graph self.fps = fps metrics('update_loop_goal_fps').set(self.fps) @@ -164,8 +169,8 @@ class Sequencer(object): self.graph.addHandler(self.compileGraph) self.lastLoopSucceeded = False - self.codeWatcher = CodeWatcher(onChange=self.onCodeChange) - self.updateLoop() + # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange) + asyncio.create_task(self.updateLoop()) def onCodeChange(self): log.debug('seq.onCodeChange') @@ -198,38 +203,40 @@ class Sequencer(object): if not anyErrors: log.info('built all notes') - @inlineCallbacks - def updateLoop(self) -> None: - frameStart = time.time() - try: - sec = yield self.update() - except Exception as e: - self.lastLoopSucceeded = False - traceback.print_exc() - log.warn('updateLoop: %r', e) - reactor.callLater(1, self.updateLoop) - else: - took = time.time() - frameStart - metrics('update_loop_latency').observe(took) + async def updateLoop(self): + while True: + frameStart = time.time() + try: + sec = await self.update() + except Exception as e: + self.lastLoopSucceeded = False + traceback.print_exc() + log.warn('updateLoop: %r', e) + await asyncio.sleep(1) + continue + else: + took = time.time() - frameStart + metrics('update_loop_latency').observe(took) - if not self.lastLoopSucceeded: - log.info('Sequencer.update is working') - self.lastLoopSucceeded = True + if not self.lastLoopSucceeded: + log.info('Sequencer.update is working') + self.lastLoopSucceeded = True - delay = max(0, 1 / self.fps - took) - reactor.callLater(delay, self.updateLoop) + delay = max(0, 1 / self.fps - took) + await asyncio.sleep(delay) + continue @metrics('update_call').time() - @inlineCallbacks - def update(self) -> Deferred: - + async def update(self): with metrics('update_s0_getMusic').time(): - musicState = self.music.getLatest() + musicState = {'t':123.0,'song':'http://light9.bigasterisk.com/show/dance2019/song5'}#self.music.getLatest() if not musicState.get('song') or not isinstance( musicState.get('t'), float): - return defer.succeed(0.0) + return song = Song(URIRef(musicState['song'])) - dispatcher.send('state', + # print('dispsend') + # import pdb;pdb.set_trace() + dispatcher.send(StateUpdate, update={ 'song': str(song), 't': musicState['t'] @@ -248,11 +255,10 @@ class Sequencer(object): noteReports.append(report) settings.append(s) devSettings = DeviceSettings.fromList(self.graph, settings) - - dispatcher.send('state', update={'songNotes': noteReports}) + dispatcher.send(StateUpdate, update={'songNotes': noteReports}) with metrics('update_s3_send').time(): # our measurement - sendSecs = yield self.sendToCollector(devSettings) + sendSecs = await self.sendToCollector(devSettings) # sendToCollector's own measurement. # (sometimes it's None, not sure why, and neither is mypy) diff --git a/light9/effect/sequencer/service.py b/light9/effect/sequencer/service.py --- a/light9/effect/sequencer/service.py +++ b/light9/effect/sequencer/service.py @@ -2,76 +2,64 @@ plays back effect notes from the timeline """ -import functools +import asyncio +import json +import logging +import time + +from light9 import networking +from light9.collector.collector_client_asyncio import sendToCollector +from light9.effect.sequencer.sequencer import StateUpdate, Sequencer +from light9.effect.settings import DeviceSettings +from light9.metrics import metrics, metricsRoute from light9.run_local import log -from twisted.internet import reactor -from light9.metrics import metrics, metricsRoute +from louie import dispatcher from rdfdb.syncedgraph.syncedgraph import SyncedGraph -from light9 import networking, showconfig -import optparse, sys, logging -import cyclone.web -from rdflib import URIRef -from light9.effect.sequencer.sequencer import Sequencer -from light9.collector.collector_client import sendToCollector - -from light9 import clientsession - -from prometheus_client import Summary -from rdfdb.syncedgraph.syncedgraph import SyncedGraph +from sse_starlette.sse import EventSourceResponse from starlette.applications import Starlette -from starlette.endpoints import WebSocketEndpoint -from starlette.responses import Response -from starlette.routing import Route, WebSocketRoute +from starlette.routing import Route from starlette.types import Receive, Scope, Send -from starlette.websockets import WebSocket from starlette_exporter import PrometheusMiddleware, handle_metrics -# class Updates(cyclone.sse.SSEHandler): +async def changes(): + state = {} + q = asyncio.Queue() -# def __init__(self, application, request, **kwargs) -> None: -# cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs) -# self.state: Dict = {} -# dispatcher.connect(self.updateState, 'state') -# self.numConnected = 0 + def onBroadcast(update): + state.update(update) + q.put_nowait(None) -# def updateState(self, update: Dict): -# self.state.update(update) + dispatcher.connect(onBroadcast, StateUpdate) -# def bind(self) -> None: -# self.numConnected += 1 - -# if self.numConnected == 1: -# self.loop() + lastSend = 0 + while True: + await q.get() + now = time.time() + if now > lastSend + .2: + lastSend = now + yield json.dumps(state) -# def loop(self) -> None: -# if self.numConnected == 0: -# return -# self.sendEvent(self.state) -# reactor.callLater(.1, self.loop) -# def unbind(self) -> None: -# self.numConnected -= 1 +async def send_page_updates(request): + return EventSourceResponse(changes()) + def main(): - # session = clientsession.getUri('effectSequencer', options) + session = 'effectSequencer' graph = SyncedGraph(networking.rdfdb.url, "effectSequencer") + logging.getLogger('autodepgraphapi').setLevel(logging.INFO) + logging.getLogger('sse_starlette.sse').setLevel(logging.INFO) - seq = Sequencer( - graph, - lambda settings: sendToCollector( - 'effectSequencer', - '',#session, - settings, - # This seems to be safe here (and lets us get from - # 20fpx to 40fpx), even though it leads to big stalls - # if I use it on KC. - useZmq=True)) + async def send(settings: DeviceSettings): + await sendToCollector('effectSequencer', session, settings) + + seq = Sequencer(graph, send) app = Starlette( debug=True, routes=[ - # WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)), + Route('/updates', endpoint=send_page_updates), ], ) diff --git a/light9/effect/sequencer/web/Light9SequencerUi.ts b/light9/effect/sequencer/web/Light9SequencerUi.ts --- a/light9/effect/sequencer/web/Light9SequencerUi.ts +++ b/light9/effect/sequencer/web/Light9SequencerUi.ts @@ -74,14 +74,16 @@ export class Light9SequencerUi extends L `, ]; render() { - return html` - + return [ + html` + +

Sequencer [metrics]

-

Sequencer [metrics]

+

Song

`, + this.report + ? html` -

Song

- - + t=${this.report.roundT}

Notes

@@ -95,15 +97,15 @@ export class Light9SequencerUi extends L ${this.report.songNotes.map( (item: Note) => html` - - - + + + ${item.effectSettingsPairs.map( (item) => html`
- : + : ${item.value}
@@ -115,7 +117,9 @@ export class Light9SequencerUi extends L ` )} - `; + ` + : html`waiting for first report...`, + ]; } graph!: SyncedGraph; @@ -124,7 +128,8 @@ export class Light9SequencerUi extends L constructor() { super(); getTopGraph().then((g) => { - var source = new EventSource("./api/updates"); + this.graph = g; + const source = new EventSource("./api/updates"); source.addEventListener("message", this.onMessage.bind(this)); }); } diff --git a/pdm.lock b/pdm.lock --- a/pdm.lock +++ b/pdm.lock @@ -853,6 +853,15 @@ requires_python = ">=3.5" summary = "Sniff out which async library your code is running under" [[package]] +name = "sse-starlette" +version = "0.10.3" +requires_python = ">=3.6" +summary = "\"SSE plugin for Starlette\"" +dependencies = [ + "starlette", +] + +[[package]] name = "stack-data" version = "0.2.0" summary = "Extract data from python stack frames and tracebacks for informative displays" @@ -1136,7 +1145,7 @@ dependencies = [ [metadata] lock_version = "3.1" -content_hash = "sha256:d0af3ae9f90e4ec5ef0d136dcf10cdacdaa1f4d69488639235aa81db15520c5f" +content_hash = "sha256:d15be600b685c807bd0ff133a19705e8c3a9b741f371deba462783e1ef3691e5" [metadata.files] "aiohttp 3.8.1" = [ @@ -2137,6 +2146,10 @@ content_hash = "sha256:d0af3ae9f90e4ec5e {file = "sniffio-1.2.0-py3-none-any.whl", hash = "sha256:471b71698eac1c2112a40ce2752bb2f4a4814c22a54a3eed3676bc0f5ca9f663"}, {file = "sniffio-1.2.0.tar.gz", hash = "sha256:c4666eecec1d3f50960c6bdf61ab7bc350648da6c126e3cf6898d8cd4ddcd3de"}, ] +"sse-starlette 0.10.3" = [ + {file = "sse_starlette-0.10.3-py3-none-any.whl", hash = "sha256:ca2de945af80b83f1efda6144df9e13db83880b3b87c660044b64f199395e8b7"}, + {file = "sse-starlette-0.10.3.tar.gz", hash = "sha256:840607fed361360cc76f8408a25f0eca887e7cab3c3ee44f9762f179428e2bd4"}, +] "stack-data 0.2.0" = [ {file = "stack_data-0.2.0-py3-none-any.whl", hash = "sha256:999762f9c3132308789affa03e9271bbbe947bf78311851f4d485d8402ed858e"}, {file = "stack_data-0.2.0.tar.gz", hash = "sha256:45692d41bd633a9503a5195552df22b583caf16f0b27c4e58c98d88c8b648e12"}, diff --git a/pyproject.toml b/pyproject.toml --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ dependencies = [ "PyGObject>=3.42.1", "aiohttp>=3.8.1", "rdfdb @ https://projects.bigasterisk.com/rdfdb/rdfdb-0.23.0.tar.gz", + "sse-starlette>=0.10.3", ] requires-python = ">=3.9"