Mercurial > code > home > repos > homeauto
changeset 794:fafe86ae0b03
py3, k8s, prometheus updates
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Dec 2020 17:00:08 -0800 |
parents | c3e3bd5dfa0b |
children | c8562ace4917 |
files | service/collector/.flake8 service/collector/.style.yapf service/collector/Dockerfile service/collector/collector.py service/collector/deploy.yaml service/collector/mypy.ini service/collector/requirements.txt service/collector/serv.n3 service/collector/skaffold.yaml service/collector/tasks.py |
diffstat | 10 files changed, 172 insertions(+), 164 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/collector/.flake8 Sat Dec 26 17:00:08 2020 -0800 @@ -0,0 +1,3 @@ +[flake8] +ignore=W504 +max-line-length=160 \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/collector/.style.yapf Sat Dec 26 17:00:08 2020 -0800 @@ -0,0 +1,4 @@ +# overwritten by /home/drewp/bin/setup_home_venv +[style] +based_on_style = google +column_limit = 160
--- a/service/collector/Dockerfile Mon Nov 30 23:40:38 2020 -0800 +++ b/service/collector/Dockerfile Sat Dec 26 17:00:08 2020 -0800 @@ -1,4 +1,4 @@ -FROM bang6:5000/base_x86 +FROM bang5:5000/base_x86 WORKDIR /opt @@ -13,4 +13,4 @@ EXPOSE 9072 -CMD [ "python3", "./sse_collector.py" ] +CMD [ "python3", "./collector.py" ]
--- a/service/collector/collector.py Mon Nov 30 23:40:38 2020 -0800 +++ b/service/collector/collector.py Sat Dec 26 17:00:08 2020 -0800 @@ -7,57 +7,62 @@ - filter out unneeded stmts from the sources - give a time resolution and concatenate any patches that come faster than that res """ -from docopt import docopt -from greplin import scales -from greplin.scales.cyclonehandler import StatsHandler -from rdflib import Namespace, URIRef - -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from rdflib import StatementType -else: - class StatementType: pass # type: ignore - +import collections +import json +import logging +import time +from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union) -from rdflib.term import Node -from twisted.internet import reactor, defer -from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional -import cyclone.web, cyclone.sse -import logging, collections, json, time - -from standardservice.logsetup import log, enableTwistedLog +import cyclone.sse +import cyclone.web +from docopt import docopt from patchablegraph import jsonFromPatch +from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource +from prometheus_client import Counter, Gauge, Histogram, Summary +from prometheus_client.exposition import generate_latest +from prometheus_client.registry import REGISTRY from rdfdb.patch import Patch - -from patchablegraph.patchsource import ReconnectingPatchSource +from rdflib import Namespace, URIRef +from rdflib.term import Node, Statement +from standardservice.logsetup import enableTwistedLog, log +from twisted.internet import defer, reactor from collector_config import config + #SourceUri = NewType('SourceUri', URIRef) # doesn't work -class SourceUri(URIRef): pass +class SourceUri(URIRef): + pass ROOM = Namespace("http://projects.bigasterisk.com/room/") COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) -STATS = scales.collection('/root', - scales.PmfStat('getState'), - scales.PmfStat('localStatementsPatch'), - scales.PmfStat('makeSyncPatch'), - scales.PmfStat('onPatch'), - scales.PmfStat('sendUpdatePatch'), - scales.PmfStat('replaceSourceStatements'), -) +GET_STATE_CALLS = Summary("get_state_calls", 'calls') +LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls') +MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls') +ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') +SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') +REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls') + + +class Metrics(cyclone.web.RequestHandler): + + def get(self): + self.add_header('content-type', 'text/plain') + self.write(generate_latest(REGISTRY)) + class LocalStatements(object): """ functions that make statements originating from sse_collector itself """ + def __init__(self, applyPatch: Callable[[URIRef, Patch], None]): self.applyPatch = applyPatch - self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef + self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef - @STATS.localStatementsPatch.time() + @LOCAL_STATEMENTS_PATCH_CALLS.time() def setSourceState(self, source: SourceUri, state: URIRef): """ add a patch to the COLLECTOR graph about the state of this @@ -81,27 +86,27 @@ ])) else: self._sourceState[source] = state - self.applyPatch(COLLECTOR, Patch( - addQuads=[ + self.applyPatch(COLLECTOR, Patch(addQuads=[ (source, ROOM['state'], state, COLLECTOR), - ], - delQuads=[ - (source, ROOM['state'], oldState, COLLECTOR), - ])) + ], delQuads=[ + (source, ROOM['state'], oldState, COLLECTOR), + ])) + def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]: if isinstance(t, URIRef): - return (t.replace('http://projects.bigasterisk.com/room/', 'room:') - .replace('http://projects.bigasterisk.com/device/', 'dev:') - .replace('http://bigasterisk.com/sse_collector/', 'sc:')) + return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/', + 'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:')) return t -def abbrevStmt(stmt: StatementType) -> str: - return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), - abbrevTerm(stmt[2]), abbrevTerm(stmt[3])) + +def abbrevStmt(stmt: Statement) -> str: + return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3])) + class PatchSink(cyclone.sse.SSEHandler): _handlerSerial = 0 + def __init__(self, application: cyclone.web.Application, request): cyclone.sse.SSEHandler.__init__(self, application, request) self.bound = False @@ -120,7 +125,7 @@ 'created': round(self.created, 2), 'ageHours': round((time.time() - self.created) / 3600, 2), 'streamId': self.streamId, - 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing + 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing 'foafAgent': self.request.headers.get('X-Foaf-Agent'), 'userAgent': self.request.headers.get('user-agent'), } @@ -138,53 +143,49 @@ self.graphClients.removeSseHandler(self) -StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set[PatchSink]]] +StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]] class PostDeleter(object): + def __init__(self, statements: StatementTable): self.statements = statements def __enter__(self): - self._garbage: List[StatementType] = [] + self._garbage: List[Statement] = [] return self - def add(self, stmt: StatementType): + def add(self, stmt: Statement): self._garbage.append(stmt) def __exit__(self, type, value, traceback): if type is not None: - raise + raise NotImplementedError() for stmt in self._garbage: del self.statements[stmt] class ActiveStatements(object): + def __init__(self): # This table holds statements asserted by any of our sources # plus local statements that we introduce (source is # http://bigasterisk.com/sse_collector/). - self.table: StatementTable = collections.defaultdict( - lambda: (set(), set())) + self.table: StatementTable = collections.defaultdict(lambda: (set(), set())) def state(self) -> Dict: return { 'len': len(self.table), - } + } def postDeleteStatements(self) -> PostDeleter: return PostDeleter(self.table) def pprintTable(self) -> None: - for i, (stmt, (sources, handlers)) in enumerate( - sorted(self.table.items())): - print("%03d. %-80s from %s to %s" % ( - i, - abbrevStmt(stmt), - [abbrevTerm(s) for s in sources], - handlers)) + for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())): + print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)) - @STATS.makeSyncPatch.time() + @MAKE_SYNC_PATCH_CALLS.time() def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]): # todo: this could run all handlers at once, which is how we # use it anyway @@ -195,8 +196,8 @@ for stmt, (stmtSources, handlers) in self.table.items(): belongsInHandler = not sources.isdisjoint(stmtSources) handlerHasIt = handler in handlers - #log.debug("%s belong=%s has=%s", - # abbrevStmt(stmt), belongsInHandler, handlerHasIt) + # log.debug("%s belong=%s has=%s", + # abbrevStmt(stmt), belongsInHandler, handlerHasIt) if belongsInHandler and not handlerHasIt: adds.append(stmt) handlers.add(handler) @@ -212,16 +213,14 @@ for stmt in p.addQuads: sourceUrls, handlers = self.table[stmt] if source in sourceUrls: - raise ValueError("%s added stmt that it already had: %s" % - (source, abbrevStmt(stmt))) + raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt))) sourceUrls.add(source) with self.postDeleteStatements() as garbage: for stmt in p.delQuads: sourceUrls, handlers = self.table[stmt] if source not in sourceUrls: - raise ValueError("%s deleting stmt that it didn't have: %s" % - (source, abbrevStmt(stmt))) + raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt))) sourceUrls.remove(source) # this is rare, since some handler probably still has # the stmt we're deleting, but it can happen e.g. when @@ -229,9 +228,8 @@ if not sourceUrls and not handlers: garbage.add(stmt) - @STATS.replaceSourceStatements.time() - def replaceSourceStatements(self, source: SourceUri, - stmts: Sequence[StatementType]): + @REPLACE_SOURCE_STATEMENTS_CALLS.time() + def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]): log.debug('replaceSourceStatements with %s stmts', len(stmts)) newStmts = set(stmts) @@ -264,7 +262,6 @@ garbage.add(stmt) - class GraphClients(object): """ All the active PatchSources and SSEHandlers @@ -274,6 +271,7 @@ asserting them and the requesters who currently know them. As statements come and go, we make patches to send to requesters. """ + def __init__(self): self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed) self.handlers: Set[PatchSink] = set() @@ -283,10 +281,8 @@ def state(self) -> Dict: return { - 'clients': sorted([ps.state() for ps in self.clients.values()], - key=lambda r: r['reconnectedPatchSource']['url']), - 'sseHandlers': sorted([h.state() for h in self.handlers], - key=lambda r: (r['streamId'], r['created'])), + 'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']), + 'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])), 'statements': self.statements.state(), } @@ -295,11 +291,10 @@ matches = [s for s in config['streams'] if s['id'] == streamId] if len(matches) != 1: raise ValueError("%s matches for %r" % (len(matches), streamId)) - return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [ - COLLECTOR] + return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR] - @STATS.onPatch.time() - def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False): + @ON_PATCH_CALLS.time() + def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False): if fullGraph: # a reconnect may need to resend the full graph even # though we've already sent some statements @@ -313,13 +308,10 @@ self.statements.pprintTable() if source != COLLECTOR: - self._localStatements.setSourceState( - source, - ROOM['fullGraphReceived'] if fullGraph else - ROOM['patchesReceived']) + self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived']) - @STATS.sendUpdatePatch.time() - def _sendUpdatePatch(self, handler: Optional[PatchSink]=None): + @SEND_UPDATE_PATCH_CALLS.time() + def _sendUpdatePatch(self, handler: Optional[PatchSink] = None): """ send a patch event out this handler to bring it up to date with self.statements @@ -347,8 +339,7 @@ # it up into multiple sends, although there's no # guarantee at all since any single stmt could be any # length. - h.sendEvent(message=jsonFromPatch(p).encode('utf8'), - event=b'patch') + h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch') h.lastPatchSentTime = now else: log.debug('nothing to send to %s', h) @@ -365,11 +356,9 @@ if source not in self.clients and source != COLLECTOR: log.debug('connect to patch source %s', source) self._localStatements.setSourceState(source, ROOM['connect']) - self.clients[source] = ReconnectingPatchSource( - source, - listener=lambda p, fullGraph, source=source: self._onPatch( - source, p, fullGraph), - reconnectSecs=10) + self.clients[source] = ReconnectingPatchSource(source, + listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph), + reconnectSecs=10) log.debug('bring new client up to date') self._sendUpdatePatch(handler) @@ -379,8 +368,7 @@ self.statements.discardHandler(handler) for source in self._sourcesForHandler(handler): for otherHandler in self.handlers: - if (otherHandler != handler and - source in self._sourcesForHandler(otherHandler)): + if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)): # still in use break else: @@ -414,20 +402,24 @@ class State(cyclone.web.RequestHandler): - @STATS.getState.time() + + @GET_STATE_CALLS.time() def get(self) -> None: try: state = self.settings.graphClients.state() - self.write(json.dumps({'graphClients': state}, indent=2, - default=lambda obj: '<unserializable>')) + self.write(json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>')) except Exception: - import traceback; traceback.print_exc() + import traceback + traceback.print_exc() raise + class GraphList(cyclone.web.RequestHandler): + def get(self) -> None: self.write(json.dumps(config['streams'])) + if __name__ == '__main__': arg = docopt(""" Usage: sse_collector.py [options] @@ -441,21 +433,19 @@ log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO) defer.setDebugging(True) - graphClients = GraphClients() - #exporter = InfluxExporter(... to export some stats values - reactor.listenTCP( - 9072, - cyclone.web.Application( - handlers=[ - (r"/()", cyclone.web.StaticFileHandler, { - "path": ".", "default_filename": "index.html"}), - (r'/state', State), - (r'/graph/', GraphList), - (r'/graph/(.+)', PatchSink), - (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}), - ], - graphClients=graphClients), - interface='::') + reactor.listenTCP(9072, + cyclone.web.Application(handlers=[ + (r"/()", cyclone.web.StaticFileHandler, { + "path": ".", + "default_filename": "index.html" + }), + (r'/state', State), + (r'/graph/', GraphList), + (r'/graph/(.+)', PatchSink), + (r'/metrics', Metrics), + ], + graphClients=graphClients), + interface='::') reactor.run()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/collector/deploy.yaml Sat Dec 26 17:00:08 2020 -0800 @@ -0,0 +1,38 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: collector +spec: + replicas: 1 + selector: + matchLabels: + app: collector + template: + metadata: + labels: + app: collector + spec: + containers: + - name: collector + image: bang5:5000/collector_image + imagePullPolicy: "Always" + ports: + - containerPort: 9072 + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "kubernetes.io/hostname" + operator: In + values: ["bang"] +--- +apiVersion: v1 +kind: Service +metadata: + name: collector +spec: + ports: + - {port: 9072, targetPort: 9072} + selector: + app: collector
--- a/service/collector/mypy.ini Mon Nov 30 23:40:38 2020 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,7 +0,0 @@ -[mypy] -no_implicit_optional = True -#ignore_missing_imports = True -python_executable = /usr/bin/python3 -warn_unused_configs = True -warn_return_any = True -mypy_path = /opt/stubs:/usr/local/lib/python3.6/dist-packages/ \ No newline at end of file
--- a/service/collector/requirements.txt Mon Nov 30 23:40:38 2020 -0800 +++ b/service/collector/requirements.txt Sat Dec 26 17:00:08 2020 -0800 @@ -1,18 +1,16 @@ docopt ipdb +mypy +prometheus_client==0.9.0 +py-spy +rdflib-jsonld==0.5.0 +rdflib==5.0.0 service_identity==18.1.0 -twisted==19.2.0 -py-spy -mypy +twisted==20.3.0 -rdflib==4.2.2 -rdflib-jsonld==0.4.0 - -git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales https://github.com/drewp/cyclone/archive/python3.zip -influxdb==5.2.2 cycloneerr -patchablegraph==0.7.0 -rdfdb==0.8.0 -standardservice==0.4.0 +patchablegraph==0.9.0 +rdfdb==0.21.0 +standardservice==0.6.0
--- a/service/collector/serv.n3 Mon Nov 30 23:40:38 2020 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,23 +0,0 @@ -@prefix : <http://bigasterisk.com/ns/serv#> . -@prefix auth: <http://bigasterisk.com/ns/serv/auth#> . -@prefix serv: <http://bigasterisk.com/services/> . - - -serv:collector a :Service; - :path "/collector/"; - :openid auth:admin; - :serverHost "bang"; - :internalPort 9072; - :prodDockerFlags ( - "-p" "9072:9072" - "--net=host" - ); - :localDockerFlags ( - "-v" "`pwd`:/opt" - ); - :localRunCmdline ( - "python3" "collector.py" "-v" - ); - :dockerFile "Dockerfile" -. -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/collector/skaffold.yaml Sat Dec 26 17:00:08 2020 -0800 @@ -0,0 +1,15 @@ +apiVersion: skaffold/v2beta5 +kind: Config +metadata: + name: collector +build: + tagPolicy: + dateTime: + format: "2006-01-02_15-04-05" + timezone: "Local" + artifacts: + - image: bang5:5000/collector_image +deploy: + kubectl: + manifests: + - deploy.yaml
--- a/service/collector/tasks.py Mon Nov 30 23:40:38 2020 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,10 +0,0 @@ -from invoke import task - - -@task(pre=[build_image]) -def shell(ctx): - ctx.run(f'docker run --rm --name={JOB}_shell -v `pwd`/.mypy_cache:/opt/.mypy_cache -v `pwd`/../../stubs:/opt/stubs -v `pwd`/sse_collector.py:/opt/sse_collector.py --net=host {TAG_x86} /bin/bash', pty=True) - -@task(pre=[build_image]) -def local_run(ctx): - ctx.run(f'docker run --rm -it -p {PORT}:{PORT} --net=host --cap-add SYS_PTRACE --dns 10.2.0.1 --dns-search bigasterisk.com -v `pwd`/static:/opt/static {TAG_x86} python3 sse_collector.py -i', pty=True)