Changeset 9aa046cc9b33 (branch: default)
drewp@bigasterisk.com - 2022-05-11 06:01:26
replace greplin with prometheus throughout (untested)
15 files changed with 225 insertions and 183 deletions
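The pattern applied throughout this changeset: a greplin scales collection with a PmfStat timer becomes a call to the new light9.metrics facade, which lazily creates a prometheus_client Summary the first time a name is used. A minimal before/after sketch; handle() is a hypothetical stand-in for the timed work:

# before (greplin):
#   stats = scales.collection('/webServer', scales.PmfStat('setAttr', recalcPeriod=1))
#   with stats.setAttr.time():
#       handle()
#
# after (prometheus, via the new facade):
from light9.metrics import metrics

def handle():                      # hypothetical work being timed
    pass

with metrics('set_attr').time():   # Summary 'set_attr': observes call count and duration
    handle()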
bin/captureDevice
 
@@ -7,29 +7,25 @@ from twisted.internet.defer import inlin
 
import logging
 
import optparse
 
import os
 
import time
 
import treq
 
import cyclone.web, cyclone.websocket, cyclone.httpclient
 
from greplin import scales
 

	
 
from light9.metrics import metrics, metricsRoute
 
from run_local import log
 
from cycloneerr import PrettyErrorHandler
 

	
 
from light9.namespaces import L9, RDF
 
from light9 import networking, showconfig
 
from rdfdb.syncedgraph import SyncedGraph
 
from light9.paint.capture import writeCaptureDescription
 
from greplin.scales.cyclonehandler import StatsHandler
 
from light9.effect.settings import DeviceSettings
 
from light9.collector.collector_client import sendToCollector
 
from rdfdb.patch import Patch
 
from light9.zmqtransport import parseJsonMessage
 

	
 
stats = scales.collection('/webServer', scales.PmfStat('setAttr',
 
                                                       recalcPeriod=1))
 

	
 

	
 
class Camera(object):
 

	
 
    def __init__(self, imageUrl):
 
        self.imageUrl = imageUrl
 
@@ -155,14 +151,14 @@ camera = Camera(
 
    'http://plus:8200/picamserve/pic?res=1080&resize=800&iso=800&redgain=1.6&bluegain=1.6&shutter=60000&x=0&w=1&y=0&h=.952'
 
)
 

	
 

	
 
class Attrs(PrettyErrorHandler, cyclone.web.RequestHandler):
 

	
 
    @metrics('set_attr').time()
 
    def put(self):
 
        with stats.setAttr.time():
 
            client, clientSession, settings, sendTime = parseJsonMessage(
 
                self.request.body)
 
            self.set_status(202)
 

	
 

	
 
def launch(graph):
 
@@ -171,15 +167,13 @@ def launch(graph):
 
    reactor.listenTCP(networking.captureDevice.port,
 
                      cyclone.web.Application(handlers=[
 
                          (r'/()', cyclone.web.StaticFileHandler, {
 
                              "path": "light9/web",
 
                              "default_filename": "captureDevice.html"
 
                          }),
 
                          (r'/stats/(.*)', StatsHandler, {
 
                              'serverName': 'captureDevice'
 
                          }),
 
                          metricsRoute(),
 
                      ]),
 
                      interface='::',
 
                      cap=cap)
 
    log.info('serving http on %s', networking.captureDevice.port)
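metricsRoute() from the new light9/metrics.py returns a ('/metrics', handler) tuple, so each service exposes the standard Prometheus text format (generate_latest over the default REGISTRY) alongside its existing handlers. A minimal sketch of wiring it into a cyclone app; the port is hypothetical:

import cyclone.web
from twisted.internet import reactor
from light9.metrics import metrics, metricsRoute

metrics('demo_requests').incr()    # make sure at least one Counter is registered

app = cyclone.web.Application(handlers=[
    metricsRoute(),                # GET /metrics returns the text exposition
])
reactor.listenTCP(9999, app)       # hypothetical port
reactor.run()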
 

	
 

	
bin/effecteval
 
@@ -8,23 +8,20 @@ import sys, optparse, logging, json, ite
 
from rdflib import URIRef, Literal
 

	
 
from light9 import networking, showconfig
 
from light9.effecteval.effect import EffectNode
 
from light9.effect.edit import getMusicStatus, songNotePatch
 
from light9.effecteval.effectloop import makeEffectLoop
 
from greplin.scales.cyclonehandler import StatsHandler
 
from light9.metrics import metrics, metricsRoute
 
from light9.namespaces import L9
 
from rdfdb.patch import Patch
 
from rdfdb.syncedgraph import SyncedGraph
 
from greplin import scales
 
from standardservice.scalessetup import gatherProcessStats
 

	
 
from cycloneerr import PrettyErrorHandler
 
from light9.coffee import StaticCoffee
 

	
 
gatherProcessStats()
 

	
 

	
 
class EffectEdit(PrettyErrorHandler, cyclone.web.RequestHandler):
 

	
 
    def get(self):
 
        self.set_header('Content-Type', 'text/html')
 
@@ -223,25 +220,16 @@ class App(object):
 
        self.show = show
 
        self.outputWhere = outputWhere
 
        self.graph = SyncedGraph(networking.rdfdb.url, "effectEval")
 
        self.graph.initiallySynced.addCallback(self.launch).addErrback(
 
            log.error)
 

	
 
        self.stats = scales.collection(
 
            '/',
 
            scales.PmfStat('sendLevels', recalcPeriod=1),
 
            scales.PmfStat('getMusic', recalcPeriod=1),
 
            scales.PmfStat('evals', recalcPeriod=1),
 
            scales.PmfStat('sendOutput', recalcPeriod=1),
 
            scales.IntStat('errors'),
 
        )
 

	
 
    def launch(self, *args):
 
        log.info('launch')
 
        if self.outputWhere:
 
-            self.loop = makeEffectLoop(self.graph, self.stats, self.outputWhere)
+            self.loop = makeEffectLoop(self.graph, self.outputWhere)
 
            self.loop.startLoop()
 

	
 
        SFH = cyclone.web.StaticFileHandler
 
        self.cycloneApp = cyclone.web.Application(handlers=[
 
            (r'/()', SFH, {
 
                'path': 'light9/effecteval',
 
@@ -257,19 +245,16 @@ class App(object):
 
            (r'/effectUpdates', EffectUpdates),
 
            (r'/code', Code),
 
            (r'/songEffectsUpdates', SongEffectsUpdates),
 
            (r'/effect/eval', EffectEval),
 
            (r'/songEffects', SongEffects),
 
            (r'/songEffects/eval', SongEffectsEval),
 
            (r'/stats/(.*)', StatsHandler, {
 
                'serverName': 'effecteval'
 
            }),
 
            metricsRoute(),
 
        ],
 
                                                  debug=True,
 
-                                                  graph=self.graph,
-                                                  stats=self.stats)
+                                                  graph=self.graph)
 
        reactor.listenTCP(networking.effectEval.port, self.cycloneApp)
 
        log.info("listening on %s" % networking.effectEval.port)
 

	
 

	
 
if __name__ == "__main__":
 
    parser = optparse.OptionParser()
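The scales.IntStat('errors') removed from this collection maps onto the facade's counter form: the first .incr() on a name creates a prometheus_client Counter, which only moves upward. A small sketch; the failing work is hypothetical:

from light9.metrics import metrics

for attempt in range(3):           # hypothetical retry loop
    try:
        raise ValueError('demo failure')
    except Exception:
        metrics('errors').incr()   # Counter, exposed as errors_total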
bin/effectsequencer
 
@@ -2,16 +2,15 @@
 
"""
 
plays back effect notes from the timeline
 
"""
 

	
 
from run_local import log
 
from twisted.internet import reactor
 
# from greplin.scales.cyclonehandler import StatsHandler
 
from light9.metrics import metrics, metricsRoute
 
from rdfdb.syncedgraph import SyncedGraph
 
from light9 import networking, showconfig
 
# from greplin import scales
 
import optparse, sys, logging
 
import cyclone.web
 
from rdflib import URIRef
 
from light9.effect.sequencer import Sequencer, Updates
 
from light9.collector.collector_client import sendToCollector
 

	
 
@@ -24,21 +23,12 @@ class App(object):
 
        self.show = show
 
        self.session = session
 

	
 
        self.graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
        self.graph.initiallySynced.addCallback(self.launch)
 

	
 
        # self.stats = scales.collection(
 
        #     '/',
 
        #     scales.PmfStat('sendLevels', recalcPeriod=1),
 
        #     scales.PmfStat('getMusic', recalcPeriod=1),
 
        #     scales.PmfStat('evals', recalcPeriod=1),
 
        #     scales.PmfStat('sendOutput', recalcPeriod=1),
 
        #     scales.IntStat('errors'),
 
        # )
 

	
 
    def launch(self, *args):
 
        self.seq = Sequencer(
 
            self.graph,
 
            lambda settings: sendToCollector(
 
                'effectSequencer',
 
                self.session,
 
@@ -51,15 +41,13 @@ class App(object):
 
        self.cycloneApp = cyclone.web.Application(handlers=[
 
            (r'/()', cyclone.web.StaticFileHandler, {
 
                "path": "light9/effect/",
 
                "default_filename": "sequencer.html"
 
            }),
 
            (r'/updates', Updates),
 
            # (r'/stats/(.*)', StatsHandler, {
 
            #     'serverName': 'effectsequencer'
 
            # }),
 
            metricsRoute(),
 
        ],
 
                                                  debug=True,
 
                                                  seq=self.seq,
 
                                                  graph=self.graph,
 
                                                #   stats=self.stats
 
                                                  )
bin/paintserver
 
#!bin/python
 

	
 
from run_local import log
 
import json
 
from twisted.internet import reactor
 
from greplin.scales.cyclonehandler import StatsHandler
 
from rdfdb.syncedgraph import SyncedGraph
 
from light9 import networking, showconfig
 
from greplin import scales
 
import optparse, sys, logging
 
import cyclone.web
 
from rdflib import URIRef
 
from light9 import clientsession
 
import light9.paint.solve
 
from cycloneerr import PrettyErrorHandler
 
from light9.namespaces import L9, DEV
 
from light9.metrics import metrics
 
import imp
 

	
 

	
 
class Solve(PrettyErrorHandler, cyclone.web.RequestHandler):
 

	
 
    def post(self):
 
        painting = json.loads(self.request.body)
 
-        with self.settings.stats.solve.time():
+        with metrics('solve').time():
 
            img = self.settings.solver.draw(painting)
 
            sample, sampleDist = self.settings.solver.bestMatch(
 
                img, device=DEV['aura2'])
 
            with self.settings.graph.currentState() as g:
 
                bestPath = g.value(sample, L9['imagePath']).replace(L9[''], '')
 
            #out = solver.solve(painting)
 
@@ -50,13 +49,13 @@ class Solve(PrettyErrorHandler, cyclone.
 
class BestMatches(PrettyErrorHandler, cyclone.web.RequestHandler):
 

	
 
    def post(self):
 
        body = json.loads(self.request.body)
 
        painting = body['painting']
 
        devs = [URIRef(d) for d in body['devices']]
 
-        with self.settings.stats.solve.time():
+        with metrics('solve').time():
 
            img = self.settings.solver.draw(painting)
 
            outSettings = self.settings.solver.bestMatches(img, devs)
 
            self.write(json.dumps({'settings': outSettings.asList()}))
 

	
 

	
 
class App(object):
 
@@ -66,16 +65,12 @@ class App(object):
 
        self.session = session
 

	
 
        self.graph = SyncedGraph(networking.rdfdb.url, "paintServer")
 
        self.graph.initiallySynced.addCallback(self.launch).addErrback(
 
            log.error)
 

	
 
        self.stats = scales.collection(
 
            '/',
 
            scales.PmfStat('solve', recalcPeriod=1),
 
        )
 

	
 
    def launch(self, *args):
 

	
 
        self.solver = light9.paint.solve.Solver(
 
            self.graph,
 
            sessions=[
 
@@ -88,22 +83,19 @@ class App(object):
 
                L9['show/dance2017/capture/q2/cap1873665'],
 
                L9['show/dance2017/capture/q3/cap1876223'],
 
            ])
 
        self.solver.loadSamples()
 

	
 
        self.cycloneApp = cyclone.web.Application(handlers=[
 
            (r'/stats/(.*)', StatsHandler, {
 
                'serverName': 'paintserver'
 
            }),
 
            (r'/solve', Solve),
 
            (r'/bestMatches', BestMatches),
 
            metricsRoute(),
 
        ],
 
                                                  debug=True,
 
                                                  graph=self.graph,
 
-                                                  solver=self.solver,
-                                                  stats=self.stats)
+                                                  solver=self.solver)
 
        reactor.listenTCP(networking.paintServer.port, self.cycloneApp)
 
        log.info("listening on %s" % networking.paintServer.port)
 

	
 

	
 
if __name__ == "__main__":
 
    parser = optparse.OptionParser()
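Both Solve and BestMatches now time into metrics('solve'). Because the facade caches metrics by name in _created, the second call returns the same underlying Summary instead of re-registering it (prometheus_client raises ValueError on duplicate registration), so the two handlers share one 'solve' series. Sketch:

from light9.metrics import metrics

with metrics('solve').time():      # first use creates and registers Summary('solve')
    pass
with metrics('solve').time():      # later uses reuse the cached Summary
    pass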
bin/patchserver
 
@@ -9,40 +9,35 @@ from twisted.internet.defer import inlin
 
import logging
 
import optparse
 
import os
 
import time
 
import treq
 
import cyclone.web, cyclone.websocket, cyclone.httpclient
 
from greplin import scales
 

	
 
from cycloneerr import PrettyErrorHandler
 

	
 
from light9.namespaces import L9, RDF
 
from light9 import networking, showconfig
 
from rdfdb.syncedgraph import SyncedGraph
 

	
 
from greplin.scales.cyclonehandler import StatsHandler
 
from light9.effect.settings import DeviceSettings
 
from rdfdb.patch import Patch
 
from light9.metrics import metrics, metricsRoute
 

	
 
stats = scales.collection('/webServer', scales.PmfStat('setAttr',
 
                                                       recalcPeriod=1))
 

	
 

	
 
def launch(graph):
 
    if 0:
 
        reactor.listenTCP(
 
            networking.captureDevice.port,
 
            cyclone.web.Application(handlers=[
 
                (r'/()', cyclone.web.StaticFileHandler, {
 
                    "path": "light9/web",
 
                    "default_filename": "patchServer.html"
 
                }),
 
                (r'/stats/(.*)', StatsHandler, {
 
                    'serverName': 'patchServer'
 
                }),
 
                metricsRoute(),
 
            ]),
 
            interface='::',
 
        )
 
        log.info('serving http on %s', networking.captureDevice.port)
 

	
 
    def prn():
bin/vidref
 
@@ -16,38 +16,30 @@ light9/web/light9-vidref-playback.js Lit
 
"""
 
from run_local import log
 

	
 
from typing import cast
 
import logging, optparse, json, base64, os, glob
 

	
 
# from greplin import scales
 
# from greplin.scales.cyclonehandler import StatsHandler
 
from light9.metrics import metrics, metricsRoute
 

	
 
from rdflib import URIRef
 
from twisted.internet import reactor, defer
 
import cyclone.web, cyclone.httpclient, cyclone.websocket
 

	
 
from cycloneerr import PrettyErrorHandler
 
from light9 import networking, showconfig
 
from light9.newtypes import Song
 
from light9.vidref import videorecorder
 
from rdfdb.syncedgraph import SyncedGraph
 
# from standardservice.scalessetup import gatherProcessStats
 

	
 
parser = optparse.OptionParser()
 
parser.add_option("-v", "--verbose", action="store_true", help="logging.DEBUG")
 
(options, args) = parser.parse_args()
 

	
 
log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
 

	
 
# gatherProcessStats()
 
# stats = scales.collection(
 
#     '/webServer',
 
#     scales.RecentFpsStat('liveWebsocketFrameFps'),
 
#     scales.IntStat('liveClients'),
 
# )
 

	
 

	
 
class Snapshot(cyclone.web.RequestHandler):
 

	
 
    @defer.inlineCallbacks
 
    def post(self):
 
        # save next pic
 
@@ -75,24 +67,22 @@ pipeline = videorecorder.GstSource(
 

	
 

	
 
class Live(cyclone.websocket.WebSocketHandler):
 

	
 
    def connectionMade(self, *args, **kwargs):
 
        pipeline.liveImages.subscribe(on_next=self.onFrame)
 
        # stats.liveClients += 1
 
        metrics('live_clients').offset(1)
 

	
 
    def connectionLost(self, reason):
 
        #self.subj.dispose()
 
        # stats.liveClients -= 1
 
-        pass
+        metrics('live_clients').offset(-1)
 

	
 
    def onFrame(self, cf: videorecorder.CaptureFrame):
 
        if cf is None: return
 

	
 
        # stats.liveWebsocketFrameFps.mark()
 

	
 
        with metrics('live_websocket_frame_fps').time():
            self.sendMessage(
                json.dumps({
                    'jpeg': base64.b64encode(cf.asJpeg()).decode('ascii'),
                    'description': f't={cf.t}',
                }))
 

	
 
@@ -186,15 +176,13 @@ reactor.listenTCP(
 
            (r'/snapshot', Snapshot),
 
            (r'/snapshot/(.*)', SnapshotPic, {
 
                "path": 'todo',
 
            }),
 
            (r'/time', Time),
 
            (r'/time/stream', TimeStream),
 
            # (r'/stats/(.*)', StatsHandler, {
 
            #     'serverName': 'vidref'
 
            # }),
 
            metricsRoute(),
 
        ],
 
        debug=True,
 
    ))
 
log.info("serving on %s" % port)
 

	
 
reactor.run()
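In Live above, the commented stats.liveClients counter becomes metrics('live_clients').offset(±1): the first offset() call creates a Gauge, and offset() maps onto Gauge.inc(amount), so connects and disconnects move the same gauge up and down. A sketch with hypothetical connect/disconnect hooks:

from light9.metrics import metrics

def on_connect():                       # hypothetical websocket-open hook
    metrics('live_clients').offset(1)   # Gauge += 1

def on_disconnect():                    # hypothetical websocket-close hook
    metrics('live_clients').offset(-1)  # Gauge -= 1

on_connect(); on_connect(); on_disconnect()   # gauge now reads 1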
light9/ascoltami/player.py
 
@@ -3,20 +3,15 @@
 
alternate to the mpd music player, for ascoltami
 
"""
 

	
 
import time, logging, traceback
 
from gi.repository import Gst
 
from twisted.internet import task
 
from greplin import scales
 

	
 
from light9.metrics import metrics
 
log = logging.getLogger()
 

	
 
stats = scales.collection(
 
    '/player',
 
    # scales.RecentFpsStat('currentTimeFps'),
 
)
 

	
 

	
 
class Player(object):
 

	
 
    def __init__(self, autoStopOffset=4, onEOS=None):
 
        """autoStopOffset is the number of seconds before the end of
 
@@ -139,13 +134,13 @@ class Player(object):
 
        try:
 
            open(songPath[len("file://"):], 'rb').read()
 
        except IOError as e:
 
            log.error("couldn't preload %s, %r", songPath, e)
 
            raise
 

	
 
    # @stats.currentTimeFps.rate()
 
    @metrics('current_time').time()
 
    def currentTime(self):
 
        success, cur = self.playbin.query_position(Gst.Format.TIME)
 
        if not success:
 
            return 0
 
        return cur / Gst.SECOND
 

	
light9/ascoltami/webapp.py
 
import json, socket, subprocess, os, logging, time
 

	
 
from cyclone import template
 
from rdflib import URIRef
 
import cyclone.web, cyclone.websocket
 
# from greplin.scales.cyclonehandler import StatsHandler
 

	
 
from cycloneerr import PrettyErrorHandler
 
from light9.metrics import metricsRoute
 
from light9.namespaces import L9
 
from light9.showconfig import getSongsFromShow, songOnDisk
 
from twisted.internet import reactor
 
_songUris = {}  # locationUri : song
 
log = logging.getLogger()
 
loader = template.Loader(os.path.dirname(__file__))
 
@@ -202,11 +202,9 @@ def makeWebApp(app):
 
        (r"/time/stream", timeStreamResource),
 
        (r"/song", songResource),
 
        (r"/songs", songs),
 
        (r"/seekPlayOrPause", seekPlayOrPause),
 
        (r"/output", output),
 
        (r"/go", goButton),
 
        # (r'/stats/(.*)', StatsHandler, {
 
        #     'serverName': 'ascoltami'
 
        # }),
 
        metricsRoute(),
 
    ],
 
                                   app=app)
light9/collector/collector_client.py
 
from light9 import networking
 
from light9.effect.settings import DeviceSettings
 
from light9.metrics import metrics
 
from twisted.internet import defer
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 
import json, time, logging
 
import treq
 
# from greplin import scales
 

	
 
log = logging.getLogger('coll_client')
 

	
 
_zmqClient = None
 

	
 
# stats = scales.collection(
 
#     '/collectorClient',
 
#     scales.PmfStat('send', recalcPeriod=1),
 
# )
 

	
 

	
 
class TwistedZmqClient(object):
 

	
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
@@ -55,13 +50,13 @@ def sendToCollector(client, session, set
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
 

	
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        # stats.send = dt
 
        metrics('send').observe(dt)
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 

	
 
    d.addCallback(onDone)
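onDone() already has the elapsed time in hand (dt spans the whole treq/zmq send), so it uses observe(dt) rather than time(): observe() records a precomputed duration into the same kind of Summary. Minimal sketch with a hypothetical delay:

import time
from light9.metrics import metrics

sendTime = time.time()
time.sleep(0.02)                    # hypothetical request in flight
dt = time.time() - sendTime
metrics('send').observe(dt)         # Summary 'send': one sample of dt seconds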
 

	
light9/collector/output.py
 
@@ -2,13 +2,13 @@ from rdflib import URIRef
 
import socket
 
import struct
 
import time
 
import usb.core
 
import logging
 
from twisted.internet import threads, reactor, task
 
from greplin import scales
 
from light9.metrics import metrics
 
log = logging.getLogger('output')
 
logAllDmx = logging.getLogger('output.allDmx')
 

	
 

	
 
class Output(object):
 
    """
 
@@ -21,20 +21,12 @@ class Output(object):
 
    """
 
    uri: URIRef
 

	
 
    def __init__(self, uri: URIRef):
 
        self.uri = uri
 

	
 
        self.statPath = '/output%s' % self.shortId()
 
        scales.init(self, self.statPath)
 

	
 
        self._writeStats = scales.collection(
 
            self.statPath + '/write', scales.IntStat('succeed'),
 
            scales.IntStat('fail'), scales.PmfStat('call', recalcPeriod=1),
 
            scales.RecentFpsStat('fps'))
 

	
 
        self._currentBuffer = b''
 

	
 
        if log.isEnabledFor(logging.DEBUG):
 
            self._lastLoggedMsg = ''
 
            task.LoopingCall(self._periodicLog).start(1)
 

	
 
@@ -88,18 +80,18 @@ class BackgroundLoopOutput(Output):
 

	
 
    def _loop(self):
 
        start = time.time()
 
        sendingBuffer = self._currentBuffer
 

	
 
        def done(worked):
 
-            self._writeStats.succeed += 1
+            metrics('write_success', output=self.shortId()).incr()
 
            reactor.callLater(max(0, start + 1 / self.rate - time.time()),
 
                              self._loop)
 

	
 
        def err(e):
 
-            self._writeStats.fail += 1
+            metrics('write_fail', output=self.shortId()).incr()
 
            log.error(e)
 
            reactor.callLater(.2, self._loop)
 

	
 
        d = threads.deferToThread(self._write, sendingBuffer)
 
        d.addCallbacks(done, err)
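output.py is the one place this changeset uses labels: metrics('write_success', output=self.shortId()) creates a Counter with an 'output' label, so each Output instance gets its own time series under one metric name. Note the facade caches by name only, so every caller of a given name must pass the same label keys. Sketch with hypothetical output ids:

from light9.metrics import metrics

for out_id in ['dmx0', 'udmx1']:                      # hypothetical Output.shortId() values
    metrics('write_success', output=out_id).incr()    # e.g. write_success_total{output="dmx0"}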
 

	
 
@@ -110,14 +102,13 @@ class FtdiDmx(BackgroundLoopOutput):
 
        super().__init__(uri)
 
        self.lastDmxChannel = lastDmxChannel
 
        from .dmx_controller_output import OpenDmxUsb
 
        self.dmx = OpenDmxUsb()
 

	
 
    def _write(self, buf):
 
-        self._writeStats.fps.mark()
-        with self._writeStats.call.time():
+        with metrics('write', output=self.shortId()).time():
 
            if not buf:
 
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
 
                return
 

	
 
            # ok to truncate the last channels if they just went
 
            # to 0? No it is not. DMX receivers don't add implicit
 
@@ -144,15 +135,13 @@ class ArtnetDmx(BackgroundLoopOutput):
 
        packet.extend([0x00, 0x0e])  # Protocol version 14
 
        self.base_packet = packet
 
        self.sequence_counter = 255
 
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 

	
 
    def _write(self, buf):
 
-        self._writeStats.fps.mark()
-        with self._writeStats.call.time():
+        with metrics('write', output=self.shortId()).time():
 
            if not buf:
 
                logAllDmx.debug('%s: empty buf- no output', self.shortId())
 
                return
 

	
 
            if logAllDmx.isEnabledFor(logging.DEBUG):
 
                # for testing fps, smooth fades, etc
 
@@ -213,14 +202,14 @@ class Udmx(BackgroundLoopOutput):
 
    #def _loop(self):
 
    #    pass
 
    def _write(self, buf):
 
        if not self.dev:
 
            log.info('%s: trying to connect', self.shortId())
 
            raise ValueError()
 
-        self._writeStats.fps.mark()
-        with self._writeStats.call.time():
+        with metrics('write', output=self.shortId()).time():
 
            try:
 
                if not buf:
 
                    logAllDmx.debug('%s: empty buf- no output', self.shortId())
 
                    return
 

	
 
                # ok to truncate the last channels if they just went
light9/effect/sequencer.py
 
@@ -19,37 +19,18 @@ from light9.ascoltami.musictime_client i
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
 
from rdfdb.syncedgraph import SyncedGraph
 
# from standardservice.scalessetup import gatherProcessStats
 
from light9.metrics import metrics
 

	
 
# from greplin import scales
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 

	
 
# gatherProcessStats()
 
# updateStats = scales.collection(
 
#     '/update',
 
#     scales.PmfStat('s0_getMusic', recalcPeriod=1),
 
#     scales.PmfStat('s1_eval', recalcPeriod=1),
 
#     #scales.PmfStat('s3_send_client', recalcPeriod=1),
 
#     scales.PmfStat('s3_send', recalcPeriod=1),
 
#     scales.PmfStat('updateLoopLatency', recalcPeriod=1),
 
#     scales.DoubleStat('updateLoopLatencyGoal'),
 
#     scales.RecentFpsStat('updateFps'),
 
#     scales.DoubleStat('goalFps'),
 
# )
 
# compileStats = scales.collection(
 
#     '/compile',
 
#     scales.PmfStat('graph', recalcPeriod=1),
 
#     scales.PmfStat('song', recalcPeriod=1),
 
# )
 

	
 

	
 
def pyType(n):
 
    ret = n.toPython()
 
    if isinstance(ret, Decimal):
 
        return float(ret)
 
    return ret
 
@@ -168,14 +149,14 @@ class Sequencer(object):
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], Deferred],
 
                 fps=40):
 
        self.graph = graph
 
        self.fps = fps
 
        # updateStats.goalFps = self.fps
 
        # updateStats.updateLoopLatencyGoal = 1 / self.fps
 
        metrics('update_loop_goal_fps').set(self.fps)
 
        metrics('update_loop_goal_latency').set(1 / self.fps)
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 

	
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
        self._compileGraphCall = None
 
@@ -189,23 +170,23 @@ class Sequencer(object):
 

	
 
    def onCodeChange(self):
 
        log.debug('seq.onCodeChange')
 
        self.graph.addHandler(self.compileGraph)
 
        #self.updateLoop()
 

	
 
    # @compileStats.graph.time()
 
    @metrics('compile_graph').time()
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        for song in self.graph.subjects(RDF.type, L9['Song']):
 

	
 
            def compileSong(song: Song = cast(Song, song)) -> None:
 
                self.compileSong(song)
 

	
 
            self.graph.addHandler(compileSong)
 

	
 
    # @compileStats.song.time()
 
    @metrics('compile_song').time()
 
    def compileSong(self, song: Song) -> None:
 
        anyErrors = False
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            try:
 
                n = Note(self.graph, NoteUri(note), effecteval,
 
@@ -227,38 +208,38 @@ class Sequencer(object):
 
            self.lastLoopSucceeded = False
 
            traceback.print_exc()
 
            log.warn('updateLoop: %r', e)
 
            reactor.callLater(1, self.updateLoop)
 
        else:
 
            took = time.time() - frameStart
 
            # updateStats.updateLoopLatency = took
 
            metrics('update_loop_latency').observe(took)
 

	
 
            if not self.lastLoopSucceeded:
 
                log.info('Sequencer.update is working')
 
                self.lastLoopSucceeded = True
 

	
 
            delay = max(0, 1 / self.fps - took)
 
            reactor.callLater(delay, self.updateLoop)
 

	
 
    # @updateStats.updateFps.rate()
 
    @metrics('update_call').time()
 
    @inlineCallbacks
 
    def update(self) -> Deferred:
 

	
 
-        if 1:#with updateStats.s0_getMusic.time():
+        with metrics('update_s0_getMusic').time():
 
            musicState = self.music.getLatest()
 
            if not musicState.get('song') or not isinstance(
 
                    musicState.get('t'), float):
 
                return defer.succeed(0.0)
 
            song = Song(URIRef(musicState['song']))
 
            dispatcher.send('state',
 
                            update={
 
                                'song': str(song),
 
                                't': musicState['t']
 
                            })
 

	
 
-        if 1:#with updateStats.s1_eval.time():
+        with metrics('update_s1_eval').time():
 
            settings = []
 
            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
 
            noteReports = []
 
            for note in songNotes:
 
                try:
 
                    s, report = note.outputSettings(musicState['t'])
 
@@ -268,19 +249,19 @@ class Sequencer(object):
 
                noteReports.append(report)
 
                settings.append(s)
 
            devSettings = DeviceSettings.fromList(self.graph, settings)
 

	
 
        dispatcher.send('state', update={'songNotes': noteReports})
 

	
 
-        if 1:#with updateStats.s3_send.time():  # our measurement
+        with metrics('update_s3_send').time():  # our measurement
 
            sendSecs = yield self.sendToCollector(devSettings)
 

	
 
        # sendToCollector's own measurement.
 
        # (sometimes it's None, not sure why, and neither is mypy)
 
        #if isinstance(sendSecs, float):
 
        #    updateStats.s3_send_client = sendSecs
 
        #    metrics('update_s3_send_client').observe(sendSecs)
 

	
 

	
 
class Updates(cyclone.sse.SSEHandler):
 

	
 
    def __init__(self, application, request, **kwargs) -> None:
 
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
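In Sequencer.__init__ above, the loop goals are published as gauges with set(), while the measured latency is recorded as a Summary sample with observe() in updateLoop. A sketch of the two call shapes side by side; the values are hypothetical:

from light9.metrics import metrics

fps = 40                                          # hypothetical goal
metrics('update_loop_goal_fps').set(fps)          # Gauge: last value written wins
metrics('update_loop_goal_latency').set(1 / fps)

took = 0.012                                      # hypothetical frame time in seconds
metrics('update_loop_latency').observe(took)      # Summary sample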
light9/effecteval/effectloop.py
 
@@ -11,22 +11,23 @@ import treq
 
from light9 import Effects
 
from light9 import Submaster
 
from light9 import dmxclient
 
from light9 import networking
 
from light9.effecteval.effect import EffectNode
 
from light9.namespaces import L9, RDF
 
from light9.metrics import metrics
 

	
 
log = logging.getLogger('effectloop')
 

	
 

	
 
class EffectLoop(object):
 
    """maintains a collection of the current EffectNodes, gets time from
 
    music player, sends dmx"""
 

	
 
-    def __init__(self, graph, stats):
-        self.graph, self.stats = graph, stats
+    def __init__(self, graph):
+        self.graph = graph
 
        self.currentSong = None
 
        self.currentEffects = [
 
        ]  # EffectNodes for the current song plus the submaster ones
 
        self.lastLogTime = 0
 
        self.lastLogMsg = ""
 
        self.lastErrorLog = 0
 
@@ -86,13 +87,13 @@ class EffectLoop(object):
 
            estimated += now - self.requestTime
 
        returnValue((estimated, self.currentSong))
 

	
 
    @inlineCallbacks
 
    def updateTimeFromMusic(self):
 
        t1 = time.time()
 
-        with self.stats.getMusic.time():
+        with metrics('get_music').time():
 
            self.songTime, song = yield self.getSongTime()
 
            self.songTimeFetch = time.time()
 

	
 
        if song != self.currentSong:
 
            self.currentSong = song
 
            # this may be piling on the handlers
 
@@ -113,29 +114,29 @@ class EffectLoop(object):
 
    def sendLevels(self):
 
        t1 = time.time()
 
        log.debug("time since last call: %.1f ms" %
 
                  (1000 * (t1 - self.lastSendLevelsTime)))
 
        self.lastSendLevelsTime = t1
 
        try:
 
-            with self.stats.sendLevels.time():
+            with metrics('send_levels').time():
 
                if self.currentSong is not None:
 
                    log.debug('allEffectOutputs')
 
-                    with self.stats.evals.time():
+                    with metrics('evals').time():
 
                        outputs = self.allEffectOutputs(
 
                            self.estimatedSongTime())
 
                    log.debug('combineOutputs')
 
                    combined = self.combineOutputs(outputs)
 
                    self.logLevels(t1, combined)
 
                    log.debug('sendOutput')
 
-                    with self.stats.sendOutput.time():
+                    with metrics('send_output').time():
 
                        yield self.sendOutput(combined)
 

	
 
                elapsed = time.time() - t1
 
                dt = max(0, self.period - elapsed)
 
        except Exception:
 
-            self.stats.errors += 1
+            metrics('errors').incr()
 
            traceback.print_exc()
 
            dt = .5
 

	
 
        reactor.callLater(dt, self.sendLevels)
 

	
 
    def combineOutputs(self, outputs):
 
@@ -309,13 +310,13 @@ class LedLoop(EffectLoop):
 

	
 
    def logMessage(self, out):
 
        return str([(w, p.tolist() if isinstance(p, numpy.ndarray) else p)
 
                    for w, p in list(out.items())])
 

	
 

	
 
-def makeEffectLoop(graph, stats, outputWhere):
+def makeEffectLoop(graph, outputWhere):
    if outputWhere == 'dmx':
-        return EffectLoop(graph, stats)
+        return EffectLoop(graph)
    elif outputWhere == 'leds':
-        return LedLoop(graph, stats)
+        return LedLoop(graph)
 
    else:
 
        raise NotImplementedError("unknown output system %r" % outputWhere)
light9/metrics.py
 
new file 100644
 
"""for easier porting, and less boilerplate, allow these styles using the
 
form of the call to set up the right type of metric automatically:
 

	
 
  from metrics import metrics
 
  metrics.setProcess('pretty_name')
 

	
 
  @metrics('loop').time()              # a common one to get the fps of each service. Gets us qty and time
 
  def frame():
 
      if err:
 
         metrics('foo_errors').incr()  # if you incr it, it's a counter
 

	
 
  @metrics('foo_calls').time()         # qty & time because it's a decorator
 
  def foo(): 
 

	
 
  metrics('goal_fps').set(f)           # a gauge because we called set()
 

	
 
  with metrics('recompute'): ...       # ctxmgr also makes a timer
 
     time_this_part()
 

	
 
I don't see a need for labels yet, but maybe some code will want like
 
metrics('foo', label1=one). Need histogram? Info?
 

	
 
"""
 
from typing import Dict, Tuple, Callable, Type, TypeVar, cast
 
import cyclone.web
 
from prometheus_client import Counter, Gauge, Metric, Summary
 
from prometheus_client.exposition import generate_latest
 
from prometheus_client.registry import REGISTRY
 

	
 
_created: Dict[str, Metric] = {}
 

	
 
# _process=sys.argv[0]
 
# def setProcess(name: str):
 
#   global _process
 
#   _process = name
 

	
 
MT = TypeVar("MT")
 

	
 

	
 
class _MetricsRequest:
 

	
 
    def __init__(self, name: str, **labels):
 
        self.name = name
 
        self.labels = labels
 

	
 
    def _ensure(self, cls: Type[MT]) -> MT:
 
        if self.name not in _created:
 
            _created[self.name] = cls(name=self.name, documentation=self.name, labelnames=self.labels.keys())
 
        m = _created[self.name]
 
        if self.labels:
 
            m = m.labels(**self.labels)
 
        return m
 

	
 
    def __call__(self, fn) -> Callable:
        # decorator form: wrap fn so each call is counted and timed by the Summary
        return self._ensure(Summary).time()(fn)
 

	
 
    def set(self, v: float):
 
        self._ensure(Gauge).set(v)
 

	
 
    def incr(self):
        # counter form, as used by callers: metrics('x').incr()
        self._ensure(Counter).inc()
 

	
 
    def offset(self, amount: float):
 
        self._ensure(Gauge).inc(amount)
 

	
 
    def time(self):
 
        return self._ensure(Summary).time()
 

	
 
    def observe(self, x: float):
 
        return self._ensure(Summary).observe(x)
 

	
 
    def __enter__(self):
 
        return self._ensure(Summary).__enter__()
 

	
 

	
 
def metrics(name: str, **labels):
 
    return _MetricsRequest(name, **labels)
 

	
 

	
 
class _CycloneMetrics(cyclone.web.RequestHandler):
 

	
 
    def get(self):
 
        self.add_header('content-type', 'text/plain')
 
        self.write(generate_latest(REGISTRY))
 

	
 

	
 
def metricsRoute() -> Tuple[str, Type[cyclone.web.RequestHandler]]:
 
    return ('/metrics', _CycloneMetrics)
 

	
 

	
 
"""
 
stuff we used to have in greplin. Might be nice to get (client-side-computed) min/max/stddev back.
 

	
 
class PmfStat(Stat):
 
  A stat that stores min, max, mean, standard deviation, and some
 
  percentiles for arbitrary floating-point data. This is potentially a
 
  bit expensive, so its child values are only updated once every
 
  twenty seconds.
 

	
 

	
 

	
 
metrics consumer side can do this with the changing counts:
 

	
 
class RecentFps(object):
 
  def __init__(self, window=20):
 
    self.window = window
 
    self.recentTimes = []
 

	
 
  def mark(self):
 
    now = time.time()
 
    self.recentTimes.append(now)
 
    self.recentTimes = self.recentTimes[-self.window:]
 

	
 
  def rate(self):
 
    def dec(innerFunc):
 
      def f(*a, **kw):
 
        self.mark()
 
        return innerFunc(*a, **kw)
 
      return f
 
    return dec
 

	
 
  def __call__(self):
 
    if len(self.recentTimes) < 2:
 
      return {}
 
    recents = sorted(round(1 / (b - a), 3)
 
                      for a, b in zip(self.recentTimes[:-1],
 
                                      self.recentTimes[1:]))
 
    avg = (len(self.recentTimes) - 1) / (
 
      self.recentTimes[-1] - self.recentTimes[0])
 
    return {'average': round(avg, 5), 'recents': recents}
 

	
 

	
 
i think prometheus covers this one:
 

	
 
import psutil
 
def gatherProcessStats():
 
    procStats = scales.collection('/process',
 
                                  scales.DoubleStat('time'),
 
                                  scales.DoubleStat('cpuPercent'),
 
                                  scales.DoubleStat('memMb'),
 
    )
 
    proc = psutil.Process()
 
    lastCpu = [0.]
 
    def updateTimeStat():
 
        now = time.time()
 
        procStats.time = round(now, 3)
 
        if now - lastCpu[0] > 3:
 
            procStats.cpuPercent = round(proc.cpu_percent(), 6) # (since last call)
 
            lastCpu[0] = now
 
        procStats.memMb = round(proc.memory_info().rss / 1024 / 1024, 6)
 
    task.LoopingCall(updateTimeStat).start(.1)
 

	
 
"""
 

	
 

	
 
class M:
 

	
 
    def __call__(self, name):
 
        return
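The commented RecentFps sketch above computed frames-per-second on the client; with plain Counters (the frame marks that become .incr() calls in videorecorder.py below), the per-second rate is left to the Prometheus server at query time, e.g. rate(frames_total[1m]) in PromQL. On the producer side only the increment is needed; a sketch with a hypothetical metric name:

from light9.metrics import metrics

def on_frame():                     # hypothetical per-frame callback
    metrics('frames').incr()        # exposed as frames_total; fps = rate(frames_total[1m])

for _ in range(3):
    on_frame()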
light9/vidref/videorecorder.py
 
@@ -5,46 +5,35 @@ import time, logging, os, traceback
 

	
 
import gi
 
gi.require_version('Gst', '1.0')
 
gi.require_version('GstBase', '1.0')
 

	
 
from gi.repository import Gst
 
# from greplin import scales
 
from rdflib import URIRef
 
from rx.subject import BehaviorSubject
 
from twisted.internet import threads
 
import PIL.Image
 
import moviepy.editor
 
import numpy
 

	
 
from light9 import showconfig
 
from light9.ascoltami.musictime_client import MusicTime
 
from light9.newtypes import Song
 

	
 
from light9.metrics import metrics
 
log = logging.getLogger()
 

	
 
# stats = scales.collection(
 
#     '/recorder',
 
#     scales.PmfStat('jpegEncode', recalcPeriod=1),
 
#     scales.IntStat('deletes'),
 
#     scales.PmfStat('waitForNextImg', recalcPeriod=1),
 
#     scales.PmfStat('crop', recalcPeriod=1),
 
#     scales.RecentFpsStat('encodeFrameFps'),
 
#     scales.RecentFpsStat('queueGstFrameFps'),
 
# )
 

	
 

	
 
@dataclass
 
class CaptureFrame:
 
    img: PIL.Image
 
    song: Song
 
    t: float
 
    isPlaying: bool
 
    imgJpeg: Optional[bytes] = None
 

	
 
    # @stats.jpegEncode.time()
 
    @metrics('jpeg_encode').time()
 
    def asJpeg(self):
 
        if not self.imgJpeg:
 
            output = BytesIO()
 
            self.img.save(output, 'jpeg', quality=80)
 
            self.imgJpeg = output.getvalue()
 
        return self.imgJpeg
 
@@ -70,13 +59,13 @@ def deleteClip(uri: URIRef):
 
    w = uri.split('/')[-4:]
 
    path = '/'.join([
 
        w[0], w[1], 'video', f'light9.bigasterisk.com_{w[0]}_{w[1]}_{w[2]}',
 
        w[3]
 
    ])
 
    log.info(f'deleting {uri} {path}')
 
    # stats.deletes += 1
 
    metrics('deletes').incr()
 
    for fn in [path + '.mp4', path + '.timing']:
 
        os.remove(fn)
 

	
 

	
 
class FramesToVideoFiles:
 
    """
 
@@ -170,13 +159,13 @@ class FramesToVideoFiles:
 

	
 
        if self.currentClipFrameCount < 400:
 
            log.info('too small- deleting')
 
            deleteClip(takeUri(self.outMp4.encode('ascii')))
 

	
 
    def _bg_make_frame(self, video_time_secs):
 
        # stats.encodeFrameFps.mark()
 
        metrics('encodeFrameFps').incr()
 
        if self.nextWriteAction == 'close':
 
            raise StopIteration  # the one in write_videofile
 
        elif self.nextWriteAction == 'notWritingClip':
 
            raise NotImplementedError
 
        elif self.nextWriteAction == 'saveFrames':
 
            pass
 
@@ -184,13 +173,13 @@ class FramesToVideoFiles:
 
            raise NotImplementedError(self.nextWriteAction)
 

	
 
        # should be a little queue to miss fewer frames
 
        t1 = time.time()
 
        while self.nextImg is None:
 
            time.sleep(.015)
 
        # stats.waitForNextImg = time.time() - t1
 
        metrics('wait_for_next_img').observe(time.time() - t1)
 
        cf, self.nextImg = self.nextImg, None
 

	
 
        self.frameMap.write(f'video {video_time_secs:g} = song {cf.t:g}\n')
 
        self.currentClipFrameCount += 1
 
        return numpy.asarray(cf.img)
 

	
 
@@ -245,23 +234,23 @@ class GstSource:
 
                img = self.crop(img)
 
            finally:
 
                buf.unmap(mapinfo)
 
            # could get gst's frame time and pass it to getLatest
 
            latest = self.musicTime.getLatest()
 
            if 'song' in latest:
 
                # stats.queueGstFrameFps.mark()
 
                metrics('queue_gst_frame_fps').incr()
 
                self.liveImages.on_next(
 
                    CaptureFrame(img=img,
 
                                 song=Song(latest['song']),
 
                                 t=latest['t'],
 
                                 isPlaying=latest['playing']))
 
        except Exception:
 
            traceback.print_exc()
 
        return Gst.FlowReturn.OK
 

	
 
    # @stats.crop.time()
 
    @metrics('crop').time()
 
    def crop(self, img):
 
        return img.crop((40, 100, 790, 310))
 

	
 
    def setupPipelineError(self, pipe, cb):
 
        bus = pipe.get_bus()
 

	
light9/zmqtransport.py
 
import json
 
from rdflib import URIRef, Literal
 
from greplin import scales
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPullConnection
 
from light9.metrics import metrics
 
import logging
 

	
 
log = logging.getLogger('zmq')
 

	
 

	
 
def parseJsonMessage(msg):
 
@@ -17,28 +17,21 @@ def parseJsonMessage(msg):
 
            value = Literal(value)
 
        settings.append((URIRef(device), URIRef(attr), value))
 
    return body['client'], body['clientSession'], settings, body['sendTime']
 

	
 

	
 
def startZmq(port, collector):
 
    stats = scales.collection(
 
        '/zmqServer',
 
        scales.PmfStat('setAttr', recalcPeriod=1),
 
        scales.RecentFpsStat('setAttrFps'),
 
    )
 

	
 
    zf = ZmqFactory()
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
 

	
 
    class Pull(ZmqPullConnection):
 
        #highWaterMark = 3
 
        def onPull(self, message):
 
-            stats.setAttrFps.mark()
-            with stats.setAttr.time():
+            with metrics('zmq_server_set_attr').time():
 
                # todo: new compressed protocol where you send all URIs up
 
                # front and then use small ints to refer to devices and
 
                # attributes in subsequent requests.
 
                client, clientSession, settings, sendTime = parseJsonMessage(
 
                    message[0])
 
                collector.setAttrs(client, clientSession, settings, sendTime)