Changeset - 67575505c400
[Not reviewed]
default
0 5 0
drewp@bigasterisk.com - 3 years ago 2022-05-10 06:18:05
drewp@bigasterisk.com
comment out more greplin so services can start
5 files changed with 81 insertions and 79 deletions:
0 comments (0 inline, 0 general)
bin/effectsequencer
Show inline comments
 
#!bin/python
 
"""
 
plays back effect notes from the timeline
 
"""
 

	
 
from run_local import log
 
from twisted.internet import reactor
 
from greplin.scales.cyclonehandler import StatsHandler
 
# from greplin.scales.cyclonehandler import StatsHandler
 
from rdfdb.syncedgraph import SyncedGraph
 
from light9 import networking, showconfig
 
from greplin import scales
 
# from greplin import scales
 
import optparse, sys, logging
 
import cyclone.web
 
from rdflib import URIRef
 
from light9.effect.sequencer import Sequencer, Updates
 
from light9.collector.collector_client import sendToCollector
 

	
 
from light9 import clientsession
 

	
 

	
 
class App(object):
 

	
 
    def __init__(self, show, session):
 
        self.show = show
 
        self.session = session
 

	
 
        self.graph = SyncedGraph(networking.rdfdb.url, "effectSequencer")
 
        self.graph.initiallySynced.addCallback(self.launch)
 

	
 
        self.stats = scales.collection(
 
            '/',
 
            scales.PmfStat('sendLevels', recalcPeriod=1),
 
            scales.PmfStat('getMusic', recalcPeriod=1),
 
            scales.PmfStat('evals', recalcPeriod=1),
 
            scales.PmfStat('sendOutput', recalcPeriod=1),
 
            scales.IntStat('errors'),
 
        )
 
        # self.stats = scales.collection(
 
        #     '/',
 
        #     scales.PmfStat('sendLevels', recalcPeriod=1),
 
        #     scales.PmfStat('getMusic', recalcPeriod=1),
 
        #     scales.PmfStat('evals', recalcPeriod=1),
 
        #     scales.PmfStat('sendOutput', recalcPeriod=1),
 
        #     scales.IntStat('errors'),
 
        # )
 

	
 
    def launch(self, *args):
 
        self.seq = Sequencer(
 
            self.graph,
 
            lambda settings: sendToCollector(
 
                'effectSequencer',
 
                self.session,
 
                settings,
 
                # This seems to be safe here (and lets us get from
 
                # 20fpx to 40fpx), even though it leads to big stalls
 
                # if I use it on KC.
 
                useZmq=True))
 

	
 
        self.cycloneApp = cyclone.web.Application(handlers=[
 
            (r'/()', cyclone.web.StaticFileHandler, {
 
                "path": "light9/effect/",
 
                "default_filename": "sequencer.html"
 
            }),
 
            (r'/updates', Updates),
 
            (r'/stats/(.*)', StatsHandler, {
 
                'serverName': 'effectsequencer'
 
            }),
 
            # (r'/stats/(.*)', StatsHandler, {
 
            #     'serverName': 'effectsequencer'
 
            # }),
 
        ],
 
                                                  debug=True,
 
                                                  seq=self.seq,
 
                                                  graph=self.graph,
 
                                                  stats=self.stats)
 
                                                #   stats=self.stats
 
                                                  )
 
        reactor.listenTCP(networking.effectSequencer.port, self.cycloneApp)
 
        log.info("listening on %s" % networking.effectSequencer.port)
 

	
 

	
 
if __name__ == "__main__":
 
    parser = optparse.OptionParser()
 
    parser.add_option(
 
        '--show',
 
        help='show URI, like http://light9.bigasterisk.com/show/dance2008',
 
        default=showconfig.showUri())
 
    parser.add_option("-v",
 
                      "--verbose",
bin/vidref
Show inline comments
 
@@ -10,49 +10,49 @@ light9/vidref/videorecorder.py capture f
 
light9/vidref/replay.py backend for vidref.js playback element- figures out which frames go with the current song and time
 
light9/vidref/index.html web ui for watching current stage and song playback
 
light9/vidref/setup.html web ui for setup of camera params and frame crop
 
light9/web/light9-vidref-live.js LitElement for live video frames
 
light9/web/light9-vidref-playback.js LitElement for video playback
 

	
 
"""
 
from run_local import log
 

	
 
from typing import cast
 
import logging, optparse, json, base64, os, glob
 

	
 
from greplin import scales
 
from greplin.scales.cyclonehandler import StatsHandler
 
# from greplin import scales
 
# from greplin.scales.cyclonehandler import StatsHandler
 
from rdflib import URIRef
 
from twisted.internet import reactor, defer
 
import cyclone.web, cyclone.httpclient, cyclone.websocket
 

	
 
from cycloneerr import PrettyErrorHandler
 
from light9 import networking, showconfig
 
from light9.newtypes import Song
 
from light9.vidref import videorecorder
 
from rdfdb.syncedgraph import SyncedGraph
 
from standardservice.scalessetup import gatherProcessStats
 
# from standardservice.scalessetup import gatherProcessStats
 

	
 
parser = optparse.OptionParser()
 
parser.add_option("-v", "--verbose", action="store_true", help="logging.DEBUG")
 
(options, args) = parser.parse_args()
 

	
 
log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
 

	
 
gatherProcessStats()
 
stats = scales.collection(
 
    '/webServer',
 
    scales.RecentFpsStat('liveWebsocketFrameFps'),
 
    scales.IntStat('liveClients'),
 
)
 
# gatherProcessStats()
 
# stats = scales.collection(
 
#     '/webServer',
 
#     scales.RecentFpsStat('liveWebsocketFrameFps'),
 
#     scales.IntStat('liveClients'),
 
# )
 

	
 

	
 
class Snapshot(cyclone.web.RequestHandler):
 

	
 
    @defer.inlineCallbacks
 
    def post(self):
 
        # save next pic
 
        # return /snapshot/path
 
        try:
 
            snapshotDir = 'todo'
 
            outputFilename = yield self.settings.gui.snapshot()
 

	
 
@@ -69,34 +69,35 @@ class Snapshot(cyclone.web.RequestHandle
 
            raise
 

	
 

	
 
pipeline = videorecorder.GstSource(
 
    #'/dev/v4l/by-id/usb-Bison_HD_Webcam_200901010001-video-index0'
 
    '/dev/v4l/by-id/usb-Generic_FULL_HD_1080P_Webcam_200901010001-video-index0')
 

	
 

	
 
class Live(cyclone.websocket.WebSocketHandler):
 

	
 
    def connectionMade(self, *args, **kwargs):
 
        pipeline.liveImages.subscribe(on_next=self.onFrame)
 
        stats.liveClients += 1
 
        # stats.liveClients += 1
 

	
 
    def connectionLost(self, reason):
 
        #self.subj.dispose()
 
        stats.liveClients -= 1
 
        # stats.liveClients -= 1
 
        pass
 

	
 
    def onFrame(self, cf: videorecorder.CaptureFrame):
 
        if cf is None: return
 

	
 
        stats.liveWebsocketFrameFps.mark()
 
        # stats.liveWebsocketFrameFps.mark()
 

	
 
        self.sendMessage(
 
            json.dumps({
 
                'jpeg': base64.b64encode(cf.asJpeg()).decode('ascii'),
 
                'description': f't={cf.t}',
 
            }))
 

	
 

	
 
class SnapshotPic(cyclone.web.StaticFileHandler):
 
    pass
 

	
 

	
 
@@ -179,21 +180,21 @@ reactor.listenTCP(
 
                'path': 'light9/vidref',
 
                'default_filename': 'setup.html'
 
            }),
 
            (r'/live', Live),
 
            (r'/clips', Clips),
 
            (r'/replayMap', ReplayMap),
 
            (r'/snapshot', Snapshot),
 
            (r'/snapshot/(.*)', SnapshotPic, {
 
                "path": 'todo',
 
            }),
 
            (r'/time', Time),
 
            (r'/time/stream', TimeStream),
 
            (r'/stats/(.*)', StatsHandler, {
 
                'serverName': 'vidref'
 
            }),
 
            # (r'/stats/(.*)', StatsHandler, {
 
            #     'serverName': 'vidref'
 
            # }),
 
        ],
 
        debug=True,
 
    ))
 
log.info("serving on %s" % port)
 

	
 
reactor.run()
light9/collector/collector_client.py
Show inline comments
 
from light9 import networking
 
from light9.effect.settings import DeviceSettings
 
from twisted.internet import defer
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 
import json, time, logging
 
import treq
 
from greplin import scales
 
# from greplin import scales
 

	
 
log = logging.getLogger('coll_client')
 

	
 
_zmqClient = None
 

	
 
stats = scales.collection(
 
    '/collectorClient',
 
    scales.PmfStat('send', recalcPeriod=1),
 
)
 
# stats = scales.collection(
 
#     '/collectorClient',
 
#     scales.PmfStat('send', recalcPeriod=1),
 
# )
 

	
 

	
 
class TwistedZmqClient(object):
 

	
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 

	
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 
@@ -49,24 +49,24 @@ def sendToCollector(client, session, set
 
                    useZmq=False) -> defer.Deferred:
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings).encode('utf8')
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg, timeout=1)
 

	
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        stats.send = dt
 
        # stats.send = dt
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 

	
 
    d.addCallback(onDone)
 

	
 
    def onErr(err):
 
        log.warn('sendToCollector failed: %r', err)
 

	
 
    d.addErrback(onErr)
 
    return d
light9/effect/sequencer.py
Show inline comments
 
@@ -13,48 +13,48 @@ import cyclone.sse
 
import logging, bisect, time
 
import traceback
 
from decimal import Decimal
 
from typing import Any, Callable, Dict, List, Tuple, cast, Union
 

	
 
from light9.ascoltami.musictime_client import MusicTime
 
from light9.effect import effecteval
 
from light9.effect.settings import DeviceSettings
 
from light9.effect.simple_outputs import SimpleOutputs
 
from light9.namespaces import L9, RDF
 
from light9.newtypes import DeviceUri, DeviceAttr, NoteUri, Curve, Song
 
from rdfdb.syncedgraph import SyncedGraph
 
from standardservice.scalessetup import gatherProcessStats
 
# from standardservice.scalessetup import gatherProcessStats
 

	
 
from greplin import scales
 
# from greplin import scales
 
import imp
 

	
 
log = logging.getLogger('sequencer')
 

	
 
gatherProcessStats()
 
updateStats = scales.collection(
 
    '/update',
 
    scales.PmfStat('s0_getMusic', recalcPeriod=1),
 
    scales.PmfStat('s1_eval', recalcPeriod=1),
 
    #scales.PmfStat('s3_send_client', recalcPeriod=1),
 
    scales.PmfStat('s3_send', recalcPeriod=1),
 
    scales.PmfStat('updateLoopLatency', recalcPeriod=1),
 
    scales.DoubleStat('updateLoopLatencyGoal'),
 
    scales.RecentFpsStat('updateFps'),
 
    scales.DoubleStat('goalFps'),
 
)
 
compileStats = scales.collection(
 
    '/compile',
 
    scales.PmfStat('graph', recalcPeriod=1),
 
    scales.PmfStat('song', recalcPeriod=1),
 
)
 
# gatherProcessStats()
 
# updateStats = scales.collection(
 
#     '/update',
 
#     scales.PmfStat('s0_getMusic', recalcPeriod=1),
 
#     scales.PmfStat('s1_eval', recalcPeriod=1),
 
#     #scales.PmfStat('s3_send_client', recalcPeriod=1),
 
#     scales.PmfStat('s3_send', recalcPeriod=1),
 
#     scales.PmfStat('updateLoopLatency', recalcPeriod=1),
 
#     scales.DoubleStat('updateLoopLatencyGoal'),
 
#     scales.RecentFpsStat('updateFps'),
 
#     scales.DoubleStat('goalFps'),
 
# )
 
# compileStats = scales.collection(
 
#     '/compile',
 
#     scales.PmfStat('graph', recalcPeriod=1),
 
#     scales.PmfStat('song', recalcPeriod=1),
 
# )
 

	
 

	
 
def pyType(n):
 
    ret = n.toPython()
 
    if isinstance(ret, Decimal):
 
        return float(ret)
 
    return ret
 

	
 

	
 
class Note(object):
 

	
 
    def __init__(self, graph: SyncedGraph, uri: NoteUri, effectevalModule,
 
@@ -162,56 +162,56 @@ class CodeWatcher(object):
 
        # in case we got an event at the start of the write
 
        reactor.callLater(.1, go)
 

	
 

	
 
class Sequencer(object):
 

	
 
    def __init__(self,
 
                 graph: SyncedGraph,
 
                 sendToCollector: Callable[[DeviceSettings], Deferred],
 
                 fps=40):
 
        self.graph = graph
 
        self.fps = fps
 
        updateStats.goalFps = self.fps
 
        updateStats.updateLoopLatencyGoal = 1 / self.fps
 
        # updateStats.goalFps = self.fps
 
        # updateStats.updateLoopLatencyGoal = 1 / self.fps
 
        self.sendToCollector = sendToCollector
 
        self.music = MusicTime(period=.2, pollCurvecalc=False)
 

	
 
        self.recentUpdateTimes: List[float] = []
 
        self.lastStatLog = 0.0
 
        self._compileGraphCall = None
 
        self.notes: Dict[Song, List[Note]] = {}  # song: [notes]
 
        self.simpleOutputs = SimpleOutputs(self.graph)
 
        self.graph.addHandler(self.compileGraph)
 
        self.lastLoopSucceeded = False
 

	
 
        self.codeWatcher = CodeWatcher(onChange=self.onCodeChange)
 
        self.updateLoop()
 

	
 
    def onCodeChange(self):
 
        log.debug('seq.onCodeChange')
 
        self.graph.addHandler(self.compileGraph)
 
        #self.updateLoop()
 

	
 
    @compileStats.graph.time()
 
    # @compileStats.graph.time()
 
    def compileGraph(self) -> None:
 
        """rebuild our data from the graph"""
 
        for song in self.graph.subjects(RDF.type, L9['Song']):
 

	
 
            def compileSong(song: Song = cast(Song, song)) -> None:
 
                self.compileSong(song)
 

	
 
            self.graph.addHandler(compileSong)
 

	
 
    @compileStats.song.time()
 
    # @compileStats.song.time()
 
    def compileSong(self, song: Song) -> None:
 
        anyErrors = False
 
        self.notes[song] = []
 
        for note in self.graph.objects(song, L9['note']):
 
            try:
 
                n = Note(self.graph, NoteUri(note), effecteval,
 
                         self.simpleOutputs)
 
            except Exception:
 
                log.warn(f"failed to build Note {note} - skipping")
 
                anyErrors = True
 
                continue
 
            self.notes[song].append(n)
 
@@ -221,66 +221,66 @@ class Sequencer(object):
 
    @inlineCallbacks
 
    def updateLoop(self) -> None:
 
        frameStart = time.time()
 
        try:
 
            sec = yield self.update()
 
        except Exception as e:
 
            self.lastLoopSucceeded = False
 
            traceback.print_exc()
 
            log.warn('updateLoop: %r', e)
 
            reactor.callLater(1, self.updateLoop)
 
        else:
 
            took = time.time() - frameStart
 
            updateStats.updateLoopLatency = took
 
            # updateStats.updateLoopLatency = took
 

	
 
            if not self.lastLoopSucceeded:
 
                log.info('Sequencer.update is working')
 
                self.lastLoopSucceeded = True
 

	
 
            delay = max(0, 1 / self.fps - took)
 
            reactor.callLater(delay, self.updateLoop)
 

	
 
    @updateStats.updateFps.rate()
 
    # @updateStats.updateFps.rate()
 
    @inlineCallbacks
 
    def update(self) -> Deferred:
 

	
 
        with updateStats.s0_getMusic.time():
 
        if 1:#with updateStats.s0_getMusic.time():
 
            musicState = self.music.getLatest()
 
            if not musicState.get('song') or not isinstance(
 
                    musicState.get('t'), float):
 
                return defer.succeed(0.0)
 
            song = Song(URIRef(musicState['song']))
 
            dispatcher.send('state',
 
                            update={
 
                                'song': str(song),
 
                                't': musicState['t']
 
                            })
 

	
 
        with updateStats.s1_eval.time():
 
        if 1:#with updateStats.s1_eval.time():
 
            settings = []
 
            songNotes = sorted(self.notes.get(song, []), key=lambda n: n.uri)
 
            noteReports = []
 
            for note in songNotes:
 
                try:
 
                    s, report = note.outputSettings(musicState['t'])
 
                except Exception:
 
                    traceback.print_exc()
 
                    raise
 
                noteReports.append(report)
 
                settings.append(s)
 
            devSettings = DeviceSettings.fromList(self.graph, settings)
 

	
 
        dispatcher.send('state', update={'songNotes': noteReports})
 

	
 
        with updateStats.s3_send.time():  # our measurement
 
        if 1:#with updateStats.s3_send.time():  # our measurement
 
            sendSecs = yield self.sendToCollector(devSettings)
 

	
 
        # sendToCollector's own measurement.
 
        # (sometimes it's None, not sure why, and neither is mypy)
 
        #if isinstance(sendSecs, float):
 
        #    updateStats.s3_send_client = sendSecs
 

	
 

	
 
class Updates(cyclone.sse.SSEHandler):
 

	
 
    def __init__(self, application, request, **kwargs) -> None:
 
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
light9/vidref/videorecorder.py
Show inline comments
 
from dataclasses import dataclass
 
from io import BytesIO
 
from typing import Optional
 
import time, logging, os, traceback
 

	
 
import gi
 
gi.require_version('Gst', '1.0')
 
gi.require_version('GstBase', '1.0')
 

	
 
from gi.repository import Gst
 
from greplin import scales
 
# from greplin import scales
 
from rdflib import URIRef
 
from rx.subject import BehaviorSubject
 
from twisted.internet import threads
 
import PIL.Image
 
import moviepy.editor
 
import numpy
 

	
 
from light9 import showconfig
 
from light9.ascoltami.musictime_client import MusicTime
 
from light9.newtypes import Song
 

	
 
log = logging.getLogger()
 

	
 
stats = scales.collection(
 
    '/recorder',
 
    scales.PmfStat('jpegEncode', recalcPeriod=1),
 
    scales.IntStat('deletes'),
 
    scales.PmfStat('waitForNextImg', recalcPeriod=1),
 
    scales.PmfStat('crop', recalcPeriod=1),
 
    scales.RecentFpsStat('encodeFrameFps'),
 
    scales.RecentFpsStat('queueGstFrameFps'),
 
)
 
# stats = scales.collection(
 
#     '/recorder',
 
#     scales.PmfStat('jpegEncode', recalcPeriod=1),
 
#     scales.IntStat('deletes'),
 
#     scales.PmfStat('waitForNextImg', recalcPeriod=1),
 
#     scales.PmfStat('crop', recalcPeriod=1),
 
#     scales.RecentFpsStat('encodeFrameFps'),
 
#     scales.RecentFpsStat('queueGstFrameFps'),
 
# )
 

	
 

	
 
@dataclass
 
class CaptureFrame:
 
    img: PIL.Image
 
    song: Song
 
    t: float
 
    isPlaying: bool
 
    imgJpeg: Optional[bytes] = None
 

	
 
    @stats.jpegEncode.time()
 
    # @stats.jpegEncode.time()
 
    def asJpeg(self):
 
        if not self.imgJpeg:
 
            output = BytesIO()
 
            self.img.save(output, 'jpeg', quality=80)
 
            self.imgJpeg = output.getvalue()
 
        return self.imgJpeg
 

	
 

	
 
def songDir(song: Song) -> bytes:
 
    return os.path.join(
 
        showconfig.root(), b'video',
 
        song.replace('http://', '').replace('/', '_').encode('ascii'))
 
@@ -64,25 +64,25 @@ def takeUri(songPath: bytes) -> URIRef:
 
        ['http://light9.bigasterisk.com/show', song[-2], song[-1], take]))
 

	
 

	
 
def deleteClip(uri: URIRef):
 
    # uri http://light9.bigasterisk.com/show/dance2019/song6/take_155
 
    # path show/dance2019/video/light9.bigasterisk.com_show_dance2019_song6/take_155.*
 
    w = uri.split('/')[-4:]
 
    path = '/'.join([
 
        w[0], w[1], 'video', f'light9.bigasterisk.com_{w[0]}_{w[1]}_{w[2]}',
 
        w[3]
 
    ])
 
    log.info(f'deleting {uri} {path}')
 
    stats.deletes += 1
 
    # stats.deletes += 1
 
    for fn in [path + '.mp4', path + '.timing']:
 
        os.remove(fn)
 

	
 

	
 
class FramesToVideoFiles:
 
    """
 

	
 
    nextWriteAction: 'ignore'
 
    currentOutputClip: None
 

	
 
    (frames come in for new video)
 
    nextWriteAction: 'saveFrame'
 
@@ -164,39 +164,39 @@ class FramesToVideoFiles:
 
                                                   bitrate='150000')
 
        except (StopIteration, RuntimeError):
 
            self.frameMap.close()
 

	
 
        log.info('write_videofile done')
 
        self.currentOutputClip = None
 

	
 
        if self.currentClipFrameCount < 400:
 
            log.info('too small- deleting')
 
            deleteClip(takeUri(self.outMp4.encode('ascii')))
 

	
 
    def _bg_make_frame(self, video_time_secs):
 
        stats.encodeFrameFps.mark()
 
        # stats.encodeFrameFps.mark()
 
        if self.nextWriteAction == 'close':
 
            raise StopIteration  # the one in write_videofile
 
        elif self.nextWriteAction == 'notWritingClip':
 
            raise NotImplementedError
 
        elif self.nextWriteAction == 'saveFrames':
 
            pass
 
        else:
 
            raise NotImplementedError(self.nextWriteAction)
 

	
 
        # should be a little queue to miss fewer frames
 
        t1 = time.time()
 
        while self.nextImg is None:
 
            time.sleep(.015)
 
        stats.waitForNextImg = time.time() - t1
 
        # stats.waitForNextImg = time.time() - t1
 
        cf, self.nextImg = self.nextImg, None
 

	
 
        self.frameMap.write(f'video {video_time_secs:g} = song {cf.t:g}\n')
 
        self.currentClipFrameCount += 1
 
        return numpy.asarray(cf.img)
 

	
 

	
 
class GstSource:
 

	
 
    def __init__(self, dev):
 
        """
 
        make new gst pipeline
 
@@ -239,35 +239,35 @@ class GstSource:
 
            (result, mapinfo) = buf.map(Gst.MapFlags.READ)
 
            try:
 
                img = PIL.Image.frombytes(
 
                    'RGB', (caps.get_structure(0).get_value('width'),
 
                            caps.get_structure(0).get_value('height')),
 
                    mapinfo.data)
 
                img = self.crop(img)
 
            finally:
 
                buf.unmap(mapinfo)
 
            # could get gst's frame time and pass it to getLatest
 
            latest = self.musicTime.getLatest()
 
            if 'song' in latest:
 
                stats.queueGstFrameFps.mark()
 
                # stats.queueGstFrameFps.mark()
 
                self.liveImages.on_next(
 
                    CaptureFrame(img=img,
 
                                 song=Song(latest['song']),
 
                                 t=latest['t'],
 
                                 isPlaying=latest['playing']))
 
        except Exception:
 
            traceback.print_exc()
 
        return Gst.FlowReturn.OK
 

	
 
    @stats.crop.time()
 
    # @stats.crop.time()
 
    def crop(self, img):
 
        return img.crop((40, 100, 790, 310))
 

	
 
    def setupPipelineError(self, pipe, cb):
 
        bus = pipe.get_bus()
 

	
 
        def onBusMessage(bus, msg):
 

	
 
            print('nusmsg', msg)
 
            if msg.type == Gst.MessageType.ERROR:
 
                _, txt = msg.parse_error()
 
                cb(txt)
0 comments (0 inline, 0 general)