Mercurial > code > home > repos > collector
changeset 12:032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
author | drewp@bigasterisk.com |
---|---|
date | Fri, 25 Nov 2022 20:58:08 -0800 |
parents | baf886e01ed1 |
children | bfd95926be6e |
files | collector.py merge.py patchsink.py |
diffstat | 3 files changed, 235 insertions(+), 223 deletions(-) [+] |
line wrap: on
line diff
--- a/collector.py Fri Nov 25 20:57:38 2022 -0800 +++ b/collector.py Fri Nov 25 20:58:08 2022 -0800 @@ -7,11 +7,10 @@ - filter out unneeded stmts from the sources - give a time resolution and concatenate any patches that come faster than that res """ -import collections import json import logging import time -from typing import (Callable, Dict, List, Optional, Sequence, Set, Tuple, Union) +from typing import Dict, List, Optional, Set, Union import cyclone.sse import cyclone.web @@ -23,11 +22,12 @@ from prometheus_client.registry import REGISTRY from rdfdb.patch import Patch from rdflib import Namespace, URIRef -from rdflib.term import Node from standardservice.logsetup import enableTwistedLog, log from twisted.internet import defer, reactor from collector_config import config +from merge import SourceUri, ActiveStatements, LocalStatements +from patchsink import PatchSink import cyclone.sse def py3_sendEvent(self, message, event=None, eid=None, retry=None): @@ -44,26 +44,17 @@ if retry: self.transport.write(b"retry: %s\n" % retry) self.transport.write(b"data: %s\n\n" % message) -cyclone.sse.SSEHandler.sendEvent = py3_sendEvent -Statement = Tuple[Node, Node, Node, Node] - - -# SourceUri = NewType('SourceUri', URIRef) # doesn't work -class SourceUri(URIRef): - pass +cyclone.sse.SSEHandler.sendEvent = py3_sendEvent ROOM = Namespace("http://projects.bigasterisk.com/room/") COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) GET_STATE_CALLS = Summary("get_state_calls", 'calls') -LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls') -MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls') ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') -REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls') class Metrics(cyclone.web.RequestHandler): @@ -73,214 +64,6 @@ self.write(generate_latest(REGISTRY)) -class LocalStatements(object): - """ - functions that make statements originating from sse_collector itself - """ - - def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]): - self.applyPatch = applyPatch - self._sourceState: Dict[SourceUri, Optional[URIRef]] = {} # source: state URIRef - - @LOCAL_STATEMENTS_PATCH_CALLS.time() - def setSourceState(self, source: SourceUri, state: Optional[URIRef]): - """ - add a patch to the COLLECTOR graph about the state of this - source. state=None to remove the source. - """ - oldState = self._sourceState.get(source, None) - if state == oldState: - return - log.info('source state %s -> %s', source, state) - if oldState is None: - self._sourceState[source] = state - self.applyPatch(COLLECTOR, Patch(addQuads=[ - (COLLECTOR, ROOM['source'], source, COLLECTOR), - (source, ROOM['state'], state, COLLECTOR), - ])) - elif state is None: - del self._sourceState[source] - self.applyPatch(COLLECTOR, Patch(delQuads=[ - (COLLECTOR, ROOM['source'], source, COLLECTOR), - (source, ROOM['state'], oldState, COLLECTOR), - ])) - else: - self._sourceState[source] = state - self.applyPatch(COLLECTOR, Patch(addQuads=[ - (source, ROOM['state'], state, COLLECTOR), - ], delQuads=[ - (source, ROOM['state'], oldState, COLLECTOR), - ])) - - -def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]: - if isinstance(t, URIRef): - return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/', - 'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:')) - return t - - -def abbrevStmt(stmt: Statement) -> str: - return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3])) - - -class PatchSink(cyclone.sse.SSEHandler): - _handlerSerial = 0 - - def __init__(self, application: cyclone.web.Application, request): - cyclone.sse.SSEHandler.__init__(self, application, request) - self.bound = False - self.created = time.time() - self.graphClients = self.settings.graphClients - - self._serial = PatchSink._handlerSerial - PatchSink._handlerSerial += 1 - self.lastPatchSentTime: float = 0.0 - - def __repr__(self) -> str: - return '<Handler #%s>' % self._serial - - def state(self) -> Dict: - return { - 'created': round(self.created, 2), - 'ageHours': round((time.time() - self.created) / 3600, 2), - 'streamId': self.streamId, - 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing - 'foafAgent': self.request.headers.get('X-Foaf-Agent'), - 'userAgent': self.request.headers.get('user-agent'), - } - - def bind(self, *args, **kwargs): - self.streamId = args[0] - - self.graphClients.addSseHandler(self) - # If something goes wrong with addSseHandler, I don't want to - # try removeSseHandler. - self.bound = True - - def unbind(self) -> None: - if self.bound: - self.graphClients.removeSseHandler(self) - - -StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]] - - -class PostDeleter(object): - - def __init__(self, statements: StatementTable): - self.statements = statements - - def __enter__(self): - self._garbage: List[Statement] = [] - return self - - def add(self, stmt: Statement): - self._garbage.append(stmt) - - def __exit__(self, type, value, traceback): - if type is not None: - raise NotImplementedError() - for stmt in self._garbage: - del self.statements[stmt] - - -class ActiveStatements(object): - - def __init__(self): - # This table holds statements asserted by any of our sources - # plus local statements that we introduce (source is - # http://bigasterisk.com/sse_collector/). - self.table: StatementTable = collections.defaultdict(lambda: (set(), set())) - - def state(self) -> Dict: - return { - 'len': len(self.table), - } - - def postDeleteStatements(self) -> PostDeleter: - return PostDeleter(self.table) - - def pprintTable(self) -> None: - for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())): - print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)) - - @MAKE_SYNC_PATCH_CALLS.time() - def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]): - # todo: this could run all handlers at once, which is how we - # use it anyway - adds = [] - dels = [] - - with self.postDeleteStatements() as garbage: - for stmt, (stmtSources, handlers) in self.table.items(): - belongsInHandler = not sources.isdisjoint(stmtSources) - handlerHasIt = handler in handlers - # log.debug("%s belong=%s has=%s", - # abbrevStmt(stmt), belongsInHandler, handlerHasIt) - if belongsInHandler and not handlerHasIt: - adds.append(stmt) - handlers.add(handler) - elif not belongsInHandler and handlerHasIt: - dels.append(stmt) - handlers.remove(handler) - if not handlers and not stmtSources: - garbage.add(stmt) - - return Patch(addQuads=adds, delQuads=dels) - - def applySourcePatch(self, source: SourceUri, p: Patch): - for stmt in p.addQuads: - sourceUrls, handlers = self.table[stmt] - if source in sourceUrls: - raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt))) - sourceUrls.add(source) - - with self.postDeleteStatements() as garbage: - for stmt in p.delQuads: - sourceUrls, handlers = self.table[stmt] - if source not in sourceUrls: - raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt))) - sourceUrls.remove(source) - # this is rare, since some handler probably still has - # the stmt we're deleting, but it can happen e.g. when - # a handler was just deleted - if not sourceUrls and not handlers: - garbage.add(stmt) - - @REPLACE_SOURCE_STATEMENTS_CALLS.time() - def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]): - log.debug('replaceSourceStatements with %s stmts', len(stmts)) - newStmts = set(stmts) - - with self.postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.table.items(): - if source in sources: - if stmt not in stmts: - sources.remove(source) - if not sources and not handlers: - garbage.add(stmt) - else: - if stmt in stmts: - sources.add(source) - newStmts.discard(stmt) - - self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) - - def discardHandler(self, handler: PatchSink): - with self.postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.table.items(): - handlers.discard(handler) - if not sources and not handlers: - garbage.add(stmt) - - def discardSource(self, source: SourceUri): - with self.postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.table.items(): - sources.discard(source) - if not sources and not handlers: - garbage.add(stmt) - class GraphClients(object): """ @@ -465,7 +248,6 @@ (r'/graph/', GraphList), (r'/graph/(.+)', PatchSink), (r'/metrics', Metrics), - ], - graphClients=graphClients), + ], graphClients=graphClients), interface='::') reactor.run()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/merge.py Fri Nov 25 20:58:08 2022 -0800 @@ -0,0 +1,187 @@ +import collections +from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, NewType +from prometheus_client import Summary + +from rdfdb.patch import Patch +from rdflib import Namespace, URIRef +from rdflib.term import Node +from standardservice.logsetup import enableTwistedLog, log +from patchsink import PatchSink +LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls') +MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls') +REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls') + +SourceUri = NewType('SourceUri', URIRef) + +Statement = Tuple[Node, Node, Node, Node] +StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]] + +ROOM = Namespace("http://projects.bigasterisk.com/room/") +COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) + + +def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]: + if isinstance(t, URIRef): + return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/', + 'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:')) + return t + + +def abbrevStmt(stmt: Statement) -> str: + return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3])) + + +class LocalStatements(object): + """ + functions that make statements originating from sse_collector itself + """ + + def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]): + self.applyPatch = applyPatch + self._sourceState: Dict[SourceUri, Optional[URIRef]] = {} # source: state URIRef + + @LOCAL_STATEMENTS_PATCH_CALLS.time() + def setSourceState(self, source: SourceUri, state: Optional[URIRef]): + """ + add a patch to the COLLECTOR graph about the state of this + source. state=None to remove the source. + """ + oldState = self._sourceState.get(source, None) + if state == oldState: + return + log.info('source state %s -> %s', source, state) + if oldState is None: + self._sourceState[source] = state + self.applyPatch(COLLECTOR, Patch(addQuads=[ + (COLLECTOR, ROOM['source'], source, COLLECTOR), + (source, ROOM['state'], state, COLLECTOR), + ])) + elif state is None: + del self._sourceState[source] + self.applyPatch(COLLECTOR, Patch(delQuads=[ + (COLLECTOR, ROOM['source'], source, COLLECTOR), + (source, ROOM['state'], oldState, COLLECTOR), + ])) + else: + self._sourceState[source] = state + self.applyPatch(COLLECTOR, Patch(addQuads=[ + (source, ROOM['state'], state, COLLECTOR), + ], delQuads=[ + (source, ROOM['state'], oldState, COLLECTOR), + ])) + + +class PostDeleter(object): + + def __init__(self, statements: StatementTable): + self.statements = statements + + def __enter__(self): + self._garbage: List[Statement] = [] + return self + + def add(self, stmt: Statement): + self._garbage.append(stmt) + + def __exit__(self, type, value, traceback): + if type is not None: + raise NotImplementedError() + for stmt in self._garbage: + del self.statements[stmt] + + +class ActiveStatements(object): + + def __init__(self): + # This table holds statements asserted by any of our sources + # plus local statements that we introduce (source is + # http://bigasterisk.com/sse_collector/). + self.table: StatementTable = collections.defaultdict(lambda: (set(), set())) + + def state(self) -> Dict: + return { + 'len': len(self.table), + } + + def postDeleteStatements(self) -> PostDeleter: + return PostDeleter(self.table) + + def pprintTable(self) -> None: + for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())): + print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)) + + @MAKE_SYNC_PATCH_CALLS.time() + def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]): + # todo: this could run all handlers at once, which is how we + # use it anyway + adds = [] + dels = [] + + with self.postDeleteStatements() as garbage: + for stmt, (stmtSources, handlers) in self.table.items(): + belongsInHandler = not sources.isdisjoint(stmtSources) + handlerHasIt = handler in handlers + # log.debug("%s belong=%s has=%s", + # abbrevStmt(stmt), belongsInHandler, handlerHasIt) + if belongsInHandler and not handlerHasIt: + adds.append(stmt) + handlers.add(handler) + elif not belongsInHandler and handlerHasIt: + dels.append(stmt) + handlers.remove(handler) + if not handlers and not stmtSources: + garbage.add(stmt) + + return Patch(addQuads=adds, delQuads=dels) + + def applySourcePatch(self, source: SourceUri, p: Patch): + for stmt in p.addQuads: + sourceUrls, handlers = self.table[stmt] + if source in sourceUrls: + raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt))) + sourceUrls.add(source) + + with self.postDeleteStatements() as garbage: + for stmt in p.delQuads: + sourceUrls, handlers = self.table[stmt] + if source not in sourceUrls: + raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt))) + sourceUrls.remove(source) + # this is rare, since some handler probably still has + # the stmt we're deleting, but it can happen e.g. when + # a handler was just deleted + if not sourceUrls and not handlers: + garbage.add(stmt) + + @REPLACE_SOURCE_STATEMENTS_CALLS.time() + def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]): + log.debug('replaceSourceStatements with %s stmts', len(stmts)) + newStmts = set(stmts) + + with self.postDeleteStatements() as garbage: + for stmt, (sources, handlers) in self.table.items(): + if source in sources: + if stmt not in stmts: + sources.remove(source) + if not sources and not handlers: + garbage.add(stmt) + else: + if stmt in stmts: + sources.add(source) + newStmts.discard(stmt) + + self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) + + def discardHandler(self, handler: PatchSink): + with self.postDeleteStatements() as garbage: + for stmt, (sources, handlers) in self.table.items(): + handlers.discard(handler) + if not sources and not handlers: + garbage.add(stmt) + + def discardSource(self, source: SourceUri): + with self.postDeleteStatements() as garbage: + for stmt, (sources, handlers) in self.table.items(): + sources.discard(source) + if not sources and not handlers: + garbage.add(stmt)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/patchsink.py Fri Nov 25 20:58:08 2022 -0800 @@ -0,0 +1,43 @@ +import time +from typing import Dict + +import cyclone.sse +import cyclone.web + +class PatchSink(cyclone.sse.SSEHandler): + _handlerSerial = 0 + + def __init__(self, application: cyclone.web.Application, request): + cyclone.sse.SSEHandler.__init__(self, application, request) + self.bound = False + self.created = time.time() + self.graphClients = self.settings.graphClients + + self._serial = PatchSink._handlerSerial + PatchSink._handlerSerial += 1 + self.lastPatchSentTime: float = 0.0 + + def __repr__(self) -> str: + return '<Handler #%s>' % self._serial + + def state(self) -> Dict: + return { + 'created': round(self.created, 2), + 'ageHours': round((time.time() - self.created) / 3600, 2), + 'streamId': self.streamId, + 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing + 'foafAgent': self.request.headers.get('X-Foaf-Agent'), + 'userAgent': self.request.headers.get('user-agent'), + } + + def bind(self, *args, **kwargs): + self.streamId = args[0] + + self.graphClients.addSseHandler(self) + # If something goes wrong with addSseHandler, I don't want to + # try removeSseHandler. + self.bound = True + + def unbind(self) -> None: + if self.bound: + self.graphClients.removeSseHandler(self)