Changeset - 0e7d8349087c
[Not reviewed]
default
0 2 0
drewp@bigasterisk.com - 3 years ago 2022-05-31 07:13:20
drewp@bigasterisk.com
WIP port effectSequencer to asyncio
2 files changed with 61 insertions and 89 deletions:
0 comments (0 inline, 0 general)
light9/effect/sequencer/sequencer.py
Show inline comments
 
@@ -9,7 +9,6 @@ from twisted.internet import defer
 
from twisted.internet.defer import Deferred, inlineCallbacks
 
from twisted.internet.inotify import INotify
 
from twisted.python.filepath import FilePath
 
import cyclone.sse
 
import logging, bisect, time
 
import traceback
 
from decimal import Decimal
 
@@ -259,30 +258,3 @@ class Sequencer(object):
 
        # (sometimes it's None, not sure why, and neither is mypy)
 
        #if isinstance(sendSecs, float):
 
        #    metrics('update_s3_send_client').observe(sendSecs)
 

	
 

	
 
class Updates(cyclone.sse.SSEHandler):
 

	
 
    def __init__(self, application, request, **kwargs) -> None:
 
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
        self.state: Dict = {}
 
        dispatcher.connect(self.updateState, 'state')
 
        self.numConnected = 0
 

	
 
    def updateState(self, update: Dict):
 
        self.state.update(update)
 

	
 
    def bind(self) -> None:
 
        self.numConnected += 1
 

	
 
        if self.numConnected == 1:
 
            self.loop()
 

	
 
    def loop(self) -> None:
 
        if self.numConnected == 0:
 
            return
 
        self.sendEvent(self.state)
 
        reactor.callLater(.1, self.loop)
 

	
 
    def unbind(self) -> None:
 
        self.numConnected -= 1
light9/effect/sequencer/service.py
Show inline comments
 
@@ -16,69 +16,69 @@ from light9.collector.collector_client i
 

	
 
from light9 import clientsession
 

	
 

	
 
class App(object):
 

	
 
    def __init__(self, show, session):
 
        self.show = show
 
        self.session = session
 

	
 
        self.graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
        self.graph.initiallySynced.addCallback(self.launch)
 

	
 
    def launch(self, *args):
 
        self.seq = Sequencer(
 
            self.graph,
 
            lambda settings: sendToCollector(
 
                'effectSequencer',
 
                self.session,
 
                settings,
 
                # This seems to be safe here (and lets us get from
 
                # 20fpx to 40fpx), even though it leads to big stalls
 
                # if I use it on KC.
 
                useZmq=True))
 

	
 
        self.cycloneApp = cyclone.web.Application(handlers=[
 
            (r'/()', cyclone.web.StaticFileHandler, {
 
                "path": "light9/effect/",
 
                "default_filename": "sequencer.html"
 
            }),
 
            (r'/updates', Updates),
 
            metricsRoute(),
 
        ],
 
                                                  debug=True,
 
                                                  seq=self.seq,
 
                                                  graph=self.graph,
 
                                                #   stats=self.stats
 
                                                  )
 
        reactor.listenTCP(networking.effectSequencer.port, self.cycloneApp)
 
        log.info("listening on %s" % networking.effectSequencer.port)
 
from prometheus_client import Summary
 
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
 
from starlette.applications import Starlette
 
from starlette.endpoints import WebSocketEndpoint
 
from starlette.responses import Response
 
from starlette.routing import Route, WebSocketRoute
 
from starlette.types import Receive, Scope, Send
 
from starlette.websockets import WebSocket
 
from starlette_exporter import PrometheusMiddleware, handle_metrics
 

	
 

	
 
if __name__ == "__main__":
 
    parser = optparse.OptionParser()
 
    parser.add_option(
 
        '--show',
 
        help='show URI, like http://light9.bigasterisk.com/show/dance2008',
 
        default=showconfig.showUri())
 
    parser.add_option("-v",
 
                      "--verbose",
 
                      action="store_true",
 
                      help="logging.DEBUG")
 
    parser.add_option("--twistedlog",
 
                      action="store_true",
 
                      help="twisted logging")
 
    clientsession.add_option(parser)
 
    (options, args) = parser.parse_args()
 
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
 
# class Updates(cyclone.sse.SSEHandler):
 

	
 
#     def __init__(self, application, request, **kwargs) -> None:
 
#         cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
 
#         self.state: Dict = {}
 
#         dispatcher.connect(self.updateState, 'state')
 
#         self.numConnected = 0
 

	
 
#     def updateState(self, update: Dict):
 
#         self.state.update(update)
 

	
 
#     def bind(self) -> None:
 
#         self.numConnected += 1
 

	
 
#         if self.numConnected == 1:
 
#             self.loop()
 

	
 
#     def loop(self) -> None:
 
#         if self.numConnected == 0:
 
#             return
 
#         self.sendEvent(self.state)
 
#         reactor.callLater(.1, self.loop)
 

	
 
#     def unbind(self) -> None:
 
#         self.numConnected -= 1
 

	
 
    if not options.show:
 
        raise ValueError("missing --show http://...")
 
def main():
 
    # session = clientsession.getUri('effectSequencer', options)
 
    graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 

	
 
    session = clientsession.getUri('effectSequencer', options)
 
    seq = Sequencer(
 
        graph,
 
        lambda settings: sendToCollector(
 
            'effectSequencer',
 
            '',#session,
 
            settings,
 
            # This seems to be safe here (and lets us get from
 
            # 20fpx to 40fpx), even though it leads to big stalls
 
            # if I use it on KC.
 
            useZmq=True))
 

	
 
    app = App(URIRef(options.show), session)
 
    if options.twistedlog:
 
        from twisted.python import log as twlog
 
        twlog.startLogging(sys.stderr)
 
    reactor.run()
 
    app = Starlette(
 
        debug=True,
 
        routes=[
 
            # WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
 
        ],
 
    )
 

	
 
    app.add_middleware(PrometheusMiddleware)
 
    app.add_route("/metrics", handle_metrics)
 

	
 
    return app
 

	
 

	
 
app = main()
0 comments (0 inline, 0 general)