diff --git a/light9/effect/sequencer/sequencer.py b/light9/effect/sequencer/sequencer.py
--- a/light9/effect/sequencer/sequencer.py
+++ b/light9/effect/sequencer/sequencer.py
@@ -2,7 +2,8 @@
copies from effectloop.py, which this should replace
'''
-from louie import dispatcher
+import asyncio
+from louie import dispatcher,All
from rdflib import URIRef
from twisted.internet import reactor
from twisted.internet import defer
@@ -12,7 +13,7 @@ from twisted.python.filepath import File
import logging, bisect, time
import traceback
from decimal import Decimal
-from typing import Any, Callable, Dict, List, Tuple, cast, Union
+from typing import Any, Callable, Coroutine, Dict, List, Tuple, cast, Union
from light9.ascoltami.musictime_client import MusicTime
from light9.effect import effecteval
@@ -28,6 +29,9 @@ import imp
log = logging.getLogger('sequencer')
+class StateUpdate(All):
+ pass
+
def pyType(n):
ret = n.toPython()
if isinstance(ret, Decimal):
@@ -147,8 +151,9 @@ class Sequencer(object):
def __init__(self,
graph: SyncedGraph,
- sendToCollector: Callable[[DeviceSettings], Deferred],
- fps=40):
+ sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
+ fps=40,
+ ):
self.graph = graph
self.fps = fps
metrics('update_loop_goal_fps').set(self.fps)
@@ -164,8 +169,8 @@ class Sequencer(object):
self.graph.addHandler(self.compileGraph)
self.lastLoopSucceeded = False
- self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
- self.updateLoop()
+ # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
+ asyncio.create_task(self.updateLoop())
def onCodeChange(self):
log.debug('seq.onCodeChange')
@@ -198,38 +203,40 @@ class Sequencer(object):
if not anyErrors:
log.info('built all notes')
- @inlineCallbacks
- def updateLoop(self) -> None:
- frameStart = time.time()
- try:
- sec = yield self.update()
- except Exception as e:
- self.lastLoopSucceeded = False
- traceback.print_exc()
- log.warn('updateLoop: %r', e)
- reactor.callLater(1, self.updateLoop)
- else:
- took = time.time() - frameStart
- metrics('update_loop_latency').observe(took)
+ async def updateLoop(self):
+ while True:
+ frameStart = time.time()
+ try:
+ sec = await self.update()
+ except Exception as e:
+ self.lastLoopSucceeded = False
+ traceback.print_exc()
+ log.warn('updateLoop: %r', e)
+ await asyncio.sleep(1)
+ continue
+ else:
+ took = time.time() - frameStart
+ metrics('update_loop_latency').observe(took)
- if not self.lastLoopSucceeded:
- log.info('Sequencer.update is working')
- self.lastLoopSucceeded = True
+ if not self.lastLoopSucceeded:
+ log.info('Sequencer.update is working')
+ self.lastLoopSucceeded = True
- delay = max(0, 1 / self.fps - took)
- reactor.callLater(delay, self.updateLoop)
+ delay = max(0, 1 / self.fps - took)
+ await asyncio.sleep(delay)
+ continue
@metrics('update_call').time()
- @inlineCallbacks
- def update(self) -> Deferred:
-
+ async def update(self):
with metrics('update_s0_getMusic').time():
- musicState = self.music.getLatest()
+ musicState = {'t':123.0,'song':'http://light9.bigasterisk.com/show/dance2019/song5'}#self.music.getLatest()
if not musicState.get('song') or not isinstance(
musicState.get('t'), float):
- return defer.succeed(0.0)
+ return
song = Song(URIRef(musicState['song']))
- dispatcher.send('state',
+ # print('dispsend')
+ # import pdb;pdb.set_trace()
+ dispatcher.send(StateUpdate,
update={
'song': str(song),
't': musicState['t']
@@ -248,11 +255,10 @@ class Sequencer(object):
noteReports.append(report)
settings.append(s)
devSettings = DeviceSettings.fromList(self.graph, settings)
-
- dispatcher.send('state', update={'songNotes': noteReports})
+ dispatcher.send(StateUpdate, update={'songNotes': noteReports})
with metrics('update_s3_send').time(): # our measurement
- sendSecs = yield self.sendToCollector(devSettings)
+ sendSecs = await self.sendToCollector(devSettings)
# sendToCollector's own measurement.
# (sometimes it's None, not sure why, and neither is mypy)
diff --git a/light9/effect/sequencer/service.py b/light9/effect/sequencer/service.py
--- a/light9/effect/sequencer/service.py
+++ b/light9/effect/sequencer/service.py
@@ -2,76 +2,64 @@
plays back effect notes from the timeline
"""
-import functools
+import asyncio
+import json
+import logging
+import time
+
+from light9 import networking
+from light9.collector.collector_client_asyncio import sendToCollector
+from light9.effect.sequencer.sequencer import StateUpdate, Sequencer
+from light9.effect.settings import DeviceSettings
+from light9.metrics import metrics, metricsRoute
from light9.run_local import log
-from twisted.internet import reactor
-from light9.metrics import metrics, metricsRoute
+from louie import dispatcher
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
-from light9 import networking, showconfig
-import optparse, sys, logging
-import cyclone.web
-from rdflib import URIRef
-from light9.effect.sequencer.sequencer import Sequencer
-from light9.collector.collector_client import sendToCollector
-
-from light9 import clientsession
-
-from prometheus_client import Summary
-from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
-from starlette.endpoints import WebSocketEndpoint
-from starlette.responses import Response
-from starlette.routing import Route, WebSocketRoute
+from starlette.routing import Route
from starlette.types import Receive, Scope, Send
-from starlette.websockets import WebSocket
from starlette_exporter import PrometheusMiddleware, handle_metrics
-# class Updates(cyclone.sse.SSEHandler):
+async def changes():
+ state = {}
+ q = asyncio.Queue()
-# def __init__(self, application, request, **kwargs) -> None:
-# cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
-# self.state: Dict = {}
-# dispatcher.connect(self.updateState, 'state')
-# self.numConnected = 0
+ def onBroadcast(update):
+ state.update(update)
+ q.put_nowait(None)
-# def updateState(self, update: Dict):
-# self.state.update(update)
+ dispatcher.connect(onBroadcast, StateUpdate)
-# def bind(self) -> None:
-# self.numConnected += 1
-
-# if self.numConnected == 1:
-# self.loop()
+ lastSend = 0
+ while True:
+ await q.get()
+ now = time.time()
+ if now > lastSend + .2:
+ lastSend = now
+ yield json.dumps(state)
-# def loop(self) -> None:
-# if self.numConnected == 0:
-# return
-# self.sendEvent(self.state)
-# reactor.callLater(.1, self.loop)
-# def unbind(self) -> None:
-# self.numConnected -= 1
+async def send_page_updates(request):
+ return EventSourceResponse(changes())
+
def main():
- # session = clientsession.getUri('effectSequencer', options)
+ session = 'effectSequencer'
graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
+ logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
+ logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)
- seq = Sequencer(
- graph,
- lambda settings: sendToCollector(
- 'effectSequencer',
- '',#session,
- settings,
- # This seems to be safe here (and lets us get from
- # 20fpx to 40fpx), even though it leads to big stalls
- # if I use it on KC.
- useZmq=True))
+ async def send(settings: DeviceSettings):
+ await sendToCollector('effectSequencer', session, settings)
+
+ seq = Sequencer(graph, send)
app = Starlette(
debug=True,
routes=[
- # WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
+ Route('/updates', endpoint=send_page_updates),
],
)
diff --git a/light9/effect/sequencer/web/Light9SequencerUi.ts b/light9/effect/sequencer/web/Light9SequencerUi.ts
--- a/light9/effect/sequencer/web/Light9SequencerUi.ts
+++ b/light9/effect/sequencer/web/Light9SequencerUi.ts
@@ -74,14 +74,16 @@ export class Light9SequencerUi extends L
`,
];
render() {
- return html`
-