#!bin/python
"""
Web service for editing and evaluating light9 effects.

Serves the effect editing pages, pushes graph updates to browsers over
websockets, and runs a background loop (EffectLoop) that evaluates the
current song's effects and sends the resulting levels to DMX.
"""
from __future__ import division
from run_local import log
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
import cyclone.web, cyclone.websocket, cyclone.httpclient
import sys, optparse, logging, subprocess, json, time, traceback
from rdflib import URIRef, Literal
sys.path.append(".")
from light9 import networking, showconfig, Submaster, dmxclient
from light9.rdfdb.syncedgraph import SyncedGraph
from light9.namespaces import L9, RDF, RDFS
from light9.rdfdb.patch import Patch
from light9.effecteval.effect import EffectNode
from light9.greplin_cyclone import StatsForCyclone
from greplin import scales
sys.path.append("/my/proj/homeauto/lib")
sys.path.append("/home/drewp/projects/homeauto/lib")
from cycloneerr import PrettyErrorHandler


class EffectEdit(cyclone.web.RequestHandler):
    """Serves the static effect editing page."""

    def get(self):
        # Use a context manager so the file handle is closed promptly
        # instead of waiting for garbage collection.
        with open("light9/effecteval/effect.html") as page:
            self.write(page.read())


class SongEffects(PrettyErrorHandler, cyclone.web.RequestHandler):
    """Creates new effects on a song."""

    def post(self):
        """
        Add a new effect (and the curve that drives it) to a song.

        Request arguments:
          uri  -- the song to add the effect to
          drop -- the resource that was dropped on the song; its
                  rdfs:label is used to label the new curve and it is
                  the first argument to sub() in the effect code

        The new effect and curve URIs are made unique with the current
        unix time, and all statements are written in the song's context.
        """
        song = URIRef(self.get_argument('uri'))
        drop = URIRef(self.get_argument('drop'))
        ctx = song
        now = time.time()
        effect = song + "/effect/e-%f" % now
        curve = song + "/curve/c-%f" % now

        with self.settings.graph.currentState(
                tripleFilter=(drop, RDFS.label, None)) as g:
            dropSubLabel = g.label(drop)

        self.settings.graph.patch(Patch(addQuads=[
            (song, L9['curve'], curve, ctx),
            (song, L9['effect'], effect, ctx),
            (effect, RDF.type, L9['Effect'], ctx),
            (effect, L9['code'],
             Literal('out = sub(%s, intensity=%s)' % (drop.n3(), curve.n3())),
             ctx),
            (curve, RDF.type, L9['Curve'], ctx),
            (curve, RDFS.label, Literal('sub %s' % dropSubLabel), ctx),
            (curve, L9['points'], Literal('0 0'), ctx),
        ]))


class SongEffectsUpdates(cyclone.websocket.WebSocketHandler):
    """Pushes the show's playlist, with each song's effects, to the client."""

    def connectionMade(self, *args, **kwargs):
        self.graph = self.settings.graph
        # updateClient is registered as a graph handler; presumably it
        # reruns whenever the data it read changes -- see SyncedGraph.
        self.graph.addHandler(self.updateClient)

    def updateClient(self):
        """Send {'songs': [{'uri', 'label', 'effects'}, ...]} to the client."""
        # todo: abort if client is gone
        playlist = self.graph.value(showconfig.showUri(), L9['playList'])
        songs = list(self.graph.items(playlist))
        out = []
        for s in songs:
            out.append({
                'uri': s,
                'label': self.graph.label(s),
                'effects': sorted(self.graph.objects(s, L9['effect'])),
            })
        self.sendMessage({'songs': out})


class EffectUpdates(cyclone.websocket.WebSocketHandler):
    """
    stays alive for the life of the effect page
    """

    def connectionMade(self, *args, **kwargs):
        log.info("websocket opened")
        self.uri = URIRef(self.get_argument('uri'))
        self.sendMessage({'hello': repr(self)})

        self.graph = self.settings.graph
        self.graph.addHandler(self.updateClient)

    def updateClient(self):
        """Send the effect's current L9['code'] value to the client."""
        # todo: if client has dropped, abort and don't get any more
        # graph updates
        self.sendMessage({'code': self.graph.value(self.uri, L9['code'])})

    def connectionLost(self, reason):
        log.info("websocket closed")

    def messageReceived(self, message):
        # Lazy logging args: the formatted text is the same, but a
        # tuple-valued message can't break the % formatting.
        log.info("got message %s", message)
        # write a patch back to the graph


class EffectEval(PrettyErrorHandler, cyclone.web.RequestHandler):
    """Evaluates one effect at the music player's current song time."""

    @inlineCallbacks
    def get(self):
        # return dmx list for that effect
        uri = URIRef(self.get_argument('uri'))
        response = yield cyclone.httpclient.fetch(
            networking.musicPlayer.path('time'))
        songTime = json.loads(response.body)['t']

        node = EffectNode(self.settings.graph, uri)
        outSub = node.eval(songTime)
        self.write(json.dumps(outSub.get_dmx_list()))


# Completely not sure where the effect background loop should
# go. Another process could own it, and get this request repeatedly:
class SongEffectsEval(PrettyErrorHandler, cyclone.web.RequestHandler):
    """Placeholder for returning the combined dmx output of a song's effects."""

    def get(self):
        """
        Not implemented yet; always raises NotImplementedError once the
        required 'song' argument has been read.
        """
        song = URIRef(self.get_argument('song'))
        # TODO: return dmx dict for all effects in the song, already
        # combined. The intended shape was roughly:
        #   effects = effectsForSong(self.settings.graph, song)
        #   self.write(maxDict(effectDmxDict(e) for e in effects))
        # but none of those helpers exist in this module yet, so fail
        # with the explicit error instead of a NameError.
        raise NotImplementedError(
            "combined effect eval for %s is not implemented" % song)


class EffectLoop(object):
    """maintains a collection of the current EffectNodes, gets time from
    music player, sends dmx"""

    def __init__(self, graph, stats):
        self.graph, self.stats = graph, stats
        self.currentSong = None
        self.currentEffects = []
        # Set from the music player's 'playing' field on each fetch;
        # initialised here so the attribute always exists.
        self.currentPlaying = False
        self.graph.addHandler(self.setEffects)
        self.period = 1 / 30
        self.coastSecs = .3  # main reason to keep this low is to notice play/pause

        self.songTimeFromRequest = 0
        self.requestTime = 0  # unix sec for when we fetched songTime
        reactor.callLater(self.period, self.sendLevels)

    def setEffects(self):
        """Rebuild self.currentEffects from the current song's L9['effect']s."""
        self.currentEffects = []
        if self.currentSong is None:
            return

        for effectUri in self.graph.objects(self.currentSong, L9['effect']):
            self.currentEffects.append(EffectNode(self.graph, effectUri))

    @inlineCallbacks
    def getSongTime(self):
        """
        Fire with (songTime, songUri-or-falsy).

        Within coastSecs of the last successful fetch the time is
        estimated locally (advancing only while a song is playing);
        otherwise the music player is queried again.
        """
        now = time.time()
        if now - self.requestTime < self.coastSecs:
            estimated = self.songTimeFromRequest
            if self.currentSong is not None and self.currentPlaying:
                estimated += now - self.requestTime
            returnValue((estimated, self.currentSong))
        else:
            response = json.loads((yield cyclone.httpclient.fetch(
                networking.musicPlayer.path('time'))).body)
            self.requestTime = now
            self.currentPlaying = response['playing']
            self.songTimeFromRequest = response['t']
            returnValue(
                (response['t'],
                 (response['song'] and URIRef(response['song']))))

    @inlineCallbacks
    def sendLevels(self):
        """
        Evaluate the current effects, send the result to dmx, and
        schedule the next run.

        The next run is always scheduled: after self.period (less the
        time this run took) normally, or after 1 second if anything
        raised. When no song is loaded, nothing is sent but the loop
        keeps polling so it notices when a song appears.
        """
        t1 = time.time()
        try:
            with self.stats.sendLevels.time():
                with self.stats.getMusic.time():
                    songTime, song = yield self.getSongTime()

                if song != self.currentSong:
                    self.currentSong = song
                    # this may be piling on the handlers
                    self.graph.addHandler(self.setEffects)

                # No early return here: returning from inside the try
                # would skip the callLater below and stop the loop.
                if song is not None:
                    outSubs = [e.eval(songTime) for e in self.currentEffects]
                    out = Submaster.sub_maxes(*outSubs)

                    dmx = out.get_dmx_list()

                    if log.isEnabledFor(logging.DEBUG):
                        log.debug("send dmx: %r", out.get_levels())

                    with self.stats.writeDmx.time():
                        yield dmxclient.outputlevels(dmx, twisted=True)

            elapsed = time.time() - t1
            dt = max(0, self.period - elapsed)
        except Exception:
            # Top-level boundary of the loop: count and report the
            # error, then retry at a slower rate.
            self.stats.errors.mark()
            traceback.print_exc()
            dt = 1

        reactor.callLater(dt, self.sendLevels)


class App(object):
    """Owns the synced graph, the stats collection, and the web application."""

    def __init__(self, show):
        self.show = show

        self.graph = SyncedGraph("effectEval")
        # The web server and effect loop start once the graph has synced.
        self.graph.initiallySynced.addCallback(self.launch)

        self.stats = scales.collection(
            '/',
            scales.PmfStat('sendLevels'),
            scales.PmfStat('getMusic'),
            scales.PmfStat('writeDmx'),
            scales.IntStat('errors'),
        )

    def launch(self, *args):
        """Start the effect loop and listen for http/websocket requests."""
        self.loop = EffectLoop(self.graph, self.stats)

        SFH = cyclone.web.StaticFileHandler
        self.cycloneApp = cyclone.web.Application(handlers=[
            (r'/()', SFH,
             {'path': 'light9/effecteval', 'default_filename': 'index.html'}),
            (r'/effect', EffectEdit),
            (r'/(websocket\.js)', SFH, {'path': 'light9/rdfdb/web/'}),
            (r'/effect\.js', StaticCoffee, {
                'src': 'light9/effecteval/effect.coffee'}),
            (r'/index\.js', StaticCoffee, {
                'src': 'light9/effecteval/index.coffee'}),
            (r'/effectUpdates', EffectUpdates),
            (r'/songEffectsUpdates', SongEffectsUpdates),
            (r'/static/(.*)', SFH, {'path': 'static/'}),
            (r'/effect/eval', EffectEval),
            (r'/songEffects', SongEffects),
            (r'/songEffects/eval', SongEffectsEval),
            (r'/stats', StatsForCyclone),
        ], debug=True, graph=self.graph, stats=self.stats)
        reactor.listenTCP(networking.effectEval.port, self.cycloneApp)
        log.info("listening on %s", networking.effectEval.port)


class StaticCoffee(PrettyErrorHandler, cyclone.web.RequestHandler):
    """Serves a CoffeeScript source file compiled to JavaScript on each request."""

    def initialize(self, src):
        super(StaticCoffee, self).initialize()
        self.src = src

    def get(self):
        self.set_header('Content-Type', 'application/javascript')
        # Arguments are passed as a list (no shell); self.src comes from
        # the handler table, not from the request.
        self.write(subprocess.check_output([
            '/usr/bin/coffee', '--compile', '--print', self.src]))


if __name__ == "__main__":
    parser = optparse.OptionParser()
    parser.add_option(
        '--show',
        help='show URI, like http://light9.bigasterisk.com/show/dance2008',
        default=showconfig.showUri())
    parser.add_option("-v", "--verbose", action="store_true",
                      help="logging.DEBUG")
    parser.add_option("--twistedlog", action="store_true",
                      help="twisted logging")
    (options, args) = parser.parse_args()
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    if not options.show:
        raise ValueError("missing --show http://...")

    app = App(URIRef(options.show))
    if options.twistedlog:
        from twisted.python import log as twlog
        twlog.startLogging(sys.stderr)
    reactor.run()