changeset 2100:f1df317f7c4c

effectSequencer mostly ported to asyncio
author drewp@bigasterisk.com
date Tue, 31 May 2022 02:05:47 -0700
parents 5f0c2e350283
children 4248f40ddcae
files light9/effect/sequencer/sequencer.py light9/effect/sequencer/service.py light9/effect/sequencer/web/Light9SequencerUi.ts pdm.lock pyproject.toml
diffstat 5 files changed, 109 insertions(+), 96 deletions(-) [+]
line wrap: on
line diff
--- a/light9/effect/sequencer/sequencer.py	Tue May 31 02:05:13 2022 -0700
+++ b/light9/effect/sequencer/sequencer.py	Tue May 31 02:05:47 2022 -0700
@@ -2,7 +2,8 @@
 copies from effectloop.py, which this should replace
 '''
 
-from louie import dispatcher
+import asyncio
+from louie import dispatcher,All
 from rdflib import URIRef
 from twisted.internet import reactor
 from twisted.internet import defer
@@ -12,7 +13,7 @@
 import logging, bisect, time
 import traceback
 from decimal import Decimal
-from typing import Any, Callable, Dict, List, Tuple, cast, Union
+from typing import Any, Callable, Coroutine, Dict, List, Tuple, cast, Union
 
 from light9.ascoltami.musictime_client import MusicTime
 from light9.effect import effecteval
@@ -28,6 +29,9 @@
 log = logging.getLogger('sequencer')
 
 
+class StateUpdate(All):
+    pass
+
 def pyType(n):
     ret = n.toPython()
     if isinstance(ret, Decimal):
@@ -147,8 +151,9 @@
 
     def __init__(self,
                  graph: SyncedGraph,
-                 sendToCollector: Callable[[DeviceSettings], Deferred],
-                 fps=40):
+                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
+                 fps=40,
+                 ):
         self.graph = graph
         self.fps = fps
         metrics('update_loop_goal_fps').set(self.fps)
@@ -164,8 +169,8 @@
         self.graph.addHandler(self.compileGraph)
         self.lastLoopSucceeded = False
 
-        self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
-        self.updateLoop()
+        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
+        asyncio.create_task(self.updateLoop())
 
     def onCodeChange(self):
         log.debug('seq.onCodeChange')
@@ -198,38 +203,40 @@
         if not anyErrors:
             log.info('built all notes')
 
-    @inlineCallbacks
-    def updateLoop(self) -> None:
-        frameStart = time.time()
-        try:
-            sec = yield self.update()
-        except Exception as e:
-            self.lastLoopSucceeded = False
-            traceback.print_exc()
-            log.warn('updateLoop: %r', e)
-            reactor.callLater(1, self.updateLoop)
-        else:
-            took = time.time() - frameStart
-            metrics('update_loop_latency').observe(took)
+    async def updateLoop(self):
+        while True:
+            frameStart = time.time()
+            try:
+                sec = await self.update()
+            except Exception as e:
+                self.lastLoopSucceeded = False
+                traceback.print_exc()
+                log.warn('updateLoop: %r', e)
+                await asyncio.sleep(1)
+                continue
+            else:
+                took = time.time() - frameStart
+                metrics('update_loop_latency').observe(took)
 
-            if not self.lastLoopSucceeded:
-                log.info('Sequencer.update is working')
-                self.lastLoopSucceeded = True
+                if not self.lastLoopSucceeded:
+                    log.info('Sequencer.update is working')
+                    self.lastLoopSucceeded = True
 
-            delay = max(0, 1 / self.fps - took)
-            reactor.callLater(delay, self.updateLoop)
+                delay = max(0, 1 / self.fps - took)
+                await asyncio.sleep(delay)
+                continue
 
     @metrics('update_call').time()
-    @inlineCallbacks
-    def update(self) -> Deferred:
-
+    async def update(self):
         with metrics('update_s0_getMusic').time():
-            musicState = self.music.getLatest()
+            musicState = {'t':123.0,'song':'http://light9.bigasterisk.com/show/dance2019/song5'}#self.music.getLatest()
             if not musicState.get('song') or not isinstance(
                     musicState.get('t'), float):
-                return defer.succeed(0.0)
+                return
             song = Song(URIRef(musicState['song']))
-            dispatcher.send('state',
+            # print('dispsend')
+            # import pdb;pdb.set_trace()
+            dispatcher.send(StateUpdate,
                             update={
                                 'song': str(song),
                                 't': musicState['t']
@@ -248,11 +255,10 @@
                 noteReports.append(report)
                 settings.append(s)
             devSettings = DeviceSettings.fromList(self.graph, settings)
-
-        dispatcher.send('state', update={'songNotes': noteReports})
+        dispatcher.send(StateUpdate, update={'songNotes': noteReports})
 
         with metrics('update_s3_send').time():  # our measurement
-            sendSecs = yield self.sendToCollector(devSettings)
+            sendSecs = await self.sendToCollector(devSettings)
 
         # sendToCollector's own measurement.
         # (sometimes it's None, not sure why, and neither is mypy)
--- a/light9/effect/sequencer/service.py	Tue May 31 02:05:13 2022 -0700
+++ b/light9/effect/sequencer/service.py	Tue May 31 02:05:47 2022 -0700
@@ -2,76 +2,64 @@
 plays back effect notes from the timeline
 """
 
-import functools
+import asyncio
+import json
+import logging
+import time
+
+from light9 import networking
+from light9.collector.collector_client_asyncio import sendToCollector
+from light9.effect.sequencer.sequencer import StateUpdate, Sequencer
+from light9.effect.settings import DeviceSettings
+from light9.metrics import metrics, metricsRoute
 from light9.run_local import log
-from twisted.internet import reactor
-from light9.metrics import metrics, metricsRoute
+from louie import dispatcher
 from rdfdb.syncedgraph.syncedgraph import SyncedGraph
-from light9 import networking, showconfig
-import optparse, sys, logging
-import cyclone.web
-from rdflib import URIRef
-from light9.effect.sequencer.sequencer import Sequencer
-from light9.collector.collector_client import sendToCollector
-
-from light9 import clientsession
-
-from prometheus_client import Summary
-from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+from sse_starlette.sse import EventSourceResponse
 from starlette.applications import Starlette
-from starlette.endpoints import WebSocketEndpoint
-from starlette.responses import Response
-from starlette.routing import Route, WebSocketRoute
+from starlette.routing import Route
 from starlette.types import Receive, Scope, Send
-from starlette.websockets import WebSocket
 from starlette_exporter import PrometheusMiddleware, handle_metrics
 
 
-# class Updates(cyclone.sse.SSEHandler):
+async def changes():
+    state = {}
+    q = asyncio.Queue()
 
-#     def __init__(self, application, request, **kwargs) -> None:
-#         cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
-#         self.state: Dict = {}
-#         dispatcher.connect(self.updateState, 'state')
-#         self.numConnected = 0
+    def onBroadcast(update):
+        state.update(update)
+        q.put_nowait(None)
 
-#     def updateState(self, update: Dict):
-#         self.state.update(update)
+    dispatcher.connect(onBroadcast, StateUpdate)
 
-#     def bind(self) -> None:
-#         self.numConnected += 1
-
-#         if self.numConnected == 1:
-#             self.loop()
+    lastSend = 0
+    while True:
+        await q.get()
+        now = time.time()
+        if now > lastSend + .2:
+            lastSend = now
+            yield json.dumps(state)
 
-#     def loop(self) -> None:
-#         if self.numConnected == 0:
-#             return
-#         self.sendEvent(self.state)
-#         reactor.callLater(.1, self.loop)
 
-#     def unbind(self) -> None:
-#         self.numConnected -= 1
+async def send_page_updates(request):
+    return EventSourceResponse(changes())
+
 
 def main():
-    # session = clientsession.getUri('effectSequencer', options)
+    session = 'effectSequencer'
     graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
+    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
+    logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)
 
-    seq = Sequencer(
-        graph,
-        lambda settings: sendToCollector(
-            'effectSequencer',
-            '',#session,
-            settings,
-            # This seems to be safe here (and lets us get from
-            # 20fpx to 40fpx), even though it leads to big stalls
-            # if I use it on KC.
-            useZmq=True))
+    async def send(settings: DeviceSettings):
+        await sendToCollector('effectSequencer', session, settings)
+
+    seq = Sequencer(graph, send)
 
     app = Starlette(
         debug=True,
         routes=[
-            # WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
+            Route('/updates', endpoint=send_page_updates),
         ],
     )
 
--- a/light9/effect/sequencer/web/Light9SequencerUi.ts	Tue May 31 02:05:13 2022 -0700
+++ b/light9/effect/sequencer/web/Light9SequencerUi.ts	Tue May 31 02:05:47 2022 -0700
@@ -74,14 +74,16 @@
     `,
   ];
   render() {
-    return html`
-      <rdfdb-synced-graph></rdfdb-synced-graph>
+    return [
+      html` <rdfdb-synced-graph></rdfdb-synced-graph>
+
+        <h1>Sequencer <a href="metrics">[metrics]</a></h1>
 
-      <h1>Sequencer <a href="metrics/">[metrics]</a></h1>
+        <h2>Song</h2>`,
+      this.report
+        ? html`
 
-      <h2>Song</h2>
-
-      <resource-display uri=${this.report.songUri}"></resource-display>
+      <resource-display .uri=${this.graph.Uri(this.report.song)}"></resource-display>
       t=${this.report.roundT}
 
       <h3>Notes</h3>
@@ -95,15 +97,15 @@
         </tr>
         ${this.report.songNotes.map(
           (item: Note) => html`
-            <tr class$="${item.rowClass}">
-              <td><resource-display uri="${item.note}"></resource-display></td>
-              <td><resource-display uri="${item.effectClass}"></resource-display></td>
+            <tr class="${item.rowClass}">
+              <td><resource-display .uri="${this.graph.Uri(item.note)}"></resource-display></td>
+              <td><resource-display .uri="${this.graph.Uri(item.effectClass)}"></resource-display></td>
               <td>
                 ${item.effectSettingsPairs.map(
                   (item) => html`
                     <div>
                       <span class="effectSetting">
-                        <resource-display uri="${item.effectAttr}"></resource-display>:
+                        <resource-display .uri="${this.graph.Uri(item.effectAttr)}"></resource-display>:
                         <span class="number">${item.value}</span>
                       </span>
                     </div>
@@ -115,7 +117,9 @@
           `
         )}
       </table>
-    `;
+    `
+        : html`waiting for first report...`,
+    ];
   }
 
   graph!: SyncedGraph;
@@ -124,7 +128,8 @@
   constructor() {
     super();
     getTopGraph().then((g) => {
-      var source = new EventSource("./api/updates");
+      this.graph = g;
+      const source = new EventSource("./api/updates");
       source.addEventListener("message", this.onMessage.bind(this));
     });
   }
--- a/pdm.lock	Tue May 31 02:05:13 2022 -0700
+++ b/pdm.lock	Tue May 31 02:05:47 2022 -0700
@@ -853,6 +853,15 @@
 summary = "Sniff out which async library your code is running under"
 
 [[package]]
+name = "sse-starlette"
+version = "0.10.3"
+requires_python = ">=3.6"
+summary = "\"SSE plugin for Starlette\""
+dependencies = [
+    "starlette",
+]
+
+[[package]]
 name = "stack-data"
 version = "0.2.0"
 summary = "Extract data from python stack frames and tracebacks for informative displays"
@@ -1136,7 +1145,7 @@
 
 [metadata]
 lock_version = "3.1"
-content_hash = "sha256:d0af3ae9f90e4ec5ef0d136dcf10cdacdaa1f4d69488639235aa81db15520c5f"
+content_hash = "sha256:d15be600b685c807bd0ff133a19705e8c3a9b741f371deba462783e1ef3691e5"
 
 [metadata.files]
 "aiohttp 3.8.1" = [
@@ -2137,6 +2146,10 @@
     {file = "sniffio-1.2.0-py3-none-any.whl", hash = "sha256:471b71698eac1c2112a40ce2752bb2f4a4814c22a54a3eed3676bc0f5ca9f663"},
     {file = "sniffio-1.2.0.tar.gz", hash = "sha256:c4666eecec1d3f50960c6bdf61ab7bc350648da6c126e3cf6898d8cd4ddcd3de"},
 ]
+"sse-starlette 0.10.3" = [
+    {file = "sse_starlette-0.10.3-py3-none-any.whl", hash = "sha256:ca2de945af80b83f1efda6144df9e13db83880b3b87c660044b64f199395e8b7"},
+    {file = "sse-starlette-0.10.3.tar.gz", hash = "sha256:840607fed361360cc76f8408a25f0eca887e7cab3c3ee44f9762f179428e2bd4"},
+]
 "stack-data 0.2.0" = [
     {file = "stack_data-0.2.0-py3-none-any.whl", hash = "sha256:999762f9c3132308789affa03e9271bbbe947bf78311851f4d485d8402ed858e"},
     {file = "stack_data-0.2.0.tar.gz", hash = "sha256:45692d41bd633a9503a5195552df22b583caf16f0b27c4e58c98d88c8b648e12"},
--- a/pyproject.toml	Tue May 31 02:05:13 2022 -0700
+++ b/pyproject.toml	Tue May 31 02:05:47 2022 -0700
@@ -44,6 +44,7 @@
     "PyGObject>=3.42.1",
     "aiohttp>=3.8.1",
     "rdfdb @ https://projects.bigasterisk.com/rdfdb/rdfdb-0.23.0.tar.gz",
+    "sse-starlette>=0.10.3",
 ]
 requires-python = ">=3.9"