# HG changeset patch # User drewp@bigasterisk.com # Date 1648615292 25200 # Node ID e9ac7f52849ec9993ccfdb3620238a22e3d24c36 # Parent 6ee9a1c5a991128aa755ceaca0094f2442a68781 collector to its own repo diff -r 6ee9a1c5a991 -r e9ac7f52849e service/collector/.flake8 --- a/service/collector/.flake8 Sun Dec 12 22:03:28 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,3 +0,0 @@ -[flake8] -ignore=W504 -max-line-length=160 \ No newline at end of file diff -r 6ee9a1c5a991 -r e9ac7f52849e service/collector/.style.yapf --- a/service/collector/.style.yapf Sun Dec 12 22:03:28 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,4 +0,0 @@ -# overwritten by /home/drewp/bin/setup_home_venv -[style] -based_on_style = google -column_limit = 130 diff -r 6ee9a1c5a991 -r e9ac7f52849e service/collector/Dockerfile --- a/service/collector/Dockerfile Sun Dec 12 22:03:28 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,17 +0,0 @@ -FROM bang5:5000/base_basic - -WORKDIR /opt - -RUN echo 2021-08-26 && apt-get update -RUN apt-get install -y vim net-tools iputils-ping git - -COPY requirements.txt ./ -RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -Ur requirements.txt -# not sure why this doesn't work from inside requirements.txt -RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip' - -COPY *.py *.html req* *.ini ./ - -EXPOSE 9072 - -CMD [ "python3", "./collector.py" ] diff -r 6ee9a1c5a991 -r e9ac7f52849e service/collector/collector.py --- a/service/collector/collector.py Sun Dec 12 22:03:28 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,451 +0,0 @@ -""" -requesting /graph/foo returns an SSE patch stream that's the -result of fetching multiple other SSE patch streams. The result stream -may include new statements injected by this service. - -Future: -- filter out unneeded stmts from the sources -- give a time resolution and concatenate any patches that come faster than that res -""" -import collections -import json -import logging -import time -from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union) - -import cyclone.sse -import cyclone.web -from docopt import docopt -from patchablegraph import jsonFromPatch -from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource -from prometheus_client import Counter, Gauge, Histogram, Summary -from prometheus_client.exposition import generate_latest -from prometheus_client.registry import REGISTRY -from rdfdb.patch import Patch -from rdflib import Namespace, URIRef -from rdflib.term import Node, Statement -from standardservice.logsetup import enableTwistedLog, log -from twisted.internet import defer, reactor - -from collector_config import config - - -#SourceUri = NewType('SourceUri', URIRef) # doesn't work -class SourceUri(URIRef): - pass - - -ROOM = Namespace("http://projects.bigasterisk.com/room/") -COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) - -GET_STATE_CALLS = Summary("get_state_calls", 'calls') -LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls') -MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls') -ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') -SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') -REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls') - - -class Metrics(cyclone.web.RequestHandler): - - def get(self): - self.add_header('content-type', 'text/plain') - self.write(generate_latest(REGISTRY)) - - -class LocalStatements(object): - """ - functions that make statements originating from sse_collector itself - """ - - def __init__(self, applyPatch: Callable[[URIRef, Patch], None]): - self.applyPatch = applyPatch - self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef - - @LOCAL_STATEMENTS_PATCH_CALLS.time() - def setSourceState(self, source: SourceUri, state: URIRef): - """ - add a patch to the COLLECTOR graph about the state of this - source. state=None to remove the source. - """ - oldState = self._sourceState.get(source, None) - if state == oldState: - return - log.info('source state %s -> %s', source, state) - if oldState is None: - self._sourceState[source] = state - self.applyPatch(COLLECTOR, Patch(addQuads=[ - (COLLECTOR, ROOM['source'], source, COLLECTOR), - (source, ROOM['state'], state, COLLECTOR), - ])) - elif state is None: - del self._sourceState[source] - self.applyPatch(COLLECTOR, Patch(delQuads=[ - (COLLECTOR, ROOM['source'], source, COLLECTOR), - (source, ROOM['state'], oldState, COLLECTOR), - ])) - else: - self._sourceState[source] = state - self.applyPatch(COLLECTOR, Patch(addQuads=[ - (source, ROOM['state'], state, COLLECTOR), - ], delQuads=[ - (source, ROOM['state'], oldState, COLLECTOR), - ])) - - -def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]: - if isinstance(t, URIRef): - return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/', - 'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:')) - return t - - -def abbrevStmt(stmt: Statement) -> str: - return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3])) - - -class PatchSink(cyclone.sse.SSEHandler): - _handlerSerial = 0 - - def __init__(self, application: cyclone.web.Application, request): - cyclone.sse.SSEHandler.__init__(self, application, request) - self.bound = False - self.created = time.time() - self.graphClients = self.settings.graphClients - - self._serial = PatchSink._handlerSerial - PatchSink._handlerSerial += 1 - self.lastPatchSentTime: float = 0.0 - - def __repr__(self) -> str: - return '' % self._serial - - def state(self) -> Dict: - return { - 'created': round(self.created, 2), - 'ageHours': round((time.time() - self.created) / 3600, 2), - 'streamId': self.streamId, - 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing - 'foafAgent': self.request.headers.get('X-Foaf-Agent'), - 'userAgent': self.request.headers.get('user-agent'), - } - - def bind(self, *args, **kwargs): - self.streamId = args[0] - - self.graphClients.addSseHandler(self) - # If something goes wrong with addSseHandler, I don't want to - # try removeSseHandler. - self.bound = True - - def unbind(self) -> None: - if self.bound: - self.graphClients.removeSseHandler(self) - - -StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]] - - -class PostDeleter(object): - - def __init__(self, statements: StatementTable): - self.statements = statements - - def __enter__(self): - self._garbage: List[Statement] = [] - return self - - def add(self, stmt: Statement): - self._garbage.append(stmt) - - def __exit__(self, type, value, traceback): - if type is not None: - raise NotImplementedError() - for stmt in self._garbage: - del self.statements[stmt] - - -class ActiveStatements(object): - - def __init__(self): - # This table holds statements asserted by any of our sources - # plus local statements that we introduce (source is - # http://bigasterisk.com/sse_collector/). - self.table: StatementTable = collections.defaultdict(lambda: (set(), set())) - - def state(self) -> Dict: - return { - 'len': len(self.table), - } - - def postDeleteStatements(self) -> PostDeleter: - return PostDeleter(self.table) - - def pprintTable(self) -> None: - for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())): - print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)) - - @MAKE_SYNC_PATCH_CALLS.time() - def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]): - # todo: this could run all handlers at once, which is how we - # use it anyway - adds = [] - dels = [] - - with self.postDeleteStatements() as garbage: - for stmt, (stmtSources, handlers) in self.table.items(): - belongsInHandler = not sources.isdisjoint(stmtSources) - handlerHasIt = handler in handlers - # log.debug("%s belong=%s has=%s", - # abbrevStmt(stmt), belongsInHandler, handlerHasIt) - if belongsInHandler and not handlerHasIt: - adds.append(stmt) - handlers.add(handler) - elif not belongsInHandler and handlerHasIt: - dels.append(stmt) - handlers.remove(handler) - if not handlers and not stmtSources: - garbage.add(stmt) - - return Patch(addQuads=adds, delQuads=dels) - - def applySourcePatch(self, source: SourceUri, p: Patch): - for stmt in p.addQuads: - sourceUrls, handlers = self.table[stmt] - if source in sourceUrls: - raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt))) - sourceUrls.add(source) - - with self.postDeleteStatements() as garbage: - for stmt in p.delQuads: - sourceUrls, handlers = self.table[stmt] - if source not in sourceUrls: - raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt))) - sourceUrls.remove(source) - # this is rare, since some handler probably still has - # the stmt we're deleting, but it can happen e.g. when - # a handler was just deleted - if not sourceUrls and not handlers: - garbage.add(stmt) - - @REPLACE_SOURCE_STATEMENTS_CALLS.time() - def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]): - log.debug('replaceSourceStatements with %s stmts', len(stmts)) - newStmts = set(stmts) - - with self.postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.table.items(): - if source in sources: - if stmt not in stmts: - sources.remove(source) - if not sources and not handlers: - garbage.add(stmt) - else: - if stmt in stmts: - sources.add(source) - newStmts.discard(stmt) - - self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) - - def discardHandler(self, handler: PatchSink): - with self.postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.table.items(): - handlers.discard(handler) - if not sources and not handlers: - garbage.add(stmt) - - def discardSource(self, source: SourceUri): - with self.postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.table.items(): - sources.discard(source) - if not sources and not handlers: - garbage.add(stmt) - - -class GraphClients(object): - """ - All the active PatchSources and SSEHandlers - - To handle all the overlapping-statement cases, we store a set of - true statements along with the sources that are currently - asserting them and the requesters who currently know them. As - statements come and go, we make patches to send to requesters. - """ - - def __init__(self): - self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed) - self.handlers: Set[PatchSink] = set() - self.statements: ActiveStatements = ActiveStatements() - - self._localStatements = LocalStatements(self._onPatch) - - def state(self) -> Dict: - return { - 'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']), - 'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])), - 'statements': self.statements.state(), - } - - def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]: - streamId = handler.streamId - matches = [s for s in config['streams'] if s['id'] == streamId] - if len(matches) != 1: - raise ValueError("%s matches for %r" % (len(matches), streamId)) - return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR] - - @ON_PATCH_CALLS.time() - def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False): - if fullGraph: - # a reconnect may need to resend the full graph even - # though we've already sent some statements - self.statements.replaceSourceStatements(source, p.addQuads) - else: - self.statements.applySourcePatch(source, p) - - self._sendUpdatePatch() - - if log.isEnabledFor(logging.DEBUG): - self.statements.pprintTable() - - if source != COLLECTOR: - self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived']) - - @SEND_UPDATE_PATCH_CALLS.time() - def _sendUpdatePatch(self, handler: Optional[PatchSink] = None): - """ - send a patch event out this handler to bring it up to date with - self.statements - """ - now = time.time() - selected = self.handlers - if handler is not None: - if handler not in self.handlers: - log.error("called _sendUpdatePatch on a handler that's gone") - return - selected = {handler} - # reduce loops here- prepare all patches at once - for h in selected: - period = .9 - if 'Raspbian' in h.request.headers.get('user-agent', ''): - period = 5 - if h.lastPatchSentTime > now - period: - continue - p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h))) - log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr) - if not p.isNoop(): - log.debug("send patch %s to %s", p.shortSummary(), h) - # This can be a giant line, which was a problem - # once. Might be nice for this service to try to break - # it up into multiple sends, although there's no - # guarantee at all since any single stmt could be any - # length. - h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch') - h.lastPatchSentTime = now - else: - log.debug('nothing to send to %s', h) - - def addSseHandler(self, handler: PatchSink): - log.info('addSseHandler %r %r', handler, handler.streamId) - - # fail early if id doesn't match - sources = self._sourcesForHandler(handler) - - self.handlers.add(handler) - - for source in sources: - if source not in self.clients and source != COLLECTOR: - log.debug('connect to patch source %s', source) - self._localStatements.setSourceState(source, ROOM['connect']) - self.clients[source] = ReconnectingPatchSource(source, - listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph), - reconnectSecs=10) - log.debug('bring new client up to date') - - self._sendUpdatePatch(handler) - - def removeSseHandler(self, handler: PatchSink): - log.info('removeSseHandler %r', handler) - self.statements.discardHandler(handler) - for source in self._sourcesForHandler(handler): - for otherHandler in self.handlers: - if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)): - # still in use - break - else: - self._stopClient(source) - - self.handlers.remove(handler) - - def _stopClient(self, url: SourceUri): - if url == COLLECTOR: - return - - self.clients[url].stop() - - self.statements.discardSource(url) - - self._localStatements.setSourceState(url, None) - if url in self.clients: - del self.clients[url] - - self.cleanup() - - def cleanup(self): - """ - despite the attempts above, we still get useless rows in the table - sometimes - """ - with self.statements.postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.statements.table.items(): - if not sources and not any(h in self.handlers for h in handlers): - garbage.add(stmt) - - -class State(cyclone.web.RequestHandler): - - @GET_STATE_CALLS.time() - def get(self) -> None: - try: - state = self.settings.graphClients.state() - self.write(json.dumps({'graphClients': state}, indent=2, default=lambda obj: '')) - except Exception: - import traceback - traceback.print_exc() - raise - - -class GraphList(cyclone.web.RequestHandler): - - def get(self) -> None: - self.write(json.dumps(config['streams'])) - - -if __name__ == '__main__': - arg = docopt(""" - Usage: sse_collector.py [options] - - -v Verbose - -i Info level only - """) - - if arg['-v'] or arg['-i']: - enableTwistedLog() - log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO) - defer.setDebugging(True) - - graphClients = GraphClients() - - reactor.listenTCP(9072, - cyclone.web.Application(handlers=[ - (r"/()", cyclone.web.StaticFileHandler, { - "path": ".", - "default_filename": "index.html" - }), - (r'/state', State), - (r'/graph/', GraphList), - (r'/graph/(.+)', PatchSink), - (r'/metrics', Metrics), - ], - graphClients=graphClients), - interface='::') - reactor.run() diff -r 6ee9a1c5a991 -r e9ac7f52849e service/collector/collector_config.py --- a/service/collector/collector_config.py Sun Dec 12 22:03:28 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,77 +0,0 @@ -config = { - 'streams': [ - {'id': 'home', - 'sources': [ - # should be from :reasoning :source ?s - # 'http://bang:9059/graph/events', - # 'http://bang5:10310/graph/events', # kitchen - # 'http://bang5:10311/graph/events', # living - # 'http://bang5:10312/graph/events', # frontdoor - # 'http://bang5:10313/graph/events', # workshop - # 'http://bang5:10314/graph/events', # garage - # 'http://bang5:10315/graph/events', # bed - # 'http://bang5:10316/graph/events', # changing - # 'http://bang5:10317/graph/events', # frontbed - - # #'http://bang:9099/graph/mapTrails/events', - # 'http://slash:9095/graph/dpms/events', - # 'http://slash:9107/graph/xidle/events', - # 'http://dash:9095/graph/dpms/events', - # 'http://dash:9107/graph/xidle/events', - # 'http://frontdoor5:9095/graph/dpms/events', - # 'http://frontdoor5:9107/graph/xidle/events', - # 'http://frontdoor5:10012/graph/rfid/events', - # 'http://frontdoor5:10013/graph/tinyScreen/events', - - # 'http://bang:9075/graph/environment/events', - # 'http://bang:10011/graph/frontDoorLock/events', - 'http://mqtt-to-rdf.default.svc.cluster.local.:10018/graph/mqtt/events', - # 'http://bang:10016/graph/power/events', - # 'http://bang:10015/graph/store/events', - # 'http://bang:10006/graph/timebank/events', - # 'http://bang:9070/graph/wifi/events', - ]}, - {'id': 'frontDoor', # used for front door display - 'sources': [ - # 'http://bang:9105/graph/calendar/countdown/events', - # 'http://bang:9105/graph/calendar/upcoming/events', - # 'http://bang:9075/graph/environment/events', - 'http://mqtt-to-rdf.default.svc.cluster.local.:10018/graph/mqtt/events', - # 'http://bang:10016/graph/power/events', - # 'http://bang:10006/graph/timebank/events', - # 'http://bang:10015/graph/store/events', - - # 'http://bang:9059/graph/events', - # 'http://bang5:10310/graph/events', # kitchen - # 'http://bang5:10311/graph/events', # living - # 'http://bang5:10313/graph/events', # workshop - # 'http://bang5:10314/graph/events', # garage - # 'http://bang5:10317/graph/events', # frontbed - ]}, - {'id': 'network', - 'sources': [ - # 'http://bang:9070/graph/wifi/events', - # 'http://bang:9073/graph/dhcpLeases/events', - # 'http://bang:9009/graph/traffic/events', - ]}, - {'id': 'source_frontDoor', - 'sources': [ - # 'http://bang5:10312/graph/events', # frontdoor - # 'http://frontdoor5:9095/graph/dpms/events', - # 'http://frontdoor5:9107/graph/xidle/events', - # 'http://frontdoor5:10012/graph/rfid/events', - # 'http://frontdoor5:10013/graph/tinyScreen/events', - # 'http://bang:10011/graph/frontDoorLock/events', - 'http://mqtt-to-rdf.default.svc.cluster.local.:10018/graph/mqtt/events', - ]}, - {'id': 'env', - 'sources': [ - # 'http://bang:9075/graph/environment/events', - ]}, - {'id': 'workshop', - 'sources': [ - # 'http://bang5:10313/graph/events', # workshop - ]}, - - ] -} diff -r 6ee9a1c5a991 -r e9ac7f52849e service/collector/deploy.yaml --- a/service/collector/deploy.yaml Sun Dec 12 22:03:28 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,38 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: collector -spec: - replicas: 1 - selector: - matchLabels: - app: collector - template: - metadata: - labels: - app: collector - spec: - containers: - - name: collector - image: bang5:5000/collector_image - imagePullPolicy: "Always" - ports: - - containerPort: 9072 - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: "kubernetes.io/hostname" - operator: In - values: ["bang"] ---- -apiVersion: v1 -kind: Service -metadata: - name: collector -spec: - ports: - - {port: 9072, targetPort: 9072} - selector: - app: collector diff -r 6ee9a1c5a991 -r e9ac7f52849e service/collector/index.html --- a/service/collector/index.html Sun Dec 12 22:03:28 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,151 +0,0 @@ - - - - collector - - - -

collector

- -

See output for graph/home

- - - - - diff -r 6ee9a1c5a991 -r e9ac7f52849e service/collector/requirements.txt --- a/service/collector/requirements.txt Sun Dec 12 22:03:28 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,14 +0,0 @@ -docopt -ipdb -prometheus_client==0.11.0 -rdflib-jsonld==0.5.0 -rdflib==5.0.0 -service_identity==21.1.0 -twisted - -https://github.com/drewp/cyclone/archive/python3.zip - -cycloneerr -patchablegraph==0.9.0 -rdfdb==0.21.0 -standardservice==0.6.0 diff -r 6ee9a1c5a991 -r e9ac7f52849e service/collector/skaffold.yaml --- a/service/collector/skaffold.yaml Sun Dec 12 22:03:28 2021 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,15 +0,0 @@ -apiVersion: skaffold/v2beta5 -kind: Config -metadata: - name: collector -build: - tagPolicy: - dateTime: - format: "2006-01-02_15-04-05" - timezone: "Local" - artifacts: - - image: bang5:5000/collector_image -deploy: - kubectl: - manifests: - - deploy.yaml