Changeset - f1df317f7c4c
[Not reviewed]
default
0 5 0
drewp@bigasterisk.com - 3 years ago 2022-05-31 09:05:47
drewp@bigasterisk.com
effectSequencer mostly ported to asyncio
5 files changed with 109 insertions and 96 deletions:
0 comments (0 inline, 0 general)
light9/effect/sequencer/sequencer.py
Show inline comments
 
'''
 
copies from effectloop.py, which this should replace
 
'''
 

	
 
from louie import dispatcher
 
import asyncio
 
from louie import dispatcher,All
 
from rdflib import URIRef
 
from twisted.internet import reactor
 
from twisted.internet import defer
 
from twisted.internet.defer import Deferred, inlineCallbacks
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 
import logging, bisect, time
 
import traceback
 
from decimal import Decimal
 
from typing import Any, Callable, Dict, List, Tuple, cast, Union
 
from typing import Any, Callable, Coroutine, Dict, List, Tuple, cast, Union
 

	
 
from light9.ascoltami.musictime_client import MusicTime
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from light9.namespaces import L9, RDF
 
@@ -25,12 +26,15 @@ from light9.metrics import metrics
 

	
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 

	
 

	
 
class StateUpdate(All):
 
    pass
 

	
 
def pyType(n):
 
    ret = n.toPython()
 
    if isinstance(ret, Decimal):
 
        return float(ret)
 
    return ret
 

	
 
@@ -144,14 +148,15 @@ class CodeWatcher(object):
 

	
 

	
 
class Sequencer(object):
 

	
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], Deferred],
 
                 fps=40):
 
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
 
                 fps=40,
 
                 ):
 
        self.graph = graph
 
        self.fps = fps
 
        metrics('update_loop_goal_fps').set(self.fps)
 
        metrics('update_loop_goal_latency').set(1 / self.fps)
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 
@@ -161,14 +166,14 @@ class Sequencer(object):
 
        self._compileGraphCall = None
 
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.lastLoopSucceeded = False
 

	
 
        self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        self.updateLoop()
 
        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        asyncio.create_task(self.updateLoop())
 

	
 
    def onCodeChange(self):
 
        log.debug('seq.onCodeChange')
 
        self.graph.addHandler(self.compileGraph)
 
        #self.updateLoop()
 

	
 
@@ -195,44 +200,46 @@ class Sequencer(object):
 
                anyErrors = True
 
                continue
 
            self.notes[song].append(n)
 
        if not anyErrors:
 
            log.info('built all notes')
 

	
 
    @inlineCallbacks
 
    def updateLoop(self) -> None:
 
        frameStart = time.time()
 
        try:
 
            sec = yield self.update()
 
        except Exception as e:
 
            self.lastLoopSucceeded = False
 
            traceback.print_exc()
 
            log.warn('updateLoop: %r', e)
 
            reactor.callLater(1, self.updateLoop)
 
        else:
 
            took = time.time() - frameStart
 
            metrics('update_loop_latency').observe(took)
 
    async def updateLoop(self):
 
        while True:
 
            frameStart = time.time()
 
            try:
 
                sec = await self.update()
 
            except Exception as e:
 
                self.lastLoopSucceeded = False
 
                traceback.print_exc()
 
                log.warn('updateLoop: %r', e)
 
                await asyncio.sleep(1)
 
                continue
 
            else:
 
                took = time.time() - frameStart
 
                metrics('update_loop_latency').observe(took)
 

	
 
            if not self.lastLoopSucceeded:
 
                log.info('Sequencer.update is working')
 
                self.lastLoopSucceeded = True
 
                if not self.lastLoopSucceeded:
 
                    log.info('Sequencer.update is working')
 
                    self.lastLoopSucceeded = True
 

	
 
            delay = max(0, 1 / self.fps - took)
 
            reactor.callLater(delay, self.updateLoop)
 
                delay = max(0, 1 / self.fps - took)
 
                await asyncio.sleep(delay)
 
                continue
 

	
 
    @metrics('update_call').time()
 
    @inlineCallbacks
 
    def update(self) -> Deferred:
 

	
 
    async def update(self):
 
        with metrics('update_s0_getMusic').time():
 
            musicState = self.music.getLatest()
 
            musicState = {'t':123.0,'song':'http://light9.bigasterisk.com/show/dance2019/song5'}#self.music.getLatest()
 
            if not musicState.get('song') or not isinstance(
 
                    musicState.get('t'), float):
 
                return defer.succeed(0.0)
 
                return
 
            song = Song(URIRef(musicState['song']))
 
            dispatcher.send('state',
 
            # print('dispsend')
 
            # import pdb;pdb.set_trace()
 
            dispatcher.send(StateUpdate,
 
                            update={
 
                                'song': str(song),
 
                                't': musicState['t']
 
                            })
 

	
 
        with metrics('update_s1_eval').time():
 
@@ -245,16 +252,15 @@ class Sequencer(object):
 
                except Exception:
 
                    traceback.print_exc()
 
                    raise
 
                noteReports.append(report)
 
                settings.append(s)
 
            devSettings = DeviceSettings.fromList(self.graph, settings)
 

	
 
        dispatcher.send('state', update={'songNotes': noteReports})
 
        dispatcher.send(StateUpdate, update={'songNotes': noteReports})
 

	
 
        with metrics('update_s3_send').time():  # our measurement
 
            sendSecs = yield self.sendToCollector(devSettings)
 
            sendSecs = await self.sendToCollector(devSettings)
 

	
 
        # sendToCollector's own measurement.
 
        # (sometimes it's None, not sure why, and neither is mypy)
 
        #if isinstance(sendSecs, float):
 
        #    metrics('update_s3_send_client').observe(sendSecs)
light9/effect/sequencer/service.py
Show inline comments
 
"""
 
plays back effect notes from the timeline
 
"""
 

	
 
import functools
 
import asyncio
 
import json
 
import logging
 
import time
 

	
 
from light9 import networking
 
from light9.collector.collector_client_asyncio import sendToCollector
 
from light9.effect.sequencer.sequencer import StateUpdate, Sequencer
 
from light9.effect.settings import DeviceSettings
 
from light9.metrics import metrics, metricsRoute
 
from light9.run_local import log
 
from twisted.internet import reactor
 
from light9.metrics import metrics, metricsRoute
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from light9 import networking, showconfig
 
import optparse, sys, logging
 
import cyclone.web
 
from rdflib import URIRef
 
from light9.effect.sequencer.sequencer import Sequencer
 
from light9.collector.collector_client import sendToCollector
 

	
 
from light9 import clientsession
 

	
 
from prometheus_client import Summary
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.applications import Starlette
 
from starlette.endpoints import WebSocketEndpoint
 
from starlette.responses import Response
 
from starlette.routing import Route, WebSocketRoute
 
from starlette.routing import Route
 
from starlette.types import Receive, Scope, Send
 
from starlette.websockets import WebSocket
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 

	
 
# class Updates(cyclone.sse.SSEHandler):
 
async def changes():
 
    state = {}
 
    q = asyncio.Queue()
 

	
 
#     def __init__(self, application, request, **kwargs) -> None:
 
#         cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
#         self.state: Dict = {}
 
#         dispatcher.connect(self.updateState, 'state')
 
#         self.numConnected = 0
 
    def onBroadcast(update):
 
        state.update(update)
 
        q.put_nowait(None)
 

	
 
#     def updateState(self, update: Dict):
 
#         self.state.update(update)
 
    dispatcher.connect(onBroadcast, StateUpdate)
 

	
 
#     def bind(self) -> None:
 
#         self.numConnected += 1
 

	
 
#         if self.numConnected == 1:
 
#             self.loop()
 
    lastSend = 0
 
    while True:
 
        await q.get()
 
        now = time.time()
 
        if now > lastSend + .2:
 
            lastSend = now
 
            yield json.dumps(state)
 

	
 
#     def loop(self) -> None:
 
#         if self.numConnected == 0:
 
#             return
 
#         self.sendEvent(self.state)
 
#         reactor.callLater(.1, self.loop)
 

	
 
#     def unbind(self) -> None:
 
#         self.numConnected -= 1
 
async def send_page_updates(request):
 
    return EventSourceResponse(changes())
 

	
 

	
 
def main():
 
    # session = clientsession.getUri('effectSequencer', options)
 
    session = 'effectSequencer'
 
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)
 

	
 
    seq = Sequencer(
 
        graph,
 
        lambda settings: sendToCollector(
 
            'effectSequencer',
 
            '',#session,
 
            settings,
 
            # This seems to be safe here (and lets us get from
 
            # 20fpx to 40fpx), even though it leads to big stalls
 
            # if I use it on KC.
 
            useZmq=True))
 
    async def send(settings: DeviceSettings):
 
        await sendToCollector('effectSequencer', session, settings)
 

	
 
    seq = Sequencer(graph, send)
 

	
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            # WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
 
            Route('/updates', endpoint=send_page_updates),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
light9/effect/sequencer/web/Light9SequencerUi.ts
Show inline comments
 
@@ -71,20 +71,22 @@ export class Light9SequencerUi extends L
 
        display: inline-block;
 
        min-width: 4em;
 
      }
 
    `,
 
  ];
 
  render() {
 
    return html`
 
      <rdfdb-synced-graph></rdfdb-synced-graph>
 
    return [
 
      html` <rdfdb-synced-graph></rdfdb-synced-graph>
 

	
 
        <h1>Sequencer <a href="metrics">[metrics]</a></h1>
 

	
 
      <h1>Sequencer <a href="metrics/">[metrics]</a></h1>
 
        <h2>Song</h2>`,
 
      this.report
 
        ? html`
 

	
 
      <h2>Song</h2>
 

	
 
      <resource-display uri=${this.report.songUri}"></resource-display>
 
      <resource-display .uri=${this.graph.Uri(this.report.song)}"></resource-display>
 
      t=${this.report.roundT}
 

	
 
      <h3>Notes</h3>
 

	
 
      <table>
 
        <tr>
 
@@ -92,42 +94,45 @@ export class Light9SequencerUi extends L
 
          <th>Effect class</th>
 
          <th>Effect settings</th>
 
          <th>Devices affected</th>
 
        </tr>
 
        ${this.report.songNotes.map(
 
          (item: Note) => html`
 
            <tr class$="${item.rowClass}">
 
              <td><resource-display uri="${item.note}"></resource-display></td>
 
              <td><resource-display uri="${item.effectClass}"></resource-display></td>
 
            <tr class="${item.rowClass}">
 
              <td><resource-display .uri="${this.graph.Uri(item.note)}"></resource-display></td>
 
              <td><resource-display .uri="${this.graph.Uri(item.effectClass)}"></resource-display></td>
 
              <td>
 
                ${item.effectSettingsPairs.map(
 
                  (item) => html`
 
                    <div>
 
                      <span class="effectSetting">
 
                        <resource-display uri="${item.effectAttr}"></resource-display>:
 
                        <resource-display .uri="${this.graph.Uri(item.effectAttr)}"></resource-display>:
 
                        <span class="number">${item.value}</span>
 
                      </span>
 
                    </div>
 
                  `
 
                )}
 
              </td>
 
              <td>${item.devicesAffected}</td>
 
            </tr>
 
          `
 
        )}
 
      </table>
 
    `;
 
    `
 
        : html`waiting for first report...`,
 
    ];
 
  }
 

	
 
  graph!: SyncedGraph;
 
  @property() report!: Report;
 

	
 
  constructor() {
 
    super();
 
    getTopGraph().then((g) => {
 
      var source = new EventSource("./api/updates");
 
      this.graph = g;
 
      const source = new EventSource("./api/updates");
 
      source.addEventListener("message", this.onMessage.bind(this));
 
    });
 
  }
 
  onMessage(ev: MessageEvent) {
 
    const report = JSON.parse(ev.data) as Report;
 
    report.roundT = Math.floor((report.t || 0) * 1000) / 1000;
pdm.lock
Show inline comments
 
@@ -850,12 +850,21 @@ summary = "Python 2 and 3 compatibility 
 
name = "sniffio"
 
version = "1.2.0"
 
requires_python = ">=3.5"
 
summary = "Sniff out which async library your code is running under"
 

	
 
[[package]]
 
name = "sse-starlette"
 
version = "0.10.3"
 
requires_python = ">=3.6"
 
summary = "\"SSE plugin for Starlette\""
 
dependencies = [
 
    "starlette",
 
]
 

	
 
[[package]]
 
name = "stack-data"
 
version = "0.2.0"
 
summary = "Extract data from python stack frames and tracebacks for informative displays"
 
dependencies = [
 
    "asttokens",
 
    "executing",
 
@@ -1133,13 +1142,13 @@ summary = "Interfaces for Python"
 
dependencies = [
 
    "setuptools",
 
]
 

	
 
[metadata]
 
lock_version = "3.1"
 
content_hash = "sha256:d0af3ae9f90e4ec5ef0d136dcf10cdacdaa1f4d69488639235aa81db15520c5f"
 
content_hash = "sha256:d15be600b685c807bd0ff133a19705e8c3a9b741f371deba462783e1ef3691e5"
 

	
 
[metadata.files]
 
"aiohttp 3.8.1" = [
 
    {file = "aiohttp-3.8.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:1ed0b6477896559f17b9eaeb6d38e07f7f9ffe40b9f0f9627ae8b9926ae260a8"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7dadf3c307b31e0e61689cbf9e06be7a867c563d5a63ce9dca578f956609abf8"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:a79004bb58748f31ae1cbe9fa891054baaa46fb106c2dc7af9f8e3304dc30316"},
 
@@ -2134,12 +2143,16 @@ content_hash = "sha256:d0af3ae9f90e4ec5e
 
    {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
 
]
 
"sniffio 1.2.0" = [
 
    {file = "sniffio-1.2.0-py3-none-any.whl", hash = "sha256:471b71698eac1c2112a40ce2752bb2f4a4814c22a54a3eed3676bc0f5ca9f663"},
 
    {file = "sniffio-1.2.0.tar.gz", hash = "sha256:c4666eecec1d3f50960c6bdf61ab7bc350648da6c126e3cf6898d8cd4ddcd3de"},
 
]
 
"sse-starlette 0.10.3" = [
 
    {file = "sse_starlette-0.10.3-py3-none-any.whl", hash = "sha256:ca2de945af80b83f1efda6144df9e13db83880b3b87c660044b64f199395e8b7"},
 
    {file = "sse-starlette-0.10.3.tar.gz", hash = "sha256:840607fed361360cc76f8408a25f0eca887e7cab3c3ee44f9762f179428e2bd4"},
 
]
 
"stack-data 0.2.0" = [
 
    {file = "stack_data-0.2.0-py3-none-any.whl", hash = "sha256:999762f9c3132308789affa03e9271bbbe947bf78311851f4d485d8402ed858e"},
 
    {file = "stack_data-0.2.0.tar.gz", hash = "sha256:45692d41bd633a9503a5195552df22b583caf16f0b27c4e58c98d88c8b648e12"},
 
]
 
"starlette 0.19.0" = [
 
    {file = "starlette-0.19.0-py3-none-any.whl", hash = "sha256:de752c8f6c2ac6ef78bfe44058fc61dc04895eba24d4e47d2ae254ba5c125c5e"},
pyproject.toml
Show inline comments
 
@@ -41,12 +41,13 @@ dependencies = [
 
    "starlette[standard]>=0.19.0",
 
    "prometheus-client>=0.14.1",
 
    "starlette-exporter>=0.12.0",
 
    "PyGObject>=3.42.1",
 
    "aiohttp>=3.8.1",
 
    "rdfdb @ https://projects.bigasterisk.com/rdfdb/rdfdb-0.23.0.tar.gz",
 
    "sse-starlette>=0.10.3",
 
]
 
requires-python = ">=3.9"
 

	
 
[project.urls]
 
Homepage = ""
 

	
0 comments (0 inline, 0 general)