Changeset 9aa046cc9b33  (branch: default, not reviewed)
drewp@bigasterisk.com - 2022-05-11 06:01:26

replace greplin with prometheus throughout (untested)

15 files changed with 225 insertions and 183 deletions
0 comments (0 inline, 0 general)
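The change is mechanical across all fifteen files: each greplin scales collection and /stats/ StatsHandler route is replaced by the new light9.metrics helpers. A minimal before/after sketch of the pattern (the stat names and the handle() helper are illustrative, not taken from any single file):

# before: greplin scales
from greplin import scales
from greplin.scales.cyclonehandler import StatsHandler

stats = scales.collection('/webServer', scales.PmfStat('setAttr', recalcPeriod=1))

def put(self):
    with stats.setAttr.time():          # timing stat, served from /stats/...
        handle(self.request.body)

# after: prometheus via the new light9.metrics module
from light9.metrics import metrics, metricsRoute

def put(self):
    with metrics('set_attr').time():    # Summary metric, served from /metrics
        handle(self.request.body)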
bin/captureDevice
#!bin/python

from rdflib import URIRef
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, Deferred

import logging
import optparse
import os
import time
import treq
import cyclone.web, cyclone.websocket, cyclone.httpclient
from greplin import scales

from light9.metrics import metrics, metricsRoute
from run_local import log
from cycloneerr import PrettyErrorHandler

from light9.namespaces import L9, RDF
from light9 import networking, showconfig
from rdfdb.syncedgraph import SyncedGraph
from light9.paint.capture import writeCaptureDescription
from greplin.scales.cyclonehandler import StatsHandler
from light9.effect.settings import DeviceSettings
from light9.collector.collector_client import sendToCollector
from rdfdb.patch import Patch
from light9.zmqtransport import parseJsonMessage

stats = scales.collection('/webServer', scales.PmfStat('setAttr',
                                                       recalcPeriod=1))


class Camera(object):

    def __init__(self, imageUrl):
        self.imageUrl = imageUrl

    def takePic(self, uri, writePath):
        log.info('takePic %s', uri)
        return treq.get(
            self.imageUrl).addCallbacks(lambda r: self._done(writePath, r),
                                        log.error)
@@ -149,43 +145,41 @@ class Capture(object):
                                self.dev, path, self.settingsCache, settings)

        reactor.callLater(0, self.step)


camera = Camera(
    'http://plus:8200/picamserve/pic?res=1080&resize=800&iso=800&redgain=1.6&bluegain=1.6&shutter=60000&x=0&w=1&y=0&h=.952'
)


class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):

    @metrics('set_attr').time()
    def put(self):
        with stats.setAttr.time():
            client, clientSession, settings, sendTime = parseJsonMessage(
                self.request.body)
            self.set_status(202)


def launch(graph):

    cap = Capture(graph, dev=L9['device/aura5'])
    reactor.listenTCP(networking.captureDevice.port,
                      cyclone.web.Application(handlers=[
                          (r'/()', cyclone.web.StaticFileHandler, {
                              "path": "light9/web",
                              "default_filename": "captureDevice.html"
                          }),
                          (r'/stats/(.*)', StatsHandler, {
                              'serverName': 'captureDevice'
                          }),
                          metricsRoute(),
                      ]),
                      interface='::',
                      cap=cap)
    log.info('serving http on %s', networking.captureDevice.port)


def main():
    parser = optparse.OptionParser()
    parser.add_option("-v",
                      "--verbose",
                      action="store_true",
                      help="logging.DEBUG")
bin/effecteval
@@ -2,35 +2,32 @@

from run_local import log
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
import cyclone.web, cyclone.websocket, cyclone.httpclient
import sys, optparse, logging, json, itertools
from rdflib import URIRef, Literal

from light9 import networking, showconfig
from light9.effecteval.effect import EffectNode
from light9.effect.edit import getMusicStatus, songNotePatch
from light9.effecteval.effectloop import makeEffectLoop
from greplin.scales.cyclonehandler import StatsHandler
from light9.metrics import metrics, metricsRoute
from light9.namespaces import L9
from rdfdb.patch import Patch
from rdfdb.syncedgraph import SyncedGraph
from greplin import scales
from standardservice.scalessetup import gatherProcessStats

from cycloneerr import PrettyErrorHandler
from light9.coffee import StaticCoffee

gatherProcessStats()


class EffectEdit(PrettyErrorHandler, cyclone.web.RequestHandler):

    def get(self):
        self.set_header('Content-Type', 'text/html')
        self.write(open("light9/effecteval/effect.html").read())

    def delete(self):
        graph = self.settings.graph
        uri = URIRef(self.get_argument('uri'))
        with graph.currentState(tripleFilter=(None, L9['effect'], uri)) as g:
@@ -217,65 +214,53 @@ class SongEffectsEval(PrettyErrorHandler
        # return dmx dict for all effects in the song, already combined


class App(object):

    def __init__(self, show, outputWhere):
        self.show = show
        self.outputWhere = outputWhere
        self.graph = SyncedGraph(networking.rdfdb.url, "effectEval")
        self.graph.initiallySynced.addCallback(self.launch).addErrback(
            log.error)

        self.stats = scales.collection(
            '/',
            scales.PmfStat('sendLevels', recalcPeriod=1),
            scales.PmfStat('getMusic', recalcPeriod=1),
            scales.PmfStat('evals', recalcPeriod=1),
            scales.PmfStat('sendOutput', recalcPeriod=1),
            scales.IntStat('errors'),
        )

    def launch(self, *args):
        log.info('launch')
        if self.outputWhere:
            self.loop = makeEffectLoop(self.graph, self.stats, self.outputWhere)
            self.loop = makeEffectLoop(self.graph, self.outputWhere)
            self.loop.startLoop()

        SFH = cyclone.web.StaticFileHandler
        self.cycloneApp = cyclone.web.Application(handlers=[
            (r'/()', SFH, {
                'path': 'light9/effecteval',
                'default_filename': 'index.html'
            }),
            (r'/effect', EffectEdit),
            (r'/effect\.js', StaticCoffee, {
                'src': 'light9/effecteval/effect.coffee'
            }),
            (r'/(effect-components\.html)', SFH, {
                'path': 'light9/effecteval'
            }),
            (r'/effectUpdates', EffectUpdates),
            (r'/code', Code),
            (r'/songEffectsUpdates', SongEffectsUpdates),
            (r'/effect/eval', EffectEval),
            (r'/songEffects', SongEffects),
            (r'/songEffects/eval', SongEffectsEval),
            (r'/stats/(.*)', StatsHandler, {
                'serverName': 'effecteval'
            }),
            metricsRoute(),
        ],
                                                  debug=True,
                                                  graph=self.graph,
                                                  stats=self.stats)
                                                  graph=self.graph)
        reactor.listenTCP(networking.effectEval.port, self.cycloneApp)
        log.info("listening on %s" % networking.effectEval.port)


if __name__ == "__main__":
    parser = optparse.OptionParser()
    parser.add_option(
        '--show',
        help='show URI, like http://light9.bigasterisk.com/show/dance2008',
        default=showconfig.showUri())
    parser.add_option("-v",
                      "--verbose",
bin/effectsequencer
#!bin/python
"""
plays back effect notes from the timeline
"""

from run_local import log
from twisted.internet import reactor
# from greplin.scales.cyclonehandler import StatsHandler
from light9.metrics import metrics, metricsRoute
from rdfdb.syncedgraph import SyncedGraph
from light9 import networking, showconfig
# from greplin import scales
import optparse, sys, logging
import cyclone.web
from rdflib import URIRef
from light9.effect.sequencer import Sequencer, Updates
from light9.collector.collector_client import sendToCollector

from light9 import clientsession


class App(object):

    def __init__(self, show, session):
        self.show = show
        self.session = session

        self.graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
        self.graph.initiallySynced.addCallback(self.launch)

        # self.stats = scales.collection(
        #     '/',
        #     scales.PmfStat('sendLevels', recalcPeriod=1),
        #     scales.PmfStat('getMusic', recalcPeriod=1),
        #     scales.PmfStat('evals', recalcPeriod=1),
        #     scales.PmfStat('sendOutput', recalcPeriod=1),
        #     scales.IntStat('errors'),
        # )

    def launch(self, *args):
        self.seq = Sequencer(
            self.graph,
            lambda settings: sendToCollector(
                'effectSequencer',
                self.session,
                settings,
                # This seems to be safe here (and lets us get from
                # 20fpx to 40fpx), even though it leads to big stalls
                # if I use it on KC.
                useZmq=True))

        self.cycloneApp = cyclone.web.Application(handlers=[
            (r'/()', cyclone.web.StaticFileHandler, {
                "path": "light9/effect/",
                "default_filename": "sequencer.html"
            }),
            (r'/updates', Updates),
            # (r'/stats/(.*)', StatsHandler, {
            #     'serverName': 'effectsequencer'
            # }),
            metricsRoute(),
        ],
                                                  debug=True,
                                                  seq=self.seq,
                                                  graph=self.graph,
                                                #   stats=self.stats
                                                  )
        reactor.listenTCP(networking.effectSequencer.port, self.cycloneApp)
        log.info("listening on %s" % networking.effectSequencer.port)


if __name__ == "__main__":
    parser = optparse.OptionParser()
bin/paintserver
#!bin/python

from run_local import log
import json
from twisted.internet import reactor
from greplin.scales.cyclonehandler import StatsHandler
from rdfdb.syncedgraph import SyncedGraph
from light9 import networking, showconfig
from greplin import scales
import optparse, sys, logging
import cyclone.web
from rdflib import URIRef
from light9 import clientsession
import light9.paint.solve
from cycloneerr import PrettyErrorHandler
from light9.namespaces import L9, DEV
from light9.metrics import metrics
import imp


class Solve(PrettyErrorHandler, cyclone.web.RequestHandler):

    def post(self):
        painting = json.loads(self.request.body)
        with self.settings.stats.solve.time():
        with metrics('solve').time():
            img = self.settings.solver.draw(painting)
            sample, sampleDist = self.settings.solver.bestMatch(
                img, device=DEV['aura2'])
            with self.settings.graph.currentState() as g:
                bestPath = g.value(sample, L9['imagePath']).replace(L9[''], '')
            #out = solver.solve(painting)
            #layers = solver.simulationLayers(out)

        self.write(
            json.dumps({
                'bestMatch': {
                    'uri': sample,
@@ -44,72 +43,65 @@ class Solve(PrettyErrorHandler, cyclone.
    def reloadSolver(self):
        imp.reload(light9.paint.solve)
        self.settings.solver = light9.paint.solve.Solver(self.settings.graph)
        self.settings.solver.loadSamples()


class BestMatches(PrettyErrorHandler, cyclone.web.RequestHandler):

    def post(self):
        body = json.loads(self.request.body)
        painting = body['painting']
        devs = [URIRef(d) for d in body['devices']]
        with self.settings.stats.solve.time():
        with metrics('solve').time():
            img = self.settings.solver.draw(painting)
            outSettings = self.settings.solver.bestMatches(img, devs)
            self.write(json.dumps({'settings': outSettings.asList()}))


class App(object):

    def __init__(self, show, session):
        self.show = show
        self.session = session

        self.graph = SyncedGraph(networking.rdfdb.url, "paintServer")
        self.graph.initiallySynced.addCallback(self.launch).addErrback(
            log.error)

        self.stats = scales.collection(
            '/',
            scales.PmfStat('solve', recalcPeriod=1),
        )

    def launch(self, *args):

        self.solver = light9.paint.solve.Solver(
            self.graph,
            sessions=[
                L9['show/dance2017/capture/aura1/cap1876596'],
                L9['show/dance2017/capture/aura2/cap1876792'],
                L9['show/dance2017/capture/aura3/cap1877057'],
                L9['show/dance2017/capture/aura4/cap1877241'],
                L9['show/dance2017/capture/aura5/cap1877406'],
                L9['show/dance2017/capture/q1/cap1874255'],
                L9['show/dance2017/capture/q2/cap1873665'],
                L9['show/dance2017/capture/q3/cap1876223'],
            ])
        self.solver.loadSamples()

        self.cycloneApp = cyclone.web.Application(handlers=[
            (r'/stats/(.*)', StatsHandler, {
                'serverName': 'paintserver'
            }),
            (r'/solve', Solve),
            (r'/bestMatches', BestMatches),
            metricsRoute(),
        ],
                                                  debug=True,
                                                  graph=self.graph,
                                                  solver=self.solver,
                                                  stats=self.stats)
                                                  solver=self.solver)
        reactor.listenTCP(networking.paintServer.port, self.cycloneApp)
        log.info("listening on %s" % networking.paintServer.port)


if __name__ == "__main__":
    parser = optparse.OptionParser()
    parser.add_option(
        '--show',
        help='show URI, like http://light9.bigasterisk.com/show/dance2008',
        default=showconfig.showUri())
    parser.add_option("-v",
                      "--verbose",
bin/patchserver
@@ -3,52 +3,47 @@
from run_local import log

from rdflib import URIRef
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, Deferred

import logging
import optparse
import os
import time
import treq
import cyclone.web, cyclone.websocket, cyclone.httpclient
from greplin import scales

from cycloneerr import PrettyErrorHandler

from light9.namespaces import L9, RDF
from light9 import networking, showconfig
from rdfdb.syncedgraph import SyncedGraph

from greplin.scales.cyclonehandler import StatsHandler
from light9.effect.settings import DeviceSettings
from rdfdb.patch import Patch
from light9.metrics import metrics, metricsRoute

stats = scales.collection('/webServer', scales.PmfStat('setAttr',
                                                       recalcPeriod=1))


def launch(graph):
    if 0:
        reactor.listenTCP(
            networking.captureDevice.port,
            cyclone.web.Application(handlers=[
                (r'/()', cyclone.web.StaticFileHandler, {
                    "path": "light9/web",
                    "default_filename": "patchServer.html"
                }),
                (r'/stats/(.*)', StatsHandler, {
                    'serverName': 'patchServer'
                }),
                metricsRoute(),
            ]),
            interface='::',
        )
        log.info('serving http on %s', networking.captureDevice.port)

    def prn():
        width = {}
        for dc in graph.subjects(RDF.type, L9['DeviceClass']):
            for attr in graph.objects(dc, L9['attr']):
                width[dc] = max(
                    width.get(dc, 0),
                    graph.value(attr, L9['dmxOffset']).toPython() + 1)
bin/vidref
@@ -10,50 +10,42 @@ light9/vidref/videorecorder.py capture f
light9/vidref/replay.py backend for vidref.js playback element- figures out which frames go with the current song and time
light9/vidref/index.html web ui for watching current stage and song playback
light9/vidref/setup.html web ui for setup of camera params and frame crop
light9/web/light9-vidref-live.js LitElement for live video frames
light9/web/light9-vidref-playback.js LitElement for video playback

"""
from run_local import log

from typing import cast
import logging, optparse, json, base64, os, glob

# from greplin import scales
# from greplin.scales.cyclonehandler import StatsHandler
from light9.metrics import metrics, metricsRoute

from rdflib import URIRef
from twisted.internet import reactor, defer
import cyclone.web, cyclone.httpclient, cyclone.websocket

from cycloneerr import PrettyErrorHandler
from light9 import networking, showconfig
from light9.newtypes import Song
from light9.vidref import videorecorder
from rdfdb.syncedgraph import SyncedGraph
# from standardservice.scalessetup import gatherProcessStats

parser = optparse.OptionParser()
parser.add_option("-v", "--verbose", action="store_true", help="logging.DEBUG")
(options, args) = parser.parse_args()

log.setLevel(logging.DEBUG if options.verbose else logging.INFO)

# gatherProcessStats()
# stats = scales.collection(
#     '/webServer',
#     scales.RecentFpsStat('liveWebsocketFrameFps'),
#     scales.IntStat('liveClients'),
# )


class Snapshot(cyclone.web.RequestHandler):

    @defer.inlineCallbacks
    def post(self):
        # save next pic
        # return /snapshot/path
        try:
            snapshotDir = 'todo'
            outputFilename = yield self.settings.gui.snapshot()

            assert outputFilename.startswith(snapshotDir)
@@ -69,36 +61,34 @@ class Snapshot(cyclone.web.RequestHandle
            raise


pipeline = videorecorder.GstSource(
    #'/dev/v4l/by-id/usb-Bison_HD_Webcam_200901010001-video-index0'
    '/dev/v4l/by-id/usb-Generic_FULL_HD_1080P_Webcam_200901010001-video-index0')


class Live(cyclone.websocket.WebSocketHandler):

    def connectionMade(self, *args, **kwargs):
        pipeline.liveImages.subscribe(on_next=self.onFrame)
        # stats.liveClients += 1
        metrics('live_clients').offset(1)

    def connectionLost(self, reason):
        #self.subj.dispose()
        # stats.liveClients -= 1
        pass
        metrics('live_clients').offset(-1)

    def onFrame(self, cf: videorecorder.CaptureFrame):
        if cf is None: return

        # stats.liveWebsocketFrameFps.mark()

        with metrics('live_websocket_frame_fps').time():
        self.sendMessage(
            json.dumps({
                'jpeg': base64.b64encode(cf.asJpeg()).decode('ascii'),
                'description': f't={cf.t}',
            }))


class SnapshotPic(cyclone.web.StaticFileHandler):
    pass


class Time(PrettyErrorHandler, cyclone.web.RequestHandler):
@@ -180,21 +170,19 @@ reactor.listenTCP(
                'path': 'light9/vidref',
                'default_filename': 'setup.html'
            }),
            (r'/live', Live),
            (r'/clips', Clips),
            (r'/replayMap', ReplayMap),
            (r'/snapshot', Snapshot),
            (r'/snapshot/(.*)', SnapshotPic, {
                "path": 'todo',
            }),
            (r'/time', Time),
            (r'/time/stream', TimeStream),
            # (r'/stats/(.*)', StatsHandler, {
            #     'serverName': 'vidref'
            # }),
            metricsRoute(),
        ],
        debug=True,
    ))
log.info("serving on %s" % port)

reactor.run()
light9/ascoltami/player.py
#!/usr/bin/python
"""
alternate to the mpd music player, for ascoltami
"""

import time, logging, traceback
from gi.repository import Gst
from twisted.internet import task
from greplin import scales

from light9.metrics import metrics
log = logging.getLogger()

stats = scales.collection(
    '/player',
    # scales.RecentFpsStat('currentTimeFps'),
)


class Player(object):

    def __init__(self, autoStopOffset=4, onEOS=None):
        """autoStopOffset is the number of seconds before the end of
        song before automatically stopping (which is really pausing).
        onEOS is an optional function to be called when we reach the
        end of a stream (for example, can be used to advance the song).
        It is called with one argument which is the URI of the song that
        just finished."""
        self.autoStopOffset = autoStopOffset
@@ -133,25 +128,25 @@ class Player(object):
        more OS caching.

        i don't care that it's blocking.
        """
        log.info("preloading %s", songPath)
        assert songPath.startswith("file://"), songPath
        try:
            open(songPath[len("file://"):], 'rb').read()
        except IOError as e:
            log.error("couldn't preload %s, %r", songPath, e)
            raise

    # @stats.currentTimeFps.rate()
    @metrics('current_time').time()
    def currentTime(self):
        success, cur = self.playbin.query_position(Gst.Format.TIME)
        if not success:
            return 0
        return cur / Gst.SECOND

    def duration(self):
        success, dur = self.playbin.query_duration(Gst.Format.TIME)
        if not success:
            return 0
        return dur / Gst.SECOND
light9/ascoltami/webapp.py
import json, socket, subprocess, os, logging, time

from cyclone import template
from rdflib import URIRef
import cyclone.web, cyclone.websocket
# from greplin.scales.cyclonehandler import StatsHandler

from cycloneerr import PrettyErrorHandler
from light9.metrics import metricsRoute
from light9.namespaces import L9
from light9.showconfig import getSongsFromShow, songOnDisk
from twisted.internet import reactor
_songUris = {}  # locationUri : song
log = logging.getLogger()
loader = template.Loader(os.path.dirname(__file__))


def songLocation(graph, songUri):
    loc = URIRef("file://%s" % songOnDisk(songUri))
    _songUris[loc] = songUri
    return loc
@@ -196,17 +196,15 @@ class goButton(PrettyErrorHandler, cyclo


def makeWebApp(app):
    return cyclone.web.Application(handlers=[
        (r"/", root),
        (r"/time", timeResource),
        (r"/time/stream", timeStreamResource),
        (r"/song", songResource),
        (r"/songs", songs),
        (r"/seekPlayOrPause", seekPlayOrPause),
        (r"/output", output),
        (r"/go", goButton),
        # (r'/stats/(.*)', StatsHandler, {
        #     'serverName': 'ascoltami'
        # }),
        metricsRoute(),
    ],
                                   app=app)
light9/collector/collector_client.py
from light9 import networking
from light9.effect.settings import DeviceSettings
from light9.metrics import metrics
from twisted.internet import defer
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
import json, time, logging
import treq
# from greplin import scales

log = logging.getLogger('coll_client')

_zmqClient = None

# stats = scales.collection(
#     '/collectorClient',
#     scales.PmfStat('send', recalcPeriod=1),
# )


class TwistedZmqClient(object):

    def __init__(self, service):
        zf = ZmqFactory()
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
        self.conn = ZmqPushConnection(zf, e)

    def send(self, msg):
        self.conn.push(msg)


@@ -49,24 +44,24 @@ def sendToCollector(client, session, set
                    useZmq=False) -> defer.Deferred:
    """deferred to the time in seconds it took to get a response from collector"""
    sendTime = time.time()
    msg = toCollectorJson(client, session, settings).encode('utf8')

    if useZmq:
        d = sendToCollectorZmq(msg)
    else:
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)

    def onDone(result):
        dt = time.time() - sendTime
        # stats.send = dt
        metrics('send').observe(dt)
        if dt > .1:
            log.warn('sendToCollector request took %.1fms', dt * 1000)
        return dt

    d.addCallback(onDone)

    def onErr(err):
        log.warn('sendToCollector failed: %r', err)

    d.addErrback(onErr)
    return d
light9/collector/output.py
from rdflib import URIRef
import socket
import struct
import time
import usb.core
import logging
from twisted.internet import threads, reactor, task
from greplin import scales
from light9.metrics import metrics
log = logging.getLogger('output')
logAllDmx = logging.getLogger('output.allDmx')


class Output(object):
    """
    send a binary buffer of values to some output device. Call update
    as often as you want- the result will be sent as soon as possible,
    and with repeats as needed to outlast hardware timeouts.

    This base class doesn't ever call _write. Subclasses below have
    strategies for that.
    """
    uri: URIRef

    def __init__(self, uri: URIRef):
        self.uri = uri

        self.statPath = '/output%s' % self.shortId()
        scales.init(self, self.statPath)

        self._writeStats = scales.collection(
            self.statPath + '/write', scales.IntStat('succeed'),
            scales.IntStat('fail'), scales.PmfStat('call', recalcPeriod=1),
            scales.RecentFpsStat('fps'))

        self._currentBuffer = b''

        if log.isEnabledFor(logging.DEBUG):
            self._lastLoggedMsg = ''
            task.LoopingCall(self._periodicLog).start(1)

    def reconnect(self):
        pass

    def shortId(self) -> str:
        """short string to distinguish outputs"""
        return self.uri.rstrip('/').rsplit('/')[-1]
@@ -82,48 +74,47 @@ class BackgroundLoopOutput(Output):
    def __init__(self, uri, rate=22):
        super().__init__(uri)
        self.rate = rate
        self._currentBuffer = b''

        self._loop()

    def _loop(self):
        start = time.time()
        sendingBuffer = self._currentBuffer

        def done(worked):
            self._writeStats.succeed += 1
            metrics('write_success', output=self.shortId()).incr()
            reactor.callLater(max(0, start + 1 / self.rate - time.time()),
                              self._loop)

        def err(e):
            self._writeStats.fail += 1
            metrics('write_fail', output=self.shortId()).incr()
            log.error(e)
            reactor.callLater(.2, self._loop)

        d = threads.deferToThread(self._write, sendingBuffer)
        d.addCallbacks(done, err)


class FtdiDmx(BackgroundLoopOutput):

    def __init__(self, uri, lastDmxChannel, rate=22):
        super().__init__(uri)
        self.lastDmxChannel = lastDmxChannel
        from .dmx_controller_output import OpenDmxUsb
        self.dmx = OpenDmxUsb()

    def _write(self, buf):
        self._writeStats.fps.mark()
        with self._writeStats.call.time():
        with metrics('write', output=self.shortId()).time():
            if not buf:
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
                return

            # ok to truncate the last channels if they just went
            # to 0? No it is not. DMX receivers don't add implicit
            # zeros there.
            buf = bytes([0]) + buf[:self.lastDmxChannel]

            if logAllDmx.isEnabledFor(logging.DEBUG):
                # for testing fps, smooth fades, etc
                logAllDmx.debug('%s: %s...' %
@@ -138,27 +129,25 @@ class ArtnetDmx(BackgroundLoopOutput):
        """sends UDP messages to the given host/port"""
        super().__init__(uri, rate)
        packet = bytearray()
        packet.extend(map(ord, "Art-Net"))
        packet.append(0x00)  # Null terminate Art-Net
        packet.extend([0x00, 0x50])  # Opcode ArtDMX 0x5000 (Little endian)
        packet.extend([0x00, 0x0e])  # Protocol version 14
        self.base_packet = packet
        self.sequence_counter = 255
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def _write(self, buf):
        self._writeStats.fps.mark()
        with self._writeStats.call.time():

        with metrics('write', output=self.shortId()).time():
            if not buf:
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
                return

            if logAllDmx.isEnabledFor(logging.DEBUG):
                # for testing fps, smooth fades, etc
                logAllDmx.debug('%s: %s...' %
                                (self.shortId(), ' '.join(map(str, buf[:32]))))

            if self.sequence_counter:
                self.sequence_counter += 1
                if self.sequence_counter > 255:
@@ -207,26 +196,26 @@ class Udmx(BackgroundLoopOutput):
        self._connected = 1
        self._reconnections += 1

    #def update(self, buf:bytes):
    #    self._write(buf)

    #def _loop(self):
    #    pass
    def _write(self, buf):
        if not self.dev:
            log.info('%s: trying to connect', self.shortId())
            raise ValueError()
        self._writeStats.fps.mark()
        with self._writeStats.call.time():

        with metrics('write', output=self.shortId()).time():
            try:
                if not buf:
                    logAllDmx.debug('%s: empty buf- no output', self.shortId())
                    return

                # ok to truncate the last channels if they just went
                # to 0? No it is not. DMX receivers don't add implicit
                # zeros there.
                buf = buf[:self.lastDmxChannel]

                if logAllDmx.isEnabledFor(logging.DEBUG):
                    # for testing fps, smooth fades, etc
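The Output docstring above describes the contract these subclasses follow: BackgroundLoopOutput._loop keeps handing the current buffer to _write in a worker thread, about self.rate times per second, so a subclass only has to implement _write. A minimal illustrative subclass under that contract (not part of this changeset; the class name is hypothetical):

import logging
from light9.collector.output import BackgroundLoopOutput

debugLog = logging.getLogger('output.debug')


class LogOutput(BackgroundLoopOutput):
    """debug output that logs the buffer instead of driving DMX hardware"""

    def _write(self, buf):
        # called from a thread by BackgroundLoopOutput._loop, ~self.rate times/sec
        debugLog.debug('%s: would send %d bytes', self.shortId(), len(buf or b''))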
light9/effect/sequencer.py
@@ -13,49 +13,30 @@ import cyclone.sse
import logging, bisect, time
import traceback
from decimal import Decimal
from typing import Any, Callable, Dict, List, Tuple, cast, Union

from light9.ascoltami.musictime_client import MusicTime
from light9.effect import effecteval
from light9.effect.settings import DeviceSettings
from light9.effect.simple_outputs import SimpleOutputs
from light9.namespaces import L9, RDF
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
from rdfdb.syncedgraph import SyncedGraph
# from standardservice.scalessetup import gatherProcessStats
from light9.metrics import metrics

# from greplin import scales
import imp

log = logging.getLogger('sequencer')

# gatherProcessStats()
# updateStats = scales.collection(
#     '/update',
#     scales.PmfStat('s0_getMusic', recalcPeriod=1),
#     scales.PmfStat('s1_eval', recalcPeriod=1),
#     #scales.PmfStat('s3_send_client', recalcPeriod=1),
#     scales.PmfStat('s3_send', recalcPeriod=1),
#     scales.PmfStat('updateLoopLatency', recalcPeriod=1),
#     scales.DoubleStat('updateLoopLatencyGoal'),
#     scales.RecentFpsStat('updateFps'),
#     scales.DoubleStat('goalFps'),
# )
# compileStats = scales.collection(
#     '/compile',
#     scales.PmfStat('graph', recalcPeriod=1),
#     scales.PmfStat('song', recalcPeriod=1),
# )


def pyType(n):
    ret = n.toPython()
    if isinstance(ret, Decimal):
        return float(ret)
    return ret


class Note(object):

    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule,
                 simpleOutputs):
@@ -162,56 +143,56 @@ class CodeWatcher(object):
        # in case we got an event at the start of the write
        reactor.callLater(.1, go)


class Sequencer(object):

    def __init__(self,
                 graph: SyncedGraph,
                 sendToCollector: Callable[[DeviceSettings], Deferred],
                 fps=40):
        self.graph = graph
        self.fps = fps
        # updateStats.goalFps = self.fps
        # updateStats.updateLoopLatencyGoal = 1 / self.fps
        metrics('update_loop_goal_fps').set(self.fps)
        metrics('update_loop_goal_latency').set(1 / self.fps)
        self.sendToCollector = sendToCollector
        self.music = MusicTime(period=.2, pollCurvecalc=False)

        self.recentUpdateTimes: List[float] = []
        self.lastStatLog = 0.0
        self._compileGraphCall = None
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
        self.simpleOutputs = SimpleOutputs(self.graph)
        self.graph.addHandler(self.compileGraph)
        self.lastLoopSucceeded = False

        self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
        self.updateLoop()

    def onCodeChange(self):
        log.debug('seq.onCodeChange')
        self.graph.addHandler(self.compileGraph)
        #self.updateLoop()

    # @compileStats.graph.time()
    @metrics('compile_graph').time()
    def compileGraph(self) -> None:
        """rebuild our data from the graph"""
        for song in self.graph.subjects(RDF.type, L9['Song']):

            def compileSong(song: Song = cast(Song, song)) -> None:
                self.compileSong(song)

            self.graph.addHandler(compileSong)

    # @compileStats.song.time()
    @metrics('compile_song').time()
    def compileSong(self, song: Song) -> None:
        anyErrors = False
        self.notes[song] = []
        for note in self.graph.objects(song, L9['note']):
            try:
                n = Note(self.graph, NoteUri(note), effecteval,
                         self.simpleOutputs)
            except Exception:
                log.warn(f"failed to build Note {note} - skipping")
                anyErrors = True
                continue
            self.notes[song].append(n)
@@ -221,72 +202,72 @@ class Sequencer(object):
    @inlineCallbacks
    def updateLoop(self) -> None:
        frameStart = time.time()
        try:
            sec = yield self.update()
        except Exception as e:
            self.lastLoopSucceeded = False
            traceback.print_exc()
            log.warn('updateLoop: %r', e)
            reactor.callLater(1, self.updateLoop)
        else:
            took = time.time() - frameStart
            # updateStats.updateLoopLatency = took
            metrics('update_loop_latency').observe(took)

            if not self.lastLoopSucceeded:
                log.info('Sequencer.update is working')
                self.lastLoopSucceeded = True

            delay = max(0, 1 / self.fps - took)
            reactor.callLater(delay, self.updateLoop)

    # @updateStats.updateFps.rate()
    @metrics('update_call').time()
    @inlineCallbacks
    def update(self) -> Deferred:

        if 1:#with updateStats.s0_getMusic.time():
        with metrics('update_s0_getMusic').time():
            musicState = self.music.getLatest()
            if not musicState.get('song') or not isinstance(
                    musicState.get('t'), float):
                return defer.succeed(0.0)
            song = Song(URIRef(musicState['song']))
            dispatcher.send('state',
                            update={
                                'song': str(song),
                                't': musicState['t']
                            })

        if 1:#with updateStats.s1_eval.time():
        with metrics('update_s1_eval').time():
            settings = []
            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
            noteReports = []
            for note in songNotes:
                try:
                    s, report = note.outputSettings(musicState['t'])
                except Exception:
                    traceback.print_exc()
                    raise
                noteReports.append(report)
                settings.append(s)
            devSettings = DeviceSettings.fromList(self.graph, settings)

        dispatcher.send('state', update={'songNotes': noteReports})

        if 1:#with updateStats.s3_send.time():  # our measurement
        with metrics('update_s3_send').time():  # our measurement
            sendSecs = yield self.sendToCollector(devSettings)

        # sendToCollector's own measurement.
        # (sometimes it's None, not sure why, and neither is mypy)
        #if isinstance(sendSecs, float):
        #    updateStats.s3_send_client = sendSecs
        #    metrics('update_s3_send_client').observe(sendSecs)


class Updates(cyclone.sse.SSEHandler):

    def __init__(self, application, request, **kwargs) -> None:
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
        self.state: Dict = {}
        dispatcher.connect(self.updateState, 'state')
        self.numConnected = 0

    def updateState(self, update: Dict):
        self.state.update(update)
light9/effecteval/effectloop.py
@@ -5,34 +5,35 @@ from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from twisted.internet.error import TimeoutError
import numpy
import serial
import treq

from light9 import Effects
from light9 import Submaster
from light9 import dmxclient
from light9 import networking
from light9.effecteval.effect import EffectNode
from light9.namespaces import L9, RDF
from light9.metrics import metrics

log = logging.getLogger('effectloop')


class EffectLoop(object):
    """maintains a collection of the current EffectNodes, gets time from
    music player, sends dmx"""

    def __init__(self, graph, stats):
        self.graph, self.stats = graph, stats
    def __init__(self, graph):
        self.graph = graph
        self.currentSong = None
        self.currentEffects = [
        ]  # EffectNodes for the current song plus the submaster ones
        self.lastLogTime = 0
        self.lastLogMsg = ""
        self.lastErrorLog = 0
        self.graph.addHandler(self.setEffects)
        self.period = 1 / 30
        self.coastSecs = .3  # main reason to keep this low is to notice play/pause
        self.songTimeFetch = 0
        self.songIsPlaying = False
        self.songTimeFromRequest = 0
@@ -80,25 +81,25 @@ class EffectLoop(object):
                self.songTimeFromRequest = response['t']
                returnValue((response['t'], (response['song'] and
                                             URIRef(response['song']))))

        estimated = self.songTimeFromRequest
        if self.currentSong is not None and self.currentPlaying:
            estimated += now - self.requestTime
        returnValue((estimated, self.currentSong))

    @inlineCallbacks
    def updateTimeFromMusic(self):
        t1 = time.time()
        with self.stats.getMusic.time():
        with metrics('get_music').time():
            self.songTime, song = yield self.getSongTime()
            self.songTimeFetch = time.time()

        if song != self.currentSong:
            self.currentSong = song
            # this may be piling on the handlers
            self.graph.addHandler(self.setEffects)

        elapsed = time.time() - t1
        reactor.callLater(max(0, self.period - elapsed),
                          self.updateTimeFromMusic)

@@ -107,41 +108,41 @@ class EffectLoop(object):
        t = self.songTime
        if self.currentPlaying:
            t += max(0, now - self.songTimeFetch)
        return t

    @inlineCallbacks
    def sendLevels(self):
        t1 = time.time()
        log.debug("time since last call: %.1f ms" %
                  (1000 * (t1 - self.lastSendLevelsTime)))
        self.lastSendLevelsTime = t1
        try:
            with self.stats.sendLevels.time():
            with metrics('send_levels').time():
                if self.currentSong is not None:
                    log.debug('allEffectOutputs')
                    with self.stats.evals.time():
                    with metrics('evals').time():
                        outputs = self.allEffectOutputs(
                            self.estimatedSongTime())
                    log.debug('combineOutputs')
                    combined = self.combineOutputs(outputs)
                    self.logLevels(t1, combined)
                    log.debug('sendOutput')
                    with self.stats.sendOutput.time():
                    with metrics('send_output').time():
                        yield self.sendOutput(combined)

                elapsed = time.time() - t1
                dt = max(0, self.period - elapsed)
        except Exception:
            self.stats.errors += 1
            metrics('errors').incr()
            traceback.print_exc()
            dt = .5

        reactor.callLater(dt, self.sendLevels)

    def combineOutputs(self, outputs):
        """pick usable effect outputs and reduce them into one for sendOutput"""
        outputs = [x for x in outputs if isinstance(x, Submaster.Submaster)]
        out = Submaster.sub_maxes(*outputs)

        return out

@@ -303,19 +304,19 @@ class LedLoop(EffectLoop):
            log.debug('value changed: %s(%s %s)', meth, selectArgs, value)

            getattr(self.board, meth)(*(selectArgs + (value,)))
            self.lastSent[key] = compValue

        yield succeed(None)  # there was an attempt at an async send

    def logMessage(self, out):
        return str([(w, p.tolist() if isinstance(p, numpy.ndarray) else p)
                    for w, p in list(out.items())])


def makeEffectLoop(graph, stats, outputWhere):
def makeEffectLoop(graph, outputWhere):
    if outputWhere == 'dmx':
        return EffectLoop(graph, stats)
        return EffectLoop(graph)
    elif outputWhere == 'leds':
        return LedLoop(graph, stats)
        return LedLoop(graph)
    else:
        raise NotImplementedError("unknown output system %r" % outputWhere)
light9/metrics.py
new file 100644
"""for easier porting, and less boilerplate, allow these styles using the
form of the call to set up the right type of metric automatically:

  from metrics import metrics
  metrics.setProcess('pretty_name')

  @metrics('loop').time()              # a common one to get the fps of each service. Gets us qty and time
  def frame():
      if err:
         metrics('foo_errors').incr()  # if you incr it, it's a counter

  @metrics('foo_calls').time()         # qty & time because it's a decorator
  def foo():

  metrics('goal_fps').set(f)           # a gauge because we called set()

  with metrics('recompute'): ...       # ctxmgr also makes a timer
     time_this_part()

I don't see a need for labels yet, but maybe some code will want like
metrics('foo', label1=one). Need histogram? Info?

"""
from typing import Dict, Tuple, Callable, Type, TypeVar, cast
import cyclone.web
from prometheus_client import Counter, Gauge, Metric, Summary
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY

_created: Dict[str, Metric] = {}

# _process=sys.argv[0]
# def setProcess(name: str):
#   global _process
#   _process = name

MT = TypeVar("MT")


class _MetricsRequest:

    def __init__(self, name: str, **labels):
        self.name = name
        self.labels = labels

    def _ensure(self, cls: Type[MT]) -> MT:
        # create the metric on first use; reuse (and label) it afterwards
        if self.name not in _created:
            _created[self.name] = cls(name=self.name, documentation=self.name, labelnames=self.labels.keys())
        m = _created[self.name]
        if self.labels:
            m = m.labels(**self.labels)
        return m

    def __call__(self, fn) -> Callable:
        # decorator form: wrap fn so every call is counted and timed by a Summary
        return self._ensure(Summary).time()(fn)

    def set(self, v: float):
        self._ensure(Gauge).set(v)

    def incr(self):
        self._ensure(Counter).inc()

    def offset(self, amount: float):
        self._ensure(Gauge).inc(amount)

    def time(self):
        return self._ensure(Summary).time()

    def observe(self, x: float):
        return self._ensure(Summary).observe(x)

    def __enter__(self):
        # context-manager form: delegate to a Summary timer
        self._timer = self._ensure(Summary).time()
        return self._timer.__enter__()

    def __exit__(self, *exc):
        return self._timer.__exit__(*exc)


def metrics(name: str, **labels):
    return _MetricsRequest(name, **labels)


class _CycloneMetrics(cyclone.web.RequestHandler):

    def get(self):
        self.add_header('content-type', 'text/plain')
        self.write(generate_latest(REGISTRY))


def metricsRoute() -> Tuple[str, Type[cyclone.web.RequestHandler]]:
    return ('/metrics', _CycloneMetrics)


"""
stuff we used to have in greplin. Might be nice to get (client-side-computed) min/max/stddev back.

class PmfStat(Stat):
  A stat that stores min, max, mean, standard deviation, and some
  percentiles for arbitrary floating-point data. This is potentially a
  bit expensive, so its child values are only updated once every
  twenty seconds.


metrics consumer side can do this with the changing counts:

class RecentFps(object):
  def __init__(self, window=20):
    self.window = window
    self.recentTimes = []

  def mark(self):
    now = time.time()
    self.recentTimes.append(now)
    self.recentTimes = self.recentTimes[-self.window:]

  def rate(self):
    def dec(innerFunc):
      def f(*a, **kw):
        self.mark()
        return innerFunc(*a, **kw)
      return f
    return dec

  def __call__(self):
    if len(self.recentTimes) < 2:
      return {}
    recents = sorted(round(1 / (b - a), 3)
                      for a, b in zip(self.recentTimes[:-1],
                                      self.recentTimes[1:]))
    avg = (len(self.recentTimes) - 1) / (
      self.recentTimes[-1] - self.recentTimes[0])
    return {'average': round(avg, 5), 'recents': recents}


i think prometheus covers this one:

import psutil
def gatherProcessStats():
    procStats = scales.collection('/process',
                                  scales.DoubleStat('time'),
                                  scales.DoubleStat('cpuPercent'),
                                  scales.DoubleStat('memMb'),
    )
    proc = psutil.Process()
    lastCpu = [0.]
    def updateTimeStat():
        now = time.time()
        procStats.time = round(now, 3)
        if now - lastCpu[0] > 3:
            procStats.cpuPercent = round(proc.cpu_percent(), 6) # (since last call)
            lastCpu[0] = now
        procStats.memMb = round(proc.memory_info().rss / 1024 / 1024, 6)
    task.LoopingCall(updateTimeStat).start(.1)

"""


class M:

    def __call__(self, name):
        return
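A minimal wiring sketch for this module, mirroring how the services above register it in their cyclone handler lists (the port number and the Health handler are illustrative, not from the changeset):

import cyclone.web
from twisted.internet import reactor

from light9.metrics import metrics, metricsRoute


class Health(cyclone.web.RequestHandler):

    @metrics('health_calls').time()  # decorator form: Summary gives call count and latency
    def get(self):
        self.write('ok')


reactor.listenTCP(8299, cyclone.web.Application(handlers=[
    (r'/health', Health),
    metricsRoute(),  # serves text-format metrics at /metrics for Prometheus to scrape
]))
reactor.run()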
light9/vidref/videorecorder.py
from dataclasses import dataclass
from io import BytesIO
from typing import Optional
import time, logging, os, traceback

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')

from gi.repository import Gst
# from greplin import scales
from rdflib import URIRef
from rx.subject import BehaviorSubject
from twisted.internet import threads
import PIL.Image
import moviepy.editor
import numpy

from light9 import showconfig
from light9.ascoltami.musictime_client import MusicTime
from light9.newtypes import Song

from light9.metrics import metrics
log = logging.getLogger()

# stats = scales.collection(
#     '/recorder',
#     scales.PmfStat('jpegEncode', recalcPeriod=1),
#     scales.IntStat('deletes'),
#     scales.PmfStat('waitForNextImg', recalcPeriod=1),
#     scales.PmfStat('crop', recalcPeriod=1),
#     scales.RecentFpsStat('encodeFrameFps'),
#     scales.RecentFpsStat('queueGstFrameFps'),
# )


@dataclass
class CaptureFrame:
    img: PIL.Image
    song: Song
    t: float
    isPlaying: bool
    imgJpeg: Optional[bytes] = None

    # @stats.jpegEncode.time()
    @metrics('jpeg_encode').time()
    def asJpeg(self):
        if not self.imgJpeg:
            output = BytesIO()
            self.img.save(output, 'jpeg', quality=80)
            self.imgJpeg = output.getvalue()
        return self.imgJpeg


def songDir(song: Song) -> bytes:
    return os.path.join(
        showconfig.root(), b'video',
        song.replace('http://', '').replace('/', '_').encode('ascii'))
@@ -64,25 +53,25 @@ def takeUri(songPath: bytes) -> URIRef:
        ['http://light9.bigasterisk.com/show', song[-2], song[-1], take]))


def deleteClip(uri: URIRef):
    # uri http://light9.bigasterisk.com/show/dance2019/song6/take_155
    # path show/dance2019/video/light9.bigasterisk.com_show_dance2019_song6/take_155.*
    w = uri.split('/')[-4:]
    path = '/'.join([
        w[0], w[1], 'video', f'light9.bigasterisk.com_{w[0]}_{w[1]}_{w[2]}',
        w[3]
    ])
    log.info(f'deleting {uri} {path}')
    # stats.deletes += 1
    metrics('deletes').incr()
    for fn in [path + '.mp4', path + '.timing']:
        os.remove(fn)


class FramesToVideoFiles:
    """

    nextWriteAction: 'ignore'
    currentOutputClip: None

    (frames come in for new video)
    nextWriteAction: 'saveFrame'
@@ -164,39 +153,39 @@ class FramesToVideoFiles:
                                                   bitrate='150000')
        except (StopIteration, RuntimeError):
            self.frameMap.close()

        log.info('write_videofile done')
        self.currentOutputClip = None

        if self.currentClipFrameCount < 400:
            log.info('too small- deleting')
            deleteClip(takeUri(self.outMp4.encode('ascii')))

    def _bg_make_frame(self, video_time_secs):
        # stats.encodeFrameFps.mark()
        metrics('encodeFrameFps').incr()
        if self.nextWriteAction == 'close':
            raise StopIteration  # the one in write_videofile
        elif self.nextWriteAction == 'notWritingClip':
            raise NotImplementedError
        elif self.nextWriteAction == 'saveFrames':
            pass
        else:
            raise NotImplementedError(self.nextWriteAction)

        # should be a little queue to miss fewer frames
        t1 = time.time()
        while self.nextImg is None:
            time.sleep(.015)
        # stats.waitForNextImg = time.time() - t1
        metrics('wait_for_next_img').observe(time.time() - t1)
        cf, self.nextImg = self.nextImg, None

        self.frameMap.write(f'video {video_time_secs:g} = song {cf.t:g}\n')
        self.currentClipFrameCount += 1
        return numpy.asarray(cf.img)


class GstSource:

    def __init__(self, dev):
        """
        make new gst pipeline
@@ -239,35 +228,35 @@ class GstSource:
            (result, mapinfo) = buf.map(Gst.MapFlags.READ)
            try:
                img = PIL.Image.frombytes(
                    'RGB', (caps.get_structure(0).get_value('width'),
                            caps.get_structure(0).get_value('height')),
                    mapinfo.data)
                img = self.crop(img)
            finally:
                buf.unmap(mapinfo)
            # could get gst's frame time and pass it to getLatest
            latest = self.musicTime.getLatest()
            if 'song' in latest:
                # stats.queueGstFrameFps.mark()
                metrics('queue_gst_frame_fps').incr()
                self.liveImages.on_next(
                    CaptureFrame(img=img,
                                 song=Song(latest['song']),
                                 t=latest['t'],
                                 isPlaying=latest['playing']))
        except Exception:
            traceback.print_exc()
        return Gst.FlowReturn.OK

    # @stats.crop.time()
    @metrics('crop').time()
    def crop(self, img):
        return img.crop((40, 100, 790, 310))

    def setupPipelineError(self, pipe, cb):
        bus = pipe.get_bus()

        def onBusMessage(bus, msg):

            print('nusmsg', msg)
            if msg.type == Gst.MessageType.ERROR:
                _, txt = msg.parse_error()
                cb(txt)
light9/zmqtransport.py
import json
from rdflib import URIRef, Literal
from greplin import scales
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
from light9.metrics import metrics
import logging

log = logging.getLogger('zmq')


def parseJsonMessage(msg):
    body = json.loads(msg)
    settings = []
    for device, attr, value in body['settings']:
        if isinstance(value, str) and value.startswith('http'):
            value = URIRef(value)
        else:
            value = Literal(value)
        settings.append((URIRef(device), URIRef(attr), value))
    return body['client'], body['clientSession'], settings, body['sendTime']


def startZmq(port, collector):
    stats = scales.collection(
        '/zmqServer',
        scales.PmfStat('setAttr', recalcPeriod=1),
        scales.RecentFpsStat('setAttrFps'),
    )

    zf = ZmqFactory()
    addr = 'tcp://*:%s' % port
    log.info('creating zmq endpoint at %r', addr)
    e = ZmqEndpoint('bind', addr)

    class Pull(ZmqPullConnection):
        #highWaterMark = 3
        def onPull(self, message):
            stats.setAttrFps.mark()
            with stats.setAttr.time():
            with metrics('zmq_server_set_attr').time():
                # todo: new compressed protocol where you send all URIs up
                # front and then use small ints to refer to devices and
                # attributes in subsequent requests.
                client, clientSession, settings, sendTime = parseJsonMessage(
                    message[0])
                collector.setAttrs(client, clientSession, settings, sendTime)

    Pull(zf, e)
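For reference, parseJsonMessage above expects the same JSON body that collector_client builds with toCollectorJson; a small example of that shape (the device/attr URIs and values are illustrative):

import json
from light9.zmqtransport import parseJsonMessage

msg = json.dumps({
    'client': 'effectSequencer',
    'clientSession': 'session1',
    'sendTime': 1652245286.0,
    'settings': [
        # [device, attr, value]; string values starting with 'http' become URIRefs
        ['http://light9.bigasterisk.com/device/aura5',
         'http://light9.bigasterisk.com/color',
         '#ffffff'],
    ],
})
client, clientSession, settings, sendTime = parseJsonMessage(msg)
# settings == [(URIRef(device), URIRef(attr), Literal('#ffffff'))]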