Mercurial > code > home > repos > light9
view bin/attic/patchserver @ 2452:f21b61884b0f
asco: move the important buttons so they're above the fold
author | drewp@bigasterisk.com |
---|---|
date | Sun, 18 May 2025 14:37:06 -0700 |
parents | 4556eebe5d73 |
children |
line wrap: on
line source
#!bin/python
"""
Print which devices occupy each DMX channel, using the patch data in the
rdfdb graph.
"""
# run_local is imported first, as in the original file.
# NOTE(review): presumably it sets up paths/logging before the other
# imports; confirm before reordering.
from run_local import log

from rdflib import URIRef
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, Deferred
import logging
import optparse
import os
import time
import treq
import cyclone.web, cyclone.websocket, cyclone.httpclient
from cycloneerr import PrettyErrorHandler

from light9.namespaces import L9, RDF
from light9 import networking, showconfig
from rdfdb.syncedgraph import SyncedGraph
from light9.effect.settings import DeviceSettings
from rdfdb.patch import Patch
from light9.metrics import metrics, metricsRoute


def launch(graph):
    """
    Register the channel-usage report on `graph`.

    graph: the SyncedGraph to read device classes and devices from.

    The report function is handed to graph.addHandler.
    NOTE(review): presumably addHandler runs it now and again whenever the
    graph data it read changes; confirm against rdfdb.
    """
    if 0:
        # HTTP UI is deliberately disabled; kept for reference.
        reactor.listenTCP(
            networking.captureDevice.port,
            cyclone.web.Application(handlers=[
                (r'/()', cyclone.web.StaticFileHandler, {
                    "path": "light9/web",
                    "default_filename": "patchServer.html"
                }),
                metricsRoute(),
            ]),
            interface='::',
        )
        # Only meaningful when the listener above is actually started.
        log.info('serving http on %s', networking.captureDevice.port)

    def prn():
        """Print one 'chan N used by [...]' line per DMX channel in use."""
        # Channel count per device class: highest dmxOffset of its attrs + 1.
        width = {}
        for dc in graph.subjects(RDF.type, L9['DeviceClass']):
            for attr in graph.objects(dc, L9['attr']):
                dmx_offset = graph.value(attr, L9['dmxOffset'])
                if dmx_offset is None:
                    # graph.value returns None for a missing triple; an attr
                    # without an offset cannot widen the class.
                    continue
                width[dc] = max(width.get(dc, 0), dmx_offset.toPython() + 1)

        user = {}  # chan: [dev]
        for dev in set(graph.subjects(L9['dmxBase'], None)):
            dc = graph.value(dev, RDF.type)
            if dc not in width:
                # The device's type is not a DeviceClass with known offsets,
                # so its channel span is unknown. Skip it instead of letting
                # a KeyError abort the whole report.
                log.warning('no channel width known for %s (type %s)', dev,
                            dc)
                continue
            base = graph.value(dev, L9['dmxBase']).toPython()
            for offset in range(width[dc]):
                user.setdefault(base + offset, []).append(dev)

        # default=0 keeps an empty patch from raising ValueError in max();
        # the range is then empty and nothing is printed.
        for chan in range(1, max(user, default=0) + 1):
            dev = user.get(chan, None)
            print(f'chan {chan} used by {dev}')

    graph.addHandler(prn)


def main():
    """
    Parse command-line options, connect to rdfdb, and run the reactor.

    -v/--verbose switches the log level from INFO to DEBUG.
    """
    parser = optparse.OptionParser()
    parser.add_option("-v",
                      "--verbose",
                      action="store_true",
                      help="logging.DEBUG")
    (options, args) = parser.parse_args()
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    # NOTE(review): the "captureDevice" label looks inherited from another
    # tool; left unchanged because it is passed to SyncedGraph at runtime.
    graph = SyncedGraph(networking.rdfdb.url, "captureDevice")

    # launch() runs once the initial sync completes; failures are logged.
    graph.initiallySynced.addCallback(lambda _: launch(graph)).addErrback(
        log.error)
    reactor.run()


if __name__ == '__main__':
    main()