# hg changeset header (preserved from repository-viewer export):
#   collector.py @ 13:bfd95926be6e default tip
#   initial port to starlette. missing some disconnect & cleanup functionality
#   author: drewp@bigasterisk.com
#   date: Sat, 26 Nov 2022 14:13:51 -0800
#   parents: 032e59be8fe9

"""
requesting /graph/foo returns an SSE patch stream that's the
result of fetching multiple other SSE patch streams. The result stream
may include new statements injected by this service.

Future:
- filter out unneeded stmts from the sources
- give a time resolution and concatenate any patches that come faster than that res
"""
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Set, Union

from patchablegraph.patchablegraph import jsonFromPatch
from prometheus_client import Summary
from rdfdb.patch import Patch
from rdflib import Namespace, URIRef
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from collector_config import config
from merge import ActiveStatements, LocalStatements, SourceUri
from patchsink import PatchSink, PatchSinkResponse
from patchsource import PatchSource
logging.basicConfig(level=logging.DEBUG)
log=logging.getLogger()
ROOM = Namespace("http://projects.bigasterisk.com/room/")
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

GET_STATE_CALLS = Summary("get_state_calls", 'calls')
ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')


class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """

    def __init__(self):
        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
        self.handlers: Set[PatchSinkResponse] = set()
        self.statements: ActiveStatements = ActiveStatements()

        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        return {
            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['url']),
            'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSinkResponse) -> List[SourceUri]:
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]

    @ON_PATCH_CALLS.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if 0 and log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])

    @SEND_UPDATE_PATCH_CALLS.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSinkResponse] = None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}
        # reduce loops here- prepare all patches at once
        for h in selected:
            period = .9
            if 'Raspbian' in h.user_agent:
                period = 5
            if h.lastPatchSentTime > now - period:
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p), event='patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def addSseHandler(self, handler: PatchSinkResponse):
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                self.clients[source] = PatchSource(source,
                                                   listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
                                                   reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSinkResponse):
        log.info('removeSseHandler %r', handler)
        self.statements.discardHandler(handler)
        for source in self._sourcesForHandler(handler):
            for otherHandler in self.handlers:
                if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)):
                    # still in use
                    break
            else:
                self._stopClient(source)

        self.handlers.remove(handler)

    def _stopClient(self, url: SourceUri):
        if url == COLLECTOR:
            return

        self.clients[url].stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
        if url in self.clients:
            del self.clients[url]

        self.cleanup()

    def cleanup(self):
        """
        despite the attempts above, we still get useless rows in the table
        sometimes
        """
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)


@GET_STATE_CALLS.time()
def State(request: Request) -> JSONResponse:
    state = request.app.state.graphClients.state()
    msg = json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>')
    log.info(msg)
    return JSONResponse({'graphClients': state})


def GraphList(request: Request) -> JSONResponse:
    return JSONResponse(config['streams'])

def main():
    graphClients = GraphClients()

    app = Starlette(
        debug=True,
        routes=[
            Route('/state', State),
            Route('/graph/', GraphList),
            Route('/graph/{stream_id:str}', PatchSink),
        ])
    app.state.graphClients = graphClients

    app.add_middleware(PrometheusMiddleware, app_name='collector')
    app.add_route("/metrics", handle_metrics)
    return app


app = main()