Files @ e8401b82e6bc
Branch filter:

Location: light9/light9/collector/service.py

drewp@bigasterisk.com
attempt to deal with ClientDisconnect, which was spamming logs
#!bin/python
"""
Collector receives device attrs from multiple senders, combines
them, and sends output attrs to hardware. The combining part has
custom code for some attributes.

Input can be over http or zmq.
"""
import functools
import logging
import traceback
from typing import List

from light9 import networking
from light9.collector.collector import Collector
from light9.collector.output import ArtnetDmx, DummyOutput, Output, Udmx  # noqa
from light9.collector.weblisteners import UiListener, WebListeners
from light9.namespaces import L9
from light9.run_local import log
from light9.zmqtransport import parseJsonMessage
from prometheus_client import Summary
from rdfdb.syncedgraph.syncedgraph import SyncedGraph
from starlette.applications import Starlette
from starlette.endpoints import WebSocketEndpoint
from starlette.requests import ClientDisconnect
from starlette.responses import Response
from starlette.routing import Route, WebSocketRoute
from starlette.types import Receive, Scope, Send
from starlette.websockets import WebSocket
from starlette_exporter import PrometheusMiddleware, handle_metrics

# Prometheus Summary metric named 'set_attr'. PutAttrs runs each request
# inside this metric's .time() context manager.
STAT_SETATTR = Summary('set_attr', 'setAttr calls')


class Updates(WebSocketEndpoint, UiListener):
    """Websocket endpoint for a single '/updates' client connection.

    On connect the instance registers itself with the shared `listeners`
    object (via addClient); on disconnect it unregisters (via delClient).
    `sendMessage` writes a text frame to the connected socket.
    """

    def __init__(self, listeners, scope: Scope, receive: Receive, send: Send) -> None:
        """Store `listeners` and hand the ASGI triple to WebSocketEndpoint.

        `listeners` comes first so the class can be pre-bound with
        functools.partial and then called with (scope, receive, send).
        """
        super().__init__(scope, receive, send)
        self.listeners = listeners

    async def on_connect(self, websocket: WebSocket):
        """Accept the socket, remember it, and register with the listeners."""
        await websocket.accept()
        clientAddr = self.scope['client']
        log.info('socket connect %s', clientAddr)
        self.websocket = websocket
        self.listeners.addClient(self)

    async def sendMessage(self, msgText):
        """Send `msgText` to this client as a websocket text message."""
        await self.websocket.send_text(msgText)

    # Incoming messages are currently not handled:
    # async def on_receive(self, websocket, data):
    #     json.loads(data)

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        """Unregister this client from the listeners."""
        self.listeners.delClient(self)


async def PutAttrs(collector: Collector, request):
    """Handle PUT /attrs: parse the request body and pass it to the collector.

    The whole handler runs inside the STAT_SETATTR timer.

    Returns a 202 response with an empty body after collector.setAttrs has
    been called. If the client disconnects while the body is being read, a
    warning is logged and an empty 400 response is returned instead.
    """
    with STAT_SETATTR.time():
        try:
            payload = await request.body()
        except ClientDisconnect:
            # The sender went away mid-request; there is nothing to apply.
            log.warning("PUT /attrs request disconnected- ignoring")
            return Response('', status_code=400)
        else:
            parsed = parseJsonMessage(collector.graph, payload)
            client, clientSession, settings, sendTime = parsed
            collector.setAttrs(client, clientSession, settings, sendTime)
            return Response('', status_code=202)

def main():
    """Build and return the collector's Starlette application.

    Sets log levels for a few named loggers, connects a SyncedGraph, creates
    the output list and the Collector, then builds the app with the
    '/updates' websocket route, the PUT '/attrs' route, and the Prometheus
    middleware plus '/metrics' route.

    Any exception raised while constructing the outputs is logged, its
    traceback printed, and then re-raised.
    """
    loggerLevels = (
        ('autodepgraphapi', logging.INFO),
        ('syncedgraph', logging.INFO),
        ('output.allDmx', logging.WARNING),
    )
    for loggerName, level in loggerLevels:
        logging.getLogger(loggerName).setLevel(level)

    graph = SyncedGraph(networking.rdfdb.url, "collector")

    try:
        # todo: drive outputs with config files
        rate = 30  # only referenced by the commented-out ArtnetDmx config below
        outputs: List[Output] = [
            # ArtnetDmx(L9['output/dmxA/'],
            #           host='127.0.0.1',
            #           port=6445,
            #           rate=rate),
            #sudo chmod a+rw /dev/bus/usb/003/021
            Udmx(L9['output/dmxA/'], bus=3, address=21, lastDmxChannel=100),
        ]
    except Exception:
        log.error("setting up outputs:")
        traceback.print_exc()
        raise
    listeners = WebListeners()
    collector = Collector(graph, outputs, listeners)

    # functools.partial pre-binds the shared objects so Starlette can call
    # the endpoints with its usual arguments.
    routes = [
        # Route('/recentRequests', lambda req: get_recentRequests(req, db)),
        WebSocketRoute('/updates', endpoint=functools.partial(Updates, listeners)),
        Route('/attrs', functools.partial(PutAttrs, collector), methods=['PUT']),
    ]
    app = Starlette(debug=True, routes=routes)

    app.add_middleware(PrometheusMiddleware)
    app.add_route("/metrics", handle_metrics)

    # loadtest = os.environ.get('LOADTEST', False)  # call myself with some synthetic load then exit
    # if loadtest:
    #     # in a subprocess since we don't want this client to be
    #     # cooperating with the main event loop and only sending
    #     # requests when there's free time
    #     def afterWarmup():
    #         log.info('running collector_loadtest')
    #         d = utils.getProcessValue('bin/python', ['bin/collector_loadtest.py'])

    #         def done(*a):
    #             log.info('loadtest done')
    #             reactor.stop()

    #         d.addCallback(done)

    #     reactor.callLater(2, afterWarmup)

    return app


# Module-level application object; main() runs at import time.
# NOTE(review): presumably this is the object an ASGI server is pointed at
# (e.g. "<module>:app") -- confirm against the launch script.
app = main()