view collector.py @ 12:032e59be8fe9

refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
author drewp@bigasterisk.com
date Fri, 25 Nov 2022 20:58:08 -0800
parents 36471461685f
children bfd95926be6e
line wrap: on
line source

"""
requesting /graph/foo returns an SSE patch stream that's the
result of fetching multiple other SSE patch streams. The result stream
may include new statements injected by this service.

Future:
- filter out unneeded stmts from the sources
- give a time resolution and concatenate any patches that come faster than that res
"""
import json
import logging
import time
from typing import Dict, List, Optional, Set, Union

import cyclone.sse
import cyclone.web
from docopt import docopt
from patchablegraph.patchablegraph import jsonFromPatch
from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
from prometheus_client import Summary
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY
from rdfdb.patch import Patch
from rdflib import Namespace, URIRef
from standardservice.logsetup import enableTwistedLog, log
from twisted.internet import defer, reactor

from collector_config import config
from merge import SourceUri, ActiveStatements, LocalStatements
from patchsink import PatchSink

import cyclone.sse
def py3_sendEvent(self, message, event=None, eid=None, retry=None):
    """
    Python 3 replacement for cyclone.sse.SSEHandler.sendEvent.

    Writes one complete Server-Sent Events frame to self.transport in a
    single write.

    message: dict (JSON-encoded with cyclone's json_encode), str (UTF-8
        encoded) or bytes.
    event, eid, retry: optional `event:`, `id:` and `retry:` fields. Each may
        be bytes, str, or any value whose str() is the wanted text (e.g. an
        int retry in milliseconds). eid/retry are sent whenever they are not
        None, so 0 is a valid value; event is sent only when truthy.

    Raises TypeError if message is not a dict, str or bytes.
    """

    def asBytes(value) -> bytes:
        # bytes-% formatting rejects str/int, so convert field values explicitly
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return str(value).encode("utf-8")

    if isinstance(message, dict):
        message = cyclone.sse.escape.json_encode(message)
    if isinstance(message, str):
        message = message.encode("utf-8")
    if not isinstance(message, bytes):
        raise TypeError("SSE message must be dict, str or bytes, not %s" % type(message).__name__)

    frame = []
    if eid is not None:
        frame.append(b"id: " + asBytes(eid) + b"\n")
    if event:
        frame.append(b"event: " + asBytes(event) + b"\n")
    if retry is not None:
        frame.append(b"retry: " + asBytes(retry) + b"\n")
    # SSE ends a field at any CR, LF or CRLF, so a multi-line payload must go
    # out as one `data:` line per line; the client rejoins them with "\n".
    normalized = message.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    for line in normalized.split(b"\n"):
        frame.append(b"data: " + line + b"\n")
    # blank line terminates the event
    frame.append(b"\n")
    self.transport.write(b"".join(frame))


# Monkeypatch cyclone globally: every cyclone.sse.SSEHandler (presumably
# including PatchSink -- confirm) will use py3_sendEvent above, which writes
# bytes to the transport as Python 3 requires.
cyclone.sse.SSEHandler.sendEvent = py3_sendEvent


# Namespace for the room vocabulary; used below for per-source state values
# such as ROOM['connect'], ROOM['patchesReceived'], ROOM['fullGraphReceived'].
ROOM = Namespace("http://projects.bigasterisk.com/room/")
# Pseudo-source for statements this service injects itself. It is appended to
# every stream's source list but never connected to as an upstream.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

# Prometheus timing summaries (created, and registered with prometheus_client's
# default registry, at import time -- so these names must stay unique).
GET_STATE_CALLS = Summary("get_state_calls", 'calls')
ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')


class Metrics(cyclone.web.RequestHandler):
    """Serve the default Prometheus registry in the text exposition format."""

    def get(self):
        # set_header replaces the handler's default Content-Type; add_header
        # would append a second Content-Type header next to it.
        self.set_header('Content-Type', 'text/plain; version=0.0.4; charset=utf-8')
        self.write(generate_latest(REGISTRY))



class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """

    def __init__(self):
        # upstream patch sources we're subscribed to, keyed by source URI
        # (COLLECTOR is not listed; it is never connected to)
        self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {}
        # downstream connections currently being served
        self.handlers: Set[PatchSink] = set()
        # statement table: stmt -> (asserting sources, handlers that know it)
        self.statements: ActiveStatements = ActiveStatements()

        # per-source state statements; LocalStatements is handed _onPatch as
        # its callback (presumably to publish them as COLLECTOR -- confirm)
        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        """Debug snapshot of clients, handlers and the statement table."""
        return {
            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']),
            'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
        """
        Return the upstream source URIs configured for handler's stream,
        followed by COLLECTOR.

        Raises ValueError unless exactly one config['streams'] entry has
        the handler's streamId.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]

    @ON_PATCH_CALLS.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
        """
        Apply a patch from `source` to the statement table and push the
        resulting changes to handlers.

        fullGraph=True means p.addQuads is the source's complete graph and
        replaces whatever that source asserted before.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])

    @SEND_UPDATE_PATCH_CALLS.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSink] = None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements

        With handler=None every registered handler is considered. A handler
        that was sent a patch within the last `period` seconds (0.9s, or 5s
        for a Raspbian user agent) is skipped this round.
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}
        # TODO: reduce loops here - prepare all patches at once
        for h in selected:
            period = .9
            if 'Raspbian' in h.request.headers.get('user-agent', ''):
                period = 5
            if h.lastPatchSentTime > now - period:
                # NOTE(review): nothing in this class retries a skipped
                # handler; it only catches up the next time this method
                # runs (i.e. the next _onPatch).
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p), event=b'patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def addSseHandler(self, handler: PatchSink):
        """
        Register a downstream connection, connect any of its stream's
        upstream sources that aren't connected yet, and send it the current
        statements.

        Raises ValueError (before registering anything) if the handler's
        streamId isn't configured exactly once.
        """
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                self.clients[source] = ReconnectingPatchSource(source,
                                                               listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
                                                               reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSink):
        """
        Forget a downstream connection and stop every upstream source that
        no remaining handler needs.

        A handler that isn't registered (never successfully added, or
        already removed) is logged and otherwise ignored instead of raising.
        """
        log.info('removeSseHandler %r', handler)
        self.statements.discardHandler(handler)
        if handler not in self.handlers:
            log.warning('removeSseHandler on unregistered handler %r', handler)
            return

        # Unregister first, so the cleanup() run by _stopClient no longer
        # counts this handler as live, and so a failure while stopping a
        # source can't leave a dead connection registered.
        self.handlers.discard(handler)

        stillNeeded: Set[SourceUri] = set()
        for other in self.handlers:
            stillNeeded.update(self._sourcesForHandler(other))

        # dict.fromkeys dedupes (a source may be listed twice) and keeps order
        for source in dict.fromkeys(self._sourcesForHandler(handler)):
            if source not in stillNeeded:
                self._stopClient(source)

    def _stopClient(self, url: SourceUri):
        """
        Disconnect from upstream source `url` and drop the statements it
        asserted. COLLECTOR is never stopped. A url with no active client
        (e.g. already stopped) is tolerated rather than raising KeyError.
        """
        if url == COLLECTOR:
            return

        client = self.clients.pop(url, None)
        if client is not None:
            client.stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)

        self.cleanup()

    def cleanup(self):
        """
        despite the attempts above, we still get useless rows in the table
        sometimes

        Drop every statement that no source asserts and no registered
        handler knows.
        """
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)


class State(cyclone.web.RequestHandler):
    """Debug endpoint: GraphClients.state() as indented JSON."""

    @GET_STATE_CALLS.time()
    def get(self) -> None:
        try:
            state = self.settings.graphClients.state()
            # default= keeps one odd object from failing the whole dump
            msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>')
        except Exception:
            # log through the service logger (with traceback) instead of
            # printing to stderr; re-raise so the request still fails
            log.exception('failed to build /state response')
            raise
        log.info(msg)
        self.set_header('Content-Type', 'application/json')
        self.write(msg)


class GraphList(cyclone.web.RequestHandler):
    """List the configured streams (config['streams']) as JSON."""

    def get(self) -> None:
        # label the body as JSON; a plain str write would otherwise go out
        # with the handler's default (HTML) content type
        self.set_header('Content-Type', 'application/json')
        self.write(json.dumps(config['streams']))


if __name__ == '__main__':
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v   Verbose
    -i  Info level only
    """)

    # logging: DEBUG with -v, otherwise INFO; Deferred debugging always on
    enableTwistedLog()
    log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
    defer.setDebugging(True)

    graphClients = GraphClients()

    routes = [
        (r'/state', State),
        (r'/graph/', GraphList),
        (r'/graph/(.+)', PatchSink),
        (r'/metrics', Metrics),
    ]
    # handlers reach the shared GraphClients via self.settings.graphClients
    app = cyclone.web.Application(handlers=routes, graphClients=graphClients)

    reactor.listenTCP(9072, app, interface='::')
    reactor.run()