Changeset - 0e7d8349087c
[Not reviewed]
default
0 2 0
drewp@bigasterisk.com - 3 years ago 2022-05-31 07:13:20
drewp@bigasterisk.com
WIP port effectSequencer to asyncio
2 files changed with 61 insertions and 89 deletions:
0 comments (0 inline, 0 general)
light9/effect/sequencer/sequencer.py
Show inline comments
 
'''
 
copies from effectloop.py, which this should replace
 
'''
 

	
 
from louie import dispatcher
 
from rdflib import URIRef
 
from twisted.internet import reactor
 
from twisted.internet import defer
 
from twisted.internet.defer import Deferred, inlineCallbacks
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 
import cyclone.sse
 
import logging, bisect, time
 
import traceback
 
from decimal import Decimal
 
from typing import Any, Callable, Dict, List, Tuple, cast, Union
 

	
 
from light9.ascoltami.musictime_client import MusicTime
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
@@ -250,39 +249,12 @@ class Sequencer(object):
 
                settings.append(s)
 
            devSettings = DeviceSettings.fromList(self.graph, settings)
 

	
 
        dispatcher.send('state', update={'songNotes': noteReports})
 

	
 
        with metrics('update_s3_send').time():  # our measurement
 
            sendSecs = yield self.sendToCollector(devSettings)
 

	
 
        # sendToCollector's own measurement.
 
        # (sometimes it's None, not sure why, and neither is mypy)
 
        #if isinstance(sendSecs, float):
 
        #    metrics('update_s3_send_client').observe(sendSecs)
 

	
 

	
 
class Updates(cyclone.sse.SSEHandler):
 

	
 
    def __init__(self, application, request, **kwargs) -> None:
 
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
        self.state: Dict = {}
 
        dispatcher.connect(self.updateState, 'state')
 
        self.numConnected = 0
 

	
 
    def updateState(self, update: Dict):
 
        self.state.update(update)
 

	
 
    def bind(self) -> None:
 
        self.numConnected += 1
 

	
 
        if self.numConnected == 1:
 
            self.loop()
 

	
 
    def loop(self) -> None:
 
        if self.numConnected == 0:
 
            return
 
        self.sendEvent(self.state)
 
        reactor.callLater(.1, self.loop)
 

	
 
    def unbind(self) -> None:
 
        self.numConnected -= 1
light9/effect/sequencer/service.py
Show inline comments
 
@@ -7,78 +7,78 @@ from light9.run_local import log
 
from twisted.internet import reactor
 
from light9.metrics import metrics, metricsRoute
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from light9 import networking, showconfig
 
import optparse, sys, logging
 
import cyclone.web
 
from rdflib import URIRef
 
from light9.effect.sequencer.sequencer import Sequencer
 
from light9.collector.collector_client import sendToCollector
 

	
 
from light9 import clientsession
 

	
 

	
 
class App(object):
 

	
 
    def __init__(self, show, session):
 
        self.show = show
 
        self.session = session
 

	
 
        self.graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
        self.graph.initiallySynced.addCallback(self.launch)
 

	
 
    def launch(self, *args):
 
        self.seq = Sequencer(
 
            self.graph,
 
            lambda settings: sendToCollector(
 
                'effectSequencer',
 
                self.session,
 
                settings,
 
                # This seems to be safe here (and lets us get from
 
                # 20fpx to 40fpx), even though it leads to big stalls
 
                # if I use it on KC.
 
                useZmq=True))
 

	
 
        self.cycloneApp = cyclone.web.Application(handlers=[
 
            (r'/()', cyclone.web.StaticFileHandler, {
 
                "path": "light9/effect/",
 
                "default_filename": "sequencer.html"
 
            }),
 
            (r'/updates', Updates),
 
            metricsRoute(),
 
        ],
 
                                                  debug=True,
 
                                                  seq=self.seq,
 
                                                  graph=self.graph,
 
                                                #   stats=self.stats
 
                                                  )
 
        reactor.listenTCP(networking.effectSequencer.port, self.cycloneApp)
 
        log.info("listening on %s" % networking.effectSequencer.port)
 
from prometheus_client import Summary
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from starlette.applications import Starlette
 
from starlette.endpoints import WebSocketEndpoint
 
from starlette.responses import Response
 
from starlette.routing import Route, WebSocketRoute
 
from starlette.types import Receive, Scope, Send
 
from starlette.websockets import WebSocket
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 

	
 
if __name__ == "__main__":
 
    parser = optparse.OptionParser()
 
    parser.add_option(
 
        '--show',
 
        help='show URI, like http://light9.bigasterisk.com/show/dance2008',
 
        default=showconfig.showUri())
 
    parser.add_option("-v",
 
                      "--verbose",
 
                      action="store_true",
 
                      help="logging.DEBUG")
 
    parser.add_option("--twistedlog",
 
                      action="store_true",
 
                      help="twisted logging")
 
    clientsession.add_option(parser)
 
    (options, args) = parser.parse_args()
 
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
 
# class Updates(cyclone.sse.SSEHandler):
 

	
 
#     def __init__(self, application, request, **kwargs) -> None:
 
#         cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
#         self.state: Dict = {}
 
#         dispatcher.connect(self.updateState, 'state')
 
#         self.numConnected = 0
 

	
 
#     def updateState(self, update: Dict):
 
#         self.state.update(update)
 

	
 
#     def bind(self) -> None:
 
#         self.numConnected += 1
 

	
 
#         if self.numConnected == 1:
 
#             self.loop()
 

	
 
#     def loop(self) -> None:
 
#         if self.numConnected == 0:
 
#             return
 
#         self.sendEvent(self.state)
 
#         reactor.callLater(.1, self.loop)
 

	
 
#     def unbind(self) -> None:
 
#         self.numConnected -= 1
 

	
 
    if not options.show:
 
        raise ValueError("missing --show http://...")
 
def main():
 
    # session = clientsession.getUri('effectSequencer', options)
 
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 

	
 
    session = clientsession.getUri('effectSequencer', options)
 
    seq = Sequencer(
 
        graph,
 
        lambda settings: sendToCollector(
 
            'effectSequencer',
 
            '',#session,
 
            settings,
 
            # This seems to be safe here (and lets us get from
 
            # 20fpx to 40fpx), even though it leads to big stalls
 
            # if I use it on KC.
 
            useZmq=True))
 

	
 
    app = App(URIRef(options.show), session)
 
    if options.twistedlog:
 
        from twisted.python import log as twlog
 
        twlog.startLogging(sys.stderr)
 
    reactor.run()
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            # WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    return app
 

	
 

	
 
app = main()
0 comments (0 inline, 0 general)