changeset 0:e2d855c00e57
initial move from homeauto/ repo
author    drewp@bigasterisk.com
date      Tue, 29 Mar 2022 21:44:04 -0700
parents   (none)
children  1275220a644b
files     .flake8 .style.yapf .vscode/settings.json Dockerfile collector.py collector_config.py deploy.yaml index.html requirements.txt skaffold.yaml
diffstat  10 files changed, 803 insertions(+), 0 deletions(-)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.flake8	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,3 @@
+[flake8]
+ignore=W504
+max-line-length=160
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.style.yapf	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,4 @@
+# overwritten by /home/drewp/bin/setup_home_venv
+[style]
+based_on_style = google
+column_limit = 130
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.vscode/settings.json	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,10 @@
+{
+    "python.linting.pylintEnabled": false,
+    "python.linting.flake8Enabled": true,
+    "python.linting.enabled": true,
+    "python.pythonPath": "/home/drewp/.venvs/collector/bin/python",
+    "python.formatting.provider": "yapf",
+    "files.watcherExclude": {
+        "_darcs_old/**": true
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Dockerfile	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,17 @@
+FROM bang5:5000/base_basic
+
+WORKDIR /opt
+
+RUN echo 2021-08-26 && apt-get update
+RUN apt-get install -y vim net-tools iputils-ping git
+
+COPY requirements.txt ./
+RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -Ur requirements.txt
+# not sure why this doesn't work from inside requirements.txt
+RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip'
+
+COPY *.py *.html req* *.ini ./
+
+EXPOSE 9072
+
+CMD [ "python3", "./collector.py" ]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/collector.py	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,451 @@
+"""
+requesting /graph/foo returns an SSE patch stream that's the
+result of fetching multiple other SSE patch streams. The result stream
+may include new statements injected by this service.
+
+Future:
+- filter out unneeded stmts from the sources
+- give a time resolution and concatenate any patches that come faster than that res
+"""
+import collections
+import json
+import logging
+import time
+from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union)
+
+import cyclone.sse
+import cyclone.web
+from docopt import docopt
+from patchablegraph import jsonFromPatch
+from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
+from prometheus_client import Counter, Gauge, Histogram, Summary
+from prometheus_client.exposition import generate_latest
+from prometheus_client.registry import REGISTRY
+from rdfdb.patch import Patch
+from rdflib import Namespace, URIRef
+from rdflib.term import Node, Statement
+from standardservice.logsetup import enableTwistedLog, log
+from twisted.internet import defer, reactor
+
+from collector_config import config
+
+
+#SourceUri = NewType('SourceUri', URIRef) # doesn't work
+class SourceUri(URIRef):
+    pass
+
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
+
+GET_STATE_CALLS = Summary("get_state_calls", 'calls')
+LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls')
+MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls')
+ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
+SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')
+REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls')
+
+
+class Metrics(cyclone.web.RequestHandler):
+
+    def get(self):
+        self.add_header('content-type', 'text/plain')
+        self.write(generate_latest(REGISTRY))
+
+
+class LocalStatements(object):
+    """
+    functions that make statements originating from sse_collector itself
+    """
+
+    def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
+        self.applyPatch = applyPatch
+        self._sourceState: Dict[SourceUri, URIRef] = {}  # source: state URIRef
+
+    @LOCAL_STATEMENTS_PATCH_CALLS.time()
+    def setSourceState(self, source: SourceUri, state: URIRef):
+        """
+        add a patch to the COLLECTOR graph about the state of this
+        source. state=None to remove the source.
+        """
+        oldState = self._sourceState.get(source, None)
+        if state == oldState:
+            return
+        log.info('source state %s -> %s', source, state)
+        if oldState is None:
+            self._sourceState[source] = state
+            self.applyPatch(COLLECTOR, Patch(addQuads=[
+                (COLLECTOR, ROOM['source'], source, COLLECTOR),
+                (source, ROOM['state'], state, COLLECTOR),
+            ]))
+        elif state is None:
+            del self._sourceState[source]
+            self.applyPatch(COLLECTOR, Patch(delQuads=[
+                (COLLECTOR, ROOM['source'], source, COLLECTOR),
+                (source, ROOM['state'], oldState, COLLECTOR),
+            ]))
+        else:
+            self._sourceState[source] = state
+            self.applyPatch(COLLECTOR, Patch(addQuads=[
+                (source, ROOM['state'], state, COLLECTOR),
+            ], delQuads=[
+                (source, ROOM['state'], oldState, COLLECTOR),
+            ]))
+
+
+def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
+    if isinstance(t, URIRef):
+        return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/',
+                                                                                    'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:'))
+    return t
+
+
+def abbrevStmt(stmt: Statement) -> str:
+    return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3]))
+
+
+class PatchSink(cyclone.sse.SSEHandler):
+    _handlerSerial = 0
+
+    def __init__(self, application: cyclone.web.Application, request):
+        cyclone.sse.SSEHandler.__init__(self, application, request)
+        self.bound = False
+        self.created = time.time()
+        self.graphClients = self.settings.graphClients
+
+        self._serial = PatchSink._handlerSerial
+        PatchSink._handlerSerial += 1
+        self.lastPatchSentTime: float = 0.0
+
+    def __repr__(self) -> str:
+        return '<Handler #%s>' % self._serial
+
+    def state(self) -> Dict:
+        return {
+            'created': round(self.created, 2),
+            'ageHours': round((time.time() - self.created) / 3600, 2),
+            'streamId': self.streamId,
+            'remoteIp': self.request.remote_ip,  # wrong, need some forwarded-for thing
+            'foafAgent': self.request.headers.get('X-Foaf-Agent'),
+            'userAgent': self.request.headers.get('user-agent'),
+        }
+
+    def bind(self, *args, **kwargs):
+        self.streamId = args[0]
+
+        self.graphClients.addSseHandler(self)
+        # If something goes wrong with addSseHandler, I don't want to
+        # try removeSseHandler.
+        self.bound = True
+
+    def unbind(self) -> None:
+        if self.bound:
+            self.graphClients.removeSseHandler(self)
+
+
+StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]]
+
+
+class PostDeleter(object):
+
+    def __init__(self, statements: StatementTable):
+        self.statements = statements
+
+    def __enter__(self):
+        self._garbage: List[Statement] = []
+        return self
+
+    def add(self, stmt: Statement):
+        self._garbage.append(stmt)
+
+    def __exit__(self, type, value, traceback):
+        if type is not None:
+            raise NotImplementedError()
+        for stmt in self._garbage:
+            del self.statements[stmt]
+
+
+class ActiveStatements(object):
+
+    def __init__(self):
+        # This table holds statements asserted by any of our sources
+        # plus local statements that we introduce (source is
+        # http://bigasterisk.com/sse_collector/).
+        self.table: StatementTable = collections.defaultdict(lambda: (set(), set()))
+
+    def state(self) -> Dict:
+        return {
+            'len': len(self.table),
+        }
+
+    def postDeleteStatements(self) -> PostDeleter:
+        return PostDeleter(self.table)
+
+    def pprintTable(self) -> None:
+        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
+            print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))
+
+    @MAKE_SYNC_PATCH_CALLS.time()
+    def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
+        # todo: this could run all handlers at once, which is how we
+        # use it anyway
+        adds = []
+        dels = []
+
+        with self.postDeleteStatements() as garbage:
+            for stmt, (stmtSources, handlers) in self.table.items():
+                belongsInHandler = not sources.isdisjoint(stmtSources)
+                handlerHasIt = handler in handlers
+                # log.debug("%s belong=%s has=%s",
+                #           abbrevStmt(stmt), belongsInHandler, handlerHasIt)
+                if belongsInHandler and not handlerHasIt:
+                    adds.append(stmt)
+                    handlers.add(handler)
+                elif not belongsInHandler and handlerHasIt:
+                    dels.append(stmt)
+                    handlers.remove(handler)
+                    if not handlers and not stmtSources:
+                        garbage.add(stmt)
+
+        return Patch(addQuads=adds, delQuads=dels)
+
+    def applySourcePatch(self, source: SourceUri, p: Patch):
+        for stmt in p.addQuads:
+            sourceUrls, handlers = self.table[stmt]
+            if source in sourceUrls:
+                raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt)))
+            sourceUrls.add(source)
+
+        with self.postDeleteStatements() as garbage:
+            for stmt in p.delQuads:
+                sourceUrls, handlers = self.table[stmt]
+                if source not in sourceUrls:
+                    raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt)))
+                sourceUrls.remove(source)
+                # this is rare, since some handler probably still has
+                # the stmt we're deleting, but it can happen e.g. when
+                # a handler was just deleted
+                if not sourceUrls and not handlers:
+                    garbage.add(stmt)
+
+    @REPLACE_SOURCE_STATEMENTS_CALLS.time()
+    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]):
+        log.debug('replaceSourceStatements with %s stmts', len(stmts))
+        newStmts = set(stmts)
+
+        with self.postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.table.items():
+                if source in sources:
+                    if stmt not in stmts:
+                        sources.remove(source)
+                        if not sources and not handlers:
+                            garbage.add(stmt)
+                else:
+                    if stmt in stmts:
+                        sources.add(source)
+                newStmts.discard(stmt)
+
+        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
+
+    def discardHandler(self, handler: PatchSink):
+        with self.postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.table.items():
+                handlers.discard(handler)
+                if not sources and not handlers:
+                    garbage.add(stmt)
+
+    def discardSource(self, source: SourceUri):
+        with self.postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.table.items():
+                sources.discard(source)
+                if not sources and not handlers:
+                    garbage.add(stmt)
+
+
+class GraphClients(object):
+    """
+    All the active PatchSources and SSEHandlers
+
+    To handle all the overlapping-statement cases, we store a set of
+    true statements along with the sources that are currently
+    asserting them and the requesters who currently know them. As
+    statements come and go, we make patches to send to requesters.
+    """
+
+    def __init__(self):
+        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
+        self.handlers: Set[PatchSink] = set()
+        self.statements: ActiveStatements = ActiveStatements()
+
+        self._localStatements = LocalStatements(self._onPatch)
+
+    def state(self) -> Dict:
+        return {
+            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']),
+            'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
+            'statements': self.statements.state(),
+        }
+
+    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
+        streamId = handler.streamId
+        matches = [s for s in config['streams'] if s['id'] == streamId]
+        if len(matches) != 1:
+            raise ValueError("%s matches for %r" % (len(matches), streamId))
+        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]
+
+    @ON_PATCH_CALLS.time()
+    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
+        if fullGraph:
+            # a reconnect may need to resend the full graph even
+            # though we've already sent some statements
+            self.statements.replaceSourceStatements(source, p.addQuads)
+        else:
+            self.statements.applySourcePatch(source, p)
+
+        self._sendUpdatePatch()
+
+        if log.isEnabledFor(logging.DEBUG):
+            self.statements.pprintTable()
+
+        if source != COLLECTOR:
+            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])
+
+    @SEND_UPDATE_PATCH_CALLS.time()
+    def _sendUpdatePatch(self, handler: Optional[PatchSink] = None):
+        """
+        send a patch event out this handler to bring it up to date with
+        self.statements
+        """
+        now = time.time()
+        selected = self.handlers
+        if handler is not None:
+            if handler not in self.handlers:
+                log.error("called _sendUpdatePatch on a handler that's gone")
+                return
+            selected = {handler}
+        # reduce loops here- prepare all patches at once
+        for h in selected:
+            period = .9
+            if 'Raspbian' in h.request.headers.get('user-agent', ''):
+                period = 5
+            if h.lastPatchSentTime > now - period:
+                continue
+            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
+            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
+            if not p.isNoop():
+                log.debug("send patch %s to %s", p.shortSummary(), h)
+                # This can be a giant line, which was a problem
+                # once. Might be nice for this service to try to break
+                # it up into multiple sends, although there's no
+                # guarantee at all since any single stmt could be any
+                # length.
+                h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch')
+                h.lastPatchSentTime = now
+            else:
+                log.debug('nothing to send to %s', h)
+
+    def addSseHandler(self, handler: PatchSink):
+        log.info('addSseHandler %r %r', handler, handler.streamId)
+
+        # fail early if id doesn't match
+        sources = self._sourcesForHandler(handler)
+
+        self.handlers.add(handler)
+
+        for source in sources:
+            if source not in self.clients and source != COLLECTOR:
+                log.debug('connect to patch source %s', source)
+                self._localStatements.setSourceState(source, ROOM['connect'])
+                self.clients[source] = ReconnectingPatchSource(source,
+                                                               listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
+                                                               reconnectSecs=10)
+        log.debug('bring new client up to date')
+
+        self._sendUpdatePatch(handler)
+
+    def removeSseHandler(self, handler: PatchSink):
+        log.info('removeSseHandler %r', handler)
+        self.statements.discardHandler(handler)
+        for source in self._sourcesForHandler(handler):
+            for otherHandler in self.handlers:
+                if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)):
+                    # still in use
+                    break
+            else:
+                self._stopClient(source)
+
+        self.handlers.remove(handler)
+
+    def _stopClient(self, url: SourceUri):
+        if url == COLLECTOR:
+            return
+
+        self.clients[url].stop()
+
+        self.statements.discardSource(url)
+
+        self._localStatements.setSourceState(url, None)
+        if url in self.clients:
+            del self.clients[url]
+
+        self.cleanup()
+
+    def cleanup(self):
+        """
+        despite the attempts above, we still get useless rows in the table
+        sometimes
+        """
+        with self.statements.postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.statements.table.items():
+                if not sources and not any(h in self.handlers for h in handlers):
+                    garbage.add(stmt)
+
+
+class State(cyclone.web.RequestHandler):
+
+    @GET_STATE_CALLS.time()
+    def get(self) -> None:
+        try:
+            state = self.settings.graphClients.state()
+            self.write(json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>'))
+        except Exception:
+            import traceback
+            traceback.print_exc()
+            raise
+
+
+class GraphList(cyclone.web.RequestHandler):
+
+    def get(self) -> None:
+        self.write(json.dumps(config['streams']))
+
+
+if __name__ == '__main__':
+    arg = docopt("""
+    Usage: sse_collector.py [options]
+
+    -v           Verbose
+    -i           Info level only
+    """)
+
+    if arg['-v'] or arg['-i']:
+        enableTwistedLog()
+        log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
+        defer.setDebugging(True)
+
+    graphClients = GraphClients()
+
+    reactor.listenTCP(9072,
+                      cyclone.web.Application(handlers=[
+                          (r"/()", cyclone.web.StaticFileHandler, {
+                              "path": ".",
+                              "default_filename": "index.html"
+                          }),
+                          (r'/state', State),
+                          (r'/graph/', GraphList),
+                          (r'/graph/(.+)', PatchSink),
+                          (r'/metrics', Metrics),
+                      ],
+                                              graphClients=graphClients),
+                      interface='::')
+    reactor.run()
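
A client of this service just opens /graph/<streamId> and listens for 'patch' SSE events; the first patch brings it up to the current merged graph. Here is a minimal reader sketch (not part of this changeset): it assumes the collector is reachable at http://localhost:9072, uses the third-party requests library, and passes the JSON payload through exactly as jsonFromPatch emitted it.

    import json
    import requests

    # Sketch only: stream the merged 'home' graph and print each patch event.
    resp = requests.get('http://localhost:9072/graph/home', stream=True,
                        headers={'Accept': 'text/event-stream'})
    event = None
    for line in resp.iter_lines(decode_unicode=True):
        if line.startswith('event:'):
            event = line.split(':', 1)[1].strip()
        elif line.startswith('data:') and event == 'patch':
            # payload shape is whatever patchablegraph.jsonFromPatch produces
            # (adds/deletes of quads); we just parse and print it here
            print(json.loads(line.split(':', 1)[1]))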
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/collector_config.py	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,100 @@
+config = {
+    'streams': [
+        {
+            'id':
+                'reposync',
+            'sources': [
+                'http://reposync.default.svc.cluster.local.:8000/graph/githubRepos/events',
+                'http://reposync.default.svc.cluster.local.:8001/graph/localRepos/events',
+            ]
+        },
+        {
+            'id':
+                'home',
+            'sources': [
+                # should be from :reasoning :source ?s
+                # 'http://bang:9059/graph/events',
+                # 'http://bang5:10310/graph/events',  # kitchen
+                # 'http://bang5:10311/graph/events',  # living
+                # 'http://bang5:10312/graph/events',  # frontdoor
+                # 'http://bang5:10313/graph/events',  # workshop
+                # 'http://bang5:10314/graph/events',  # garage
+                # 'http://bang5:10315/graph/events',  # bed
+                # 'http://bang5:10316/graph/events',  # changing
+                # 'http://bang5:10317/graph/events',  # frontbed
+
+                # #'http://bang:9099/graph/mapTrails/events',
+                # 'http://slash:9095/graph/dpms/events',
+                # 'http://slash:9107/graph/xidle/events',
+                # 'http://dash:9095/graph/dpms/events',
+                # 'http://dash:9107/graph/xidle/events',
+                # 'http://frontdoor5:9095/graph/dpms/events',
+                # 'http://frontdoor5:9107/graph/xidle/events',
+                # 'http://frontdoor5:10012/graph/rfid/events',
+                # 'http://frontdoor5:10013/graph/tinyScreen/events',
+
+                # 'http://bang:9075/graph/environment/events',
+                # 'http://bang:10011/graph/frontDoorLock/events',
+                'http://mqtt-to-rdf.default.svc.cluster.local.:10018/graph/mqtt/events',
+                # 'http://bang:10016/graph/power/events',
+                # 'http://bang:10015/graph/store/events',
+                # 'http://bang:10006/graph/timebank/events',
+                # 'http://bang:9070/graph/wifi/events',
+            ]
+        },
+        {
+            'id':
+                'frontDoor',  # used for front door display
+            'sources': [
+                # 'http://bang:9105/graph/calendar/countdown/events',
+                # 'http://bang:9105/graph/calendar/upcoming/events',
+                # 'http://bang:9075/graph/environment/events',
+                'http://mqtt-to-rdf.default.svc.cluster.local.:10018/graph/mqtt/events',
+                # 'http://bang:10016/graph/power/events',
+                # 'http://bang:10006/graph/timebank/events',
+                # 'http://bang:10015/graph/store/events',
+
+                # 'http://bang:9059/graph/events',
+                # 'http://bang5:10310/graph/events',  # kitchen
+                # 'http://bang5:10311/graph/events',  # living
+                # 'http://bang5:10313/graph/events',  # workshop
+                # 'http://bang5:10314/graph/events',  # garage
+                # 'http://bang5:10317/graph/events',  # frontbed
+            ]
+        },
+        {
+            'id':
+                'network',
+            'sources': [
+                # 'http://bang:9070/graph/wifi/events',
+                # 'http://bang:9073/graph/dhcpLeases/events',
+                # 'http://bang:9009/graph/traffic/events',
+            ]
+        },
+        {
+            'id':
+                'source_frontDoor',
+            'sources': [
+                # 'http://bang5:10312/graph/events',  # frontdoor
+                # 'http://frontdoor5:9095/graph/dpms/events',
+                # 'http://frontdoor5:9107/graph/xidle/events',
+                # 'http://frontdoor5:10012/graph/rfid/events',
+                # 'http://frontdoor5:10013/graph/tinyScreen/events',
+                # 'http://bang:10011/graph/frontDoorLock/events',
+                'http://mqtt-to-rdf.default.svc.cluster.local.:10018/graph/mqtt/events',
+            ]
+        },
+        {
+            'id': 'env',
+            'sources': [
+                # 'http://bang:9075/graph/environment/events',
+            ]
+        },
+        {
+            'id': 'workshop',
+            'sources': [
+                # 'http://bang5:10313/graph/events',  # workshop
+            ]
+        },
+    ]
+}
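
Each entry above maps a stream id to the upstream SSE sources whose statements get merged; the collector then serves the union at /graph/<id>. A hypothetical new entry (not in this changeset) would look like:

    # would be served at /graph/garden once added to config['streams']
    {
        'id': 'garden',
        'sources': [
            'http://mqtt-to-rdf.default.svc.cluster.local.:10018/graph/mqtt/events',
        ]
    },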
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/deploy.yaml	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,38 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: collector
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: collector
+  template:
+    metadata:
+      labels:
+        app: collector
+    spec:
+      containers:
+        - name: collector
+          image: bang5:5000/collector_image
+          imagePullPolicy: "Always"
+          ports:
+            - containerPort: 9072
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+              - matchExpressions:
+                  - key: "kubernetes.io/hostname"
+                    operator: In
+                    values: ["bang"]
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: collector
+spec:
+  ports:
+    - {port: 9072, targetPort: 9072}
+  selector:
+    app: collector
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/index.html	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,151 @@
+<!DOCTYPE html>
+<html>
+  <head>
+    <title>collector</title>
+    <meta charset="utf-8" />
+  </head>
+  <body class="rdfBrowsePage">
+    <h1>collector</h1>
+
+    <p><a href="/rdf/browse/?graph=/sse_collector/graph/home">See output for graph/home</a></p>
+
+    <collector-state></collector-state>
+    <script type="module">
+      import { LitElement, html, css, customElement } from "https://cdn.skypack.dev/lit-element";
+      export class CollectorState extends LitElement {
+        static get properties() {
+          return {
+            latestState: { type: Object },
+          };
+        }
+
+        constructor() {
+          super();
+        }
+
+        firstUpdated() {
+          this.latestState = { graphClients: {} };
+          this.refresh();
+        }
+
+        delayedRefresh() {
+          setTimeout(() => {
+            requestAnimationFrame(() => {
+              this.refresh();
+            });
+          }, 5000);
+        }
+
+        refresh() {
+          fetch("state")
+            .then((response) => {
+              return response.json();
+            })
+            .then((newState) => {
+              this.latestState = newState;
+              this.delayedRefresh();
+            });
+        }
+
+        static get styles() {
+          return css`
+            :host {
+              display: inline-block;
+              border: 2px solid gray;
+              padding: 5px;
+            }
+          `;
+        }
+
+        render() {
+          const sourcesTable = (clients) => {
+            const clientRow = (client) => {
+              const d = client.reconnectedPatchSource;
+              const now = Date.now() / 1000;
+              const dispSec = (sec) =>
+                Math.abs(sec) > now - 1
+                  ? "--"
+                  : Math.abs(sec) > 3600
+                  ? `${Math.round(sec / 3600)} hr`
+                  : Math.abs(sec) > 60
+                  ? `${Math.round(sec / 60)} min`
+                  : `${Math.round(sec * 10) / 10} sec`;
+              return html`
+                <tr>
+                  <td><a href="${d.url.replace("/events", "")}">[browse]</a> <a href="${d.url}">${d.url}</a></td>
+                  <td>${d.fullGraphReceived}</td>
+                  <td>${d.patchesReceived}</td>
+                  <td>${dispSec(d.time.open - now)}</td>
+                  <td>${dispSec(d.time.fullGraph - d.time.open)}</td>
+                  <td>${dispSec(d.time.latestPatch - now)}</td>
+                </tr>
+              `;
+            };
+
+            return html`
+              <table>
+                <thead>
+                  <tr>
+                    <th>patch source</th>
+                    <th>full graph recv</th>
+                    <th>patches recv</th>
+                    <th>time open (rel)</th>
+                    <th>time fullGraph (after open)</th>
+                    <th>time latest patch (rel)</th>
+                  </tr>
+                </thead>
+                <tbody>
+                  ${clients.map(clientRow)}
+                </tbody>
+              </table>
+            `;
+          };
+
+          const handlersTable = (handlers) => {
+            const handlerRow = (d) => {
+              return html`
+                <tr>
+                  <td>${d.created}</td>
+                  <td>${d.ageHours}</td>
+                  <td><a href="/rdf/browse/?graph=/sse_collector/graph/${d.streamId}">${d.streamId}</a></td>
+                  <td>${d.foafAgent}</td>
+                  <td>${d.userAgent}</td>
+                </tr>
+              `;
+            };
+
+            return html`
+              <table>
+                <thead>
+                  <tr>
+                    <th>created</th>
+                    <th>age hours</th>
+                    <th>stream</th>
+                    <th>foaf agent</th>
+                    <th>user agent</th>
+                  </tr>
+                </thead>
+                <tbody>
+                  ${handlers.map(handlerRow)}
+                </tbody>
+              </table>
+            `;
+          };
+
+          if (!this.latestState) {
+            return "loading...";
+          }
+          const d = this.latestState.graphClients;
+          return html` <div>
+            <p>Graph: ${d.statements ? d.statements.len : 0} statements</p>
+
+            <p>Sources: ${sourcesTable(d.clients || [])}</p>
+
+            <p>Listening clients: ${handlersTable(d.sseHandlers || [])}</p>
+          </div>`;
+        }
+      }
+      customElements.define("collector-state", CollectorState);
+    </script>
+  </body>
+</html>
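
The dashboard above polls /state every 5 seconds for the JSON that the State handler serves; the same data can be fetched directly, e.g. (sketch, assuming the service runs on localhost:9072 and the requests library is available):

    import requests

    # one-shot fetch of the dashboard's data source
    state = requests.get('http://localhost:9072/state').json()
    print(state['graphClients']['statements'])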
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/requirements.txt	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,14 @@
+docopt
+ipdb
+prometheus_client==0.11.0
+rdflib-jsonld==0.5.0
+rdflib==5.0.0
+service_identity==21.1.0
+twisted
+
+https://github.com/drewp/cyclone/archive/python3.zip
+
+cycloneerr
+patchablegraph==0.9.0
+rdfdb==0.21.0
+standardservice==0.6.0
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/skaffold.yaml	Tue Mar 29 21:44:04 2022 -0700
@@ -0,0 +1,15 @@
+apiVersion: skaffold/v2beta5
+kind: Config
+metadata:
+  name: collector
+build:
+  tagPolicy:
+    dateTime:
+      format: "2006-01-02_15-04-05"
+      timezone: "Local"
+  artifacts:
+    - image: bang5:5000/collector_image
+deploy:
+  kubectl:
+    manifests:
+      - deploy.yaml