Changeset - f1df317f7c4c
[Not reviewed]
default
0 5 0
drewp@bigasterisk.com - 3 years ago 2022-05-31 09:05:47
drewp@bigasterisk.com
effectSequencer mostly ported to asyncio
5 files changed with 96 insertions and 83 deletions:
0 comments (0 inline, 0 general)
light9/effect/sequencer/sequencer.py
Show inline comments
 
'''
 
copies from effectloop.py, which this should replace
 
'''
 

	
 
from louie import dispatcher
 
import asyncio
 
from louie import dispatcher,All
 
from rdflib import URIRef
 
from twisted.internet import reactor
 
from twisted.internet import defer
 
from twisted.internet.defer import Deferred, inlineCallbacks
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 
import logging, bisect, time
 
import traceback
 
from decimal import Decimal
 
from typing import Any, Callable, Dict, List, Tuple, cast, Union
 
from typing import Any, Callable, Coroutine, Dict, List, Tuple, cast, Union
 

	
 
from light9.ascoltami.musictime_client import MusicTime
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from light9.metrics import metrics
 

	
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 

	
 

	
 
class StateUpdate(All):
 
    pass
 

	
 
def pyType(n):
 
    ret = n.toPython()
 
    if isinstance(ret, Decimal):
 
        return float(ret)
 
    return ret
 

	
 

	
 
class Note(object):
 

	
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule,
 
                 simpleOutputs):
 
        g = self.graph = graph
 
@@ -138,43 +142,44 @@ class CodeWatcher(object):
 
            log.info("reload effecteval")
 
            imp.reload(effecteval)
 
            self.onChange()
 

	
 
        # in case we got an event at the start of the write
 
        reactor.callLater(.1, go)
 

	
 

	
 
class Sequencer(object):
 

	
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], Deferred],
 
                 fps=40):
 
                 sendToCollector: Callable[[DeviceSettings], Coroutine[None ,None,None]],
 
                 fps=40,
 
                 ):
 
        self.graph = graph
 
        self.fps = fps
 
        metrics('update_loop_goal_fps').set(self.fps)
 
        metrics('update_loop_goal_latency').set(1 / self.fps)
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 

	
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
        self._compileGraphCall = None
 
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.lastLoopSucceeded = False
 

	
 
        self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        self.updateLoop()
 
        # self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        asyncio.create_task(self.updateLoop())
 

	
 
    def onCodeChange(self):
 
        log.debug('seq.onCodeChange')
 
        self.graph.addHandler(self.compileGraph)
 
        #self.updateLoop()
 

	
 
    @metrics('compile_graph').time()
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        for song in self.graph.subjects(RDF.type, L9['Song']):
 

	
 
            def compileSong(song: Song = cast(Song, song)) -> None:
 
@@ -189,72 +194,73 @@ class Sequencer(object):
 
        for note in self.graph.objects(song, L9['note']):
 
            try:
 
                n = Note(self.graph, NoteUri(cast(NoteUri, note)), effecteval,
 
                         self.simpleOutputs)
 
            except Exception:
 
                log.warn(f"failed to build Note {note} - skipping")
 
                anyErrors = True
 
                continue
 
            self.notes[song].append(n)
 
        if not anyErrors:
 
            log.info('built all notes')
 

	
 
    @inlineCallbacks
 
    def updateLoop(self) -> None:
 
    async def updateLoop(self):
 
        while True:
 
        frameStart = time.time()
 
        try:
 
            sec = yield self.update()
 
                sec = await self.update()
 
        except Exception as e:
 
            self.lastLoopSucceeded = False
 
            traceback.print_exc()
 
            log.warn('updateLoop: %r', e)
 
            reactor.callLater(1, self.updateLoop)
 
                await asyncio.sleep(1)
 
                continue
 
        else:
 
            took = time.time() - frameStart
 
            metrics('update_loop_latency').observe(took)
 

	
 
            if not self.lastLoopSucceeded:
 
                log.info('Sequencer.update is working')
 
                self.lastLoopSucceeded = True
 

	
 
            delay = max(0, 1 / self.fps - took)
 
            reactor.callLater(delay, self.updateLoop)
 
                await asyncio.sleep(delay)
 
                continue
 

	
 
    @metrics('update_call').time()
 
    @inlineCallbacks
 
    def update(self) -> Deferred:
 

	
 
    async def update(self):
 
        with metrics('update_s0_getMusic').time():
 
            musicState = self.music.getLatest()
 
            musicState = {'t':123.0,'song':'http://light9.bigasterisk.com/show/dance2019/song5'}#self.music.getLatest()
 
            if not musicState.get('song') or not isinstance(
 
                    musicState.get('t'), float):
 
                return defer.succeed(0.0)
 
                return
 
            song = Song(URIRef(musicState['song']))
 
            dispatcher.send('state',
 
            # print('dispsend')
 
            # import pdb;pdb.set_trace()
 
            dispatcher.send(StateUpdate,
 
                            update={
 
                                'song': str(song),
 
                                't': musicState['t']
 
                            })
 

	
 
        with metrics('update_s1_eval').time():
 
            settings = []
 
            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
 
            noteReports = []
 
            for note in songNotes:
 
                try:
 
                    s, report = note.outputSettings(musicState['t'])
 
                except Exception:
 
                    traceback.print_exc()
 
                    raise
 
                noteReports.append(report)
 
                settings.append(s)
 
            devSettings = DeviceSettings.fromList(self.graph, settings)
 

	
 
        dispatcher.send('state', update={'songNotes': noteReports})
 
        dispatcher.send(StateUpdate, update={'songNotes': noteReports})
 

	
 
        with metrics('update_s3_send').time():  # our measurement
 
            sendSecs = yield self.sendToCollector(devSettings)
 
            sendSecs = await self.sendToCollector(devSettings)
 

	
 
        # sendToCollector's own measurement.
 
        # (sometimes it's None, not sure why, and neither is mypy)
 
        #if isinstance(sendSecs, float):
 
        #    metrics('update_s3_send_client').observe(sendSecs)
light9/effect/sequencer/service.py
Show inline comments
 
"""
 
plays back effect notes from the timeline
 
"""
 

	
 
import functools
 
import asyncio
 
import json
 
import logging
 
import time
 

	
 
from light9 import networking
 
from light9.collector.collector_client_asyncio import sendToCollector
 
from light9.effect.sequencer.sequencer import StateUpdate, Sequencer
 
from light9.effect.settings import DeviceSettings
 
from light9.metrics import metrics, metricsRoute
 
from light9.run_local import log
 
from twisted.internet import reactor
 
from light9.metrics import metrics, metricsRoute
 
from louie import dispatcher
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from light9 import networking, showconfig
 
import optparse, sys, logging
 
import cyclone.web
 
from rdflib import URIRef
 
from light9.effect.sequencer.sequencer import Sequencer
 
from light9.collector.collector_client import sendToCollector
 

	
 
from light9 import clientsession
 

	
 
from prometheus_client import Summary
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from sse_starlette.sse import EventSourceResponse
 
from starlette.applications import Starlette
 
from starlette.endpoints import WebSocketEndpoint
 
from starlette.responses import Response
 
from starlette.routing import Route, WebSocketRoute
 
from starlette.routing import Route
 
from starlette.types import Receive, Scope, Send
 
from starlette.websockets import WebSocket
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 

	
 
# class Updates(cyclone.sse.SSEHandler):
 
async def changes():
 
    state = {}
 
    q = asyncio.Queue()
 

	
 
#     def __init__(self, application, request, **kwargs) -> None:
 
#         cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
#         self.state: Dict = {}
 
#         dispatcher.connect(self.updateState, 'state')
 
#         self.numConnected = 0
 
    def onBroadcast(update):
 
        state.update(update)
 
        q.put_nowait(None)
 

	
 
#     def updateState(self, update: Dict):
 
#         self.state.update(update)
 
    dispatcher.connect(onBroadcast, StateUpdate)
 

	
 
#     def bind(self) -> None:
 
#         self.numConnected += 1
 

	
 
#         if self.numConnected == 1:
 
#             self.loop()
 
    lastSend = 0
 
    while True:
 
        await q.get()
 
        now = time.time()
 
        if now > lastSend + .2:
 
            lastSend = now
 
            yield json.dumps(state)
 

	
 
#     def loop(self) -> None:
 
#         if self.numConnected == 0:
 
#             return
 
#         self.sendEvent(self.state)
 
#         reactor.callLater(.1, self.loop)
 

	
 
#     def unbind(self) -> None:
 
#         self.numConnected -= 1
 
async def send_page_updates(request):
 
    return EventSourceResponse(changes())
 

	
 

	
 
def main():
 
    # session = clientsession.getUri('effectSequencer', options)
 
    session = 'effectSequencer'
 
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
    logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
 
    logging.getLogger('sse_starlette.sse').setLevel(logging.INFO)
 

	
 
    seq = Sequencer(
 
        graph,
 
        lambda settings: sendToCollector(
 
            'effectSequencer',
 
            '',#session,
 
            settings,
 
            # This seems to be safe here (and lets us get from
 
            # 20fpx to 40fpx), even though it leads to big stalls
 
            # if I use it on KC.
 
            useZmq=True))
 
    async def send(settings: DeviceSettings):
 
        await sendToCollector('effectSequencer', session, settings)
 

	
 
    seq = Sequencer(graph, send)
 

	
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            # WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
 
            Route('/updates', endpoint=send_page_updates),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    return app
 

	
 

	
 
app = main()
light9/effect/sequencer/web/Light9SequencerUi.ts
Show inline comments
 
@@ -65,75 +65,80 @@ export class Light9SequencerUi extends L
 
      .chart > div {
 
        background: #a4a54f;
 
        width: 8px;
 
        margin: 0 1px;
 
      }
 
      .number {
 
        display: inline-block;
 
        min-width: 4em;
 
      }
 
    `,
 
  ];
 
  render() {
 
    return html`
 
      <rdfdb-synced-graph></rdfdb-synced-graph>
 
    return [
 
      html` <rdfdb-synced-graph></rdfdb-synced-graph>
 

	
 
        <h1>Sequencer <a href="metrics">[metrics]</a></h1>
 

	
 
      <h1>Sequencer <a href="metrics/">[metrics]</a></h1>
 
        <h2>Song</h2>`,
 
      this.report
 
        ? html`
 

	
 
      <h2>Song</h2>
 

	
 
      <resource-display uri=${this.report.songUri}"></resource-display>
 
      <resource-display .uri=${this.graph.Uri(this.report.song)}"></resource-display>
 
      t=${this.report.roundT}
 

	
 
      <h3>Notes</h3>
 

	
 
      <table>
 
        <tr>
 
          <th>Note</th>
 
          <th>Effect class</th>
 
          <th>Effect settings</th>
 
          <th>Devices affected</th>
 
        </tr>
 
        ${this.report.songNotes.map(
 
          (item: Note) => html`
 
            <tr class$="${item.rowClass}">
 
              <td><resource-display uri="${item.note}"></resource-display></td>
 
              <td><resource-display uri="${item.effectClass}"></resource-display></td>
 
            <tr class="${item.rowClass}">
 
              <td><resource-display .uri="${this.graph.Uri(item.note)}"></resource-display></td>
 
              <td><resource-display .uri="${this.graph.Uri(item.effectClass)}"></resource-display></td>
 
              <td>
 
                ${item.effectSettingsPairs.map(
 
                  (item) => html`
 
                    <div>
 
                      <span class="effectSetting">
 
                        <resource-display uri="${item.effectAttr}"></resource-display>:
 
                        <resource-display .uri="${this.graph.Uri(item.effectAttr)}"></resource-display>:
 
                        <span class="number">${item.value}</span>
 
                      </span>
 
                    </div>
 
                  `
 
                )}
 
              </td>
 
              <td>${item.devicesAffected}</td>
 
            </tr>
 
          `
 
        )}
 
      </table>
 
    `;
 
    `
 
        : html`waiting for first report...`,
 
    ];
 
  }
 

	
 
  graph!: SyncedGraph;
 
  @property() report!: Report;
 

	
 
  constructor() {
 
    super();
 
    getTopGraph().then((g) => {
 
      var source = new EventSource("./api/updates");
 
      this.graph = g;
 
      const source = new EventSource("./api/updates");
 
      source.addEventListener("message", this.onMessage.bind(this));
 
    });
 
  }
 
  onMessage(ev: MessageEvent) {
 
    const report = JSON.parse(ev.data) as Report;
 
    report.roundT = Math.floor((report.t || 0) * 1000) / 1000;
 
    report.recentFps = Math.floor((report.recentFps || 0) * 10) / 10;
 
    report.recentDeltasStyle = (report.recentDeltas || []).map((dt) => {
 
      const height = Math.min(40, (dt / 0.085) * 20);
 
      return `height: ${height}px;`;
 
    });
 
    report.songUri = this.graph.Uri(report.song);
pdm.lock
Show inline comments
 
@@ -844,24 +844,33 @@ summary = "Easily download, build, insta
 
name = "six"
 
version = "1.16.0"
 
requires_python = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*"
 
summary = "Python 2 and 3 compatibility utilities"
 

	
 
[[package]]
 
name = "sniffio"
 
version = "1.2.0"
 
requires_python = ">=3.5"
 
summary = "Sniff out which async library your code is running under"
 

	
 
[[package]]
 
name = "sse-starlette"
 
version = "0.10.3"
 
requires_python = ">=3.6"
 
summary = "\"SSE plugin for Starlette\""
 
dependencies = [
 
    "starlette",
 
]
 

	
 
[[package]]
 
name = "stack-data"
 
version = "0.2.0"
 
summary = "Extract data from python stack frames and tracebacks for informative displays"
 
dependencies = [
 
    "asttokens",
 
    "executing",
 
    "pure-eval",
 
]
 

	
 
[[package]]
 
name = "standardservice"
 
version = "0.6.0"
 
@@ -1127,25 +1136,25 @@ dependencies = [
 

	
 
[[package]]
 
name = "zope.interface"
 
version = "5.4.0"
 
requires_python = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
 
summary = "Interfaces for Python"
 
dependencies = [
 
    "setuptools",
 
]
 

	
 
[metadata]
 
lock_version = "3.1"
 
content_hash = "sha256:d0af3ae9f90e4ec5ef0d136dcf10cdacdaa1f4d69488639235aa81db15520c5f"
 
content_hash = "sha256:d15be600b685c807bd0ff133a19705e8c3a9b741f371deba462783e1ef3691e5"
 

	
 
[metadata.files]
 
"aiohttp 3.8.1" = [
 
    {file = "aiohttp-3.8.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:1ed0b6477896559f17b9eaeb6d38e07f7f9ffe40b9f0f9627ae8b9926ae260a8"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7dadf3c307b31e0e61689cbf9e06be7a867c563d5a63ce9dca578f956609abf8"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:a79004bb58748f31ae1cbe9fa891054baaa46fb106c2dc7af9f8e3304dc30316"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:12de6add4038df8f72fac606dff775791a60f113a725c960f2bab01d8b8e6b15"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6f0d5f33feb5f69ddd57a4a4bd3d56c719a141080b445cbf18f238973c5c9923"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:eaba923151d9deea315be1f3e2b31cc39a6d1d2f682f942905951f4e40200922"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:099ebd2c37ac74cce10a3527d2b49af80243e2a4fa39e7bce41617fbc35fa3c1"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:2e5d962cf7e1d426aa0e528a7e198658cdc8aa4fe87f781d039ad75dcd52c516"},
 
    {file = "aiohttp-3.8.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:fa0ffcace9b3aa34d205d8130f7873fcfefcb6a4dd3dd705b0dab69af6712642"},
 
@@ -2128,24 +2137,28 @@ content_hash = "sha256:d0af3ae9f90e4ec5e
 
"setuptools 62.0.0" = [
 
    {file = "setuptools-62.0.0-py3-none-any.whl", hash = "sha256:a65e3802053e99fc64c6b3b29c11132943d5b8c8facbcc461157511546510967"},
 
    {file = "setuptools-62.0.0.tar.gz", hash = "sha256:7999cbd87f1b6e1f33bf47efa368b224bed5e27b5ef2c4d46580186cbcb1a86a"},
 
]
 
"six 1.16.0" = [
 
    {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
 
    {file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
 
]
 
"sniffio 1.2.0" = [
 
    {file = "sniffio-1.2.0-py3-none-any.whl", hash = "sha256:471b71698eac1c2112a40ce2752bb2f4a4814c22a54a3eed3676bc0f5ca9f663"},
 
    {file = "sniffio-1.2.0.tar.gz", hash = "sha256:c4666eecec1d3f50960c6bdf61ab7bc350648da6c126e3cf6898d8cd4ddcd3de"},
 
]
 
"sse-starlette 0.10.3" = [
 
    {file = "sse_starlette-0.10.3-py3-none-any.whl", hash = "sha256:ca2de945af80b83f1efda6144df9e13db83880b3b87c660044b64f199395e8b7"},
 
    {file = "sse-starlette-0.10.3.tar.gz", hash = "sha256:840607fed361360cc76f8408a25f0eca887e7cab3c3ee44f9762f179428e2bd4"},
 
]
 
"stack-data 0.2.0" = [
 
    {file = "stack_data-0.2.0-py3-none-any.whl", hash = "sha256:999762f9c3132308789affa03e9271bbbe947bf78311851f4d485d8402ed858e"},
 
    {file = "stack_data-0.2.0.tar.gz", hash = "sha256:45692d41bd633a9503a5195552df22b583caf16f0b27c4e58c98d88c8b648e12"},
 
]
 
"starlette 0.19.0" = [
 
    {file = "starlette-0.19.0-py3-none-any.whl", hash = "sha256:de752c8f6c2ac6ef78bfe44058fc61dc04895eba24d4e47d2ae254ba5c125c5e"},
 
    {file = "starlette-0.19.0.tar.gz", hash = "sha256:4a1a92aa89dbacc3a4c694a2c112863e88449730ff99b421a9b71fb2213bcd9c"},
 
]
 
"starlette-exporter 0.12.0" = [
 
    {file = "starlette_exporter-0.12.0-py3-none-any.whl", hash = "sha256:8d9537e94edef0a2afc396dfdc37687aa95dd594d00dbdab72bdd9dba6c28222"},
 
    {file = "starlette_exporter-0.12.0.tar.gz", hash = "sha256:18d95d09cfb45427e6f54ae591982b5ef900aa150ce9b41e717675b18c5bdb74"},
 
]
pyproject.toml
Show inline comments
 
@@ -35,24 +35,25 @@ dependencies = [
 
    "webcolors>=1.11.1",
 
    "watchdog>=2.1.7",
 
    "standardservice @ https://projects.bigasterisk.com/standardservice/standardservice-0.6.0.tar.gz",
 
    "cycloneerr @ https://projects.bigasterisk.com/cycloneerr/cycloneerr-0.4.0.tar.gz",
 
    "web.py>=0.62",
 
    "uvicorn[standard]>=0.17.6",
 
    "starlette[standard]>=0.19.0",
 
    "prometheus-client>=0.14.1",
 
    "starlette-exporter>=0.12.0",
 
    "PyGObject>=3.42.1",
 
    "aiohttp>=3.8.1",
 
    "rdfdb @ https://projects.bigasterisk.com/rdfdb/rdfdb-0.23.0.tar.gz",
 
    "sse-starlette>=0.10.3",
 
]
 
requires-python = ">=3.9"
 

	
 
[project.urls]
 
Homepage = ""
 

	
 
[project.optional-dependencies]
 
[tool.pdm]
 

	
 
[tool.pdm.dev-dependencies]
 
dev = [
 
    "coverage",
0 comments (0 inline, 0 general)