changeset 794:fafe86ae0b03

py3 typing/format cleanup, move metrics from greplin scales to prometheus_client, deploy via k8s/skaffold
author drewp@bigasterisk.com
date Sat, 26 Dec 2020 17:00:08 -0800
parents c3e3bd5dfa0b
children c8562ace4917
files service/collector/.flake8 service/collector/.style.yapf service/collector/Dockerfile service/collector/collector.py service/collector/deploy.yaml service/collector/mypy.ini service/collector/requirements.txt service/collector/serv.n3 service/collector/skaffold.yaml service/collector/tasks.py
diffstat 10 files changed, 172 insertions(+), 164 deletions(-)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/.flake8	Sat Dec 26 17:00:08 2020 -0800
@@ -0,0 +1,3 @@
+[flake8]
+ignore=W504
+max-line-length=160
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/.style.yapf	Sat Dec 26 17:00:08 2020 -0800
@@ -0,0 +1,4 @@
+# overwritten by /home/drewp/bin/setup_home_venv
+[style]
+based_on_style = google
+column_limit = 160
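
(Both new configs agree on a 160-column limit — flake8's max-line-length and yapf's column_limit — which is why so many statements in the collector.py diff below are reflowed onto long single lines.)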
--- a/service/collector/Dockerfile	Mon Nov 30 23:40:38 2020 -0800
+++ b/service/collector/Dockerfile	Sat Dec 26 17:00:08 2020 -0800
@@ -1,4 +1,4 @@
-FROM bang6:5000/base_x86
+FROM bang5:5000/base_x86
 
 WORKDIR /opt
 
@@ -13,4 +13,4 @@
 
 EXPOSE 9072
 
-CMD [ "python3", "./sse_collector.py" ]
+CMD [ "python3", "./collector.py" ]
--- a/service/collector/collector.py	Mon Nov 30 23:40:38 2020 -0800
+++ b/service/collector/collector.py	Sat Dec 26 17:00:08 2020 -0800
@@ -7,57 +7,62 @@
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
-from docopt import docopt
-from greplin import scales
-from greplin.scales.cyclonehandler import StatsHandler
-from rdflib import Namespace, URIRef
-
-from typing import TYPE_CHECKING
-if TYPE_CHECKING:
-    from rdflib import StatementType
-else:
-    class StatementType: pass  # type: ignore
-
+import collections
+import json
+import logging
+import time
+from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union)
 
-from rdflib.term import Node
-from twisted.internet import reactor, defer
-from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional
-import cyclone.web, cyclone.sse
-import logging, collections, json, time
-
-from standardservice.logsetup import log, enableTwistedLog
+import cyclone.sse
+import cyclone.web
+from docopt import docopt
 from patchablegraph import jsonFromPatch
+from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
+from prometheus_client import Counter, Gauge, Histogram, Summary
+from prometheus_client.exposition import generate_latest
+from prometheus_client.registry import REGISTRY
 from rdfdb.patch import Patch
-
-from patchablegraph.patchsource import ReconnectingPatchSource
+from rdflib import Namespace, URIRef
+from rdflib.term import Node, Statement
+from standardservice.logsetup import enableTwistedLog, log
+from twisted.internet import defer, reactor
 
 from collector_config import config
 
+
 #SourceUri = NewType('SourceUri', URIRef) # doesn't work
-class SourceUri(URIRef): pass
+class SourceUri(URIRef):
+    pass
 
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
 
-STATS = scales.collection('/root',
-                          scales.PmfStat('getState'),
-                          scales.PmfStat('localStatementsPatch'),
-                          scales.PmfStat('makeSyncPatch'),
-                          scales.PmfStat('onPatch'),
-                          scales.PmfStat('sendUpdatePatch'),
-                          scales.PmfStat('replaceSourceStatements'),
-)
+GET_STATE_CALLS = Summary("get_state_calls", 'calls')
+LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls')
+MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls')
+ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
+SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')
+REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls')
+
+
+class Metrics(cyclone.web.RequestHandler):
+
+    def get(self):
+        self.add_header('content-type', 'text/plain')
+        self.write(generate_latest(REGISTRY))
+
 
 class LocalStatements(object):
     """
     functions that make statements originating from sse_collector itself
     """
+
     def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
         self.applyPatch = applyPatch
-        self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef
+        self._sourceState: Dict[SourceUri, URIRef] = {}  # source: state URIRef
 
-    @STATS.localStatementsPatch.time()
+    @LOCAL_STATEMENTS_PATCH_CALLS.time()
     def setSourceState(self, source: SourceUri, state: URIRef):
         """
         add a patch to the COLLECTOR graph about the state of this
@@ -81,27 +86,27 @@
             ]))
         else:
             self._sourceState[source] = state
-            self.applyPatch(COLLECTOR, Patch(
-                addQuads=[
+            self.applyPatch(COLLECTOR, Patch(addQuads=[
                 (source, ROOM['state'], state, COLLECTOR),
-                ],
-                delQuads=[
-                    (source, ROOM['state'], oldState, COLLECTOR),
-                ]))
+            ], delQuads=[
+                (source, ROOM['state'], oldState, COLLECTOR),
+            ]))
+
 
 def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
     if isinstance(t, URIRef):
-        return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
-                .replace('http://projects.bigasterisk.com/device/', 'dev:')
-                .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
+        return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/',
+                                                                                    'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:'))
     return t
 
-def abbrevStmt(stmt: StatementType) -> str:
-    return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]),
-                              abbrevTerm(stmt[2]), abbrevTerm(stmt[3]))
+
+def abbrevStmt(stmt: Statement) -> str:
+    return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3]))
+
 
 class PatchSink(cyclone.sse.SSEHandler):
     _handlerSerial = 0
+
     def __init__(self, application: cyclone.web.Application, request):
         cyclone.sse.SSEHandler.__init__(self, application, request)
         self.bound = False
@@ -120,7 +125,7 @@
             'created': round(self.created, 2),
             'ageHours': round((time.time() - self.created) / 3600, 2),
             'streamId': self.streamId,
-            'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing
+            'remoteIp': self.request.remote_ip,  # wrong, need some forwarded-for thing
             'foafAgent': self.request.headers.get('X-Foaf-Agent'),
             'userAgent': self.request.headers.get('user-agent'),
         }
@@ -138,53 +143,49 @@
             self.graphClients.removeSseHandler(self)
 
 
-StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set[PatchSink]]]
+StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]]
 
 
 class PostDeleter(object):
+
     def __init__(self, statements: StatementTable):
         self.statements = statements
 
     def __enter__(self):
-        self._garbage: List[StatementType] = []
+        self._garbage: List[Statement] = []
         return self
 
-    def add(self, stmt: StatementType):
+    def add(self, stmt: Statement):
         self._garbage.append(stmt)
 
     def __exit__(self, type, value, traceback):
         if type is not None:
-            raise
+            raise NotImplementedError()
         for stmt in self._garbage:
             del self.statements[stmt]
 
 
 class ActiveStatements(object):
+
     def __init__(self):
         # This table holds statements asserted by any of our sources
         # plus local statements that we introduce (source is
         # http://bigasterisk.com/sse_collector/).
-        self.table: StatementTable = collections.defaultdict(
-            lambda: (set(), set()))
+        self.table: StatementTable = collections.defaultdict(lambda: (set(), set()))
 
     def state(self) -> Dict:
         return {
             'len': len(self.table),
-            }
+        }
 
     def postDeleteStatements(self) -> PostDeleter:
         return PostDeleter(self.table)
 
     def pprintTable(self) -> None:
-        for i, (stmt, (sources, handlers)) in enumerate(
-                sorted(self.table.items())):
-            print("%03d. %-80s from %s to %s" % (
-                i,
-                abbrevStmt(stmt),
-                [abbrevTerm(s) for s in sources],
-                handlers))
+        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
+            print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))
 
-    @STATS.makeSyncPatch.time()
+    @MAKE_SYNC_PATCH_CALLS.time()
     def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
         # todo: this could run all handlers at once, which is how we
         # use it anyway
@@ -195,8 +196,8 @@
             for stmt, (stmtSources, handlers) in self.table.items():
                 belongsInHandler = not sources.isdisjoint(stmtSources)
                 handlerHasIt = handler in handlers
-                #log.debug("%s belong=%s has=%s",
-                #          abbrevStmt(stmt), belongsInHandler, handlerHasIt)
+                # log.debug("%s belong=%s has=%s",
+                #           abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                 if belongsInHandler and not handlerHasIt:
                     adds.append(stmt)
                     handlers.add(handler)
@@ -212,16 +213,14 @@
         for stmt in p.addQuads:
             sourceUrls, handlers = self.table[stmt]
             if source in sourceUrls:
-                raise ValueError("%s added stmt that it already had: %s" %
-                                 (source, abbrevStmt(stmt)))
+                raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt)))
             sourceUrls.add(source)
 
         with self.postDeleteStatements() as garbage:
             for stmt in p.delQuads:
                 sourceUrls, handlers = self.table[stmt]
                 if source not in sourceUrls:
-                    raise ValueError("%s deleting stmt that it didn't have: %s" %
-                                     (source, abbrevStmt(stmt)))
+                    raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt)))
                 sourceUrls.remove(source)
                 # this is rare, since some handler probably still has
                 # the stmt we're deleting, but it can happen e.g. when
@@ -229,9 +228,8 @@
                 if not sourceUrls and not handlers:
                     garbage.add(stmt)
 
-    @STATS.replaceSourceStatements.time()
-    def replaceSourceStatements(self, source: SourceUri,
-                                stmts: Sequence[StatementType]):
+    @REPLACE_SOURCE_STATEMENTS_CALLS.time()
+    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]):
         log.debug('replaceSourceStatements with %s stmts', len(stmts))
         newStmts = set(stmts)
 
@@ -264,7 +262,6 @@
                     garbage.add(stmt)
 
 
-
 class GraphClients(object):
     """
     All the active PatchSources and SSEHandlers
@@ -274,6 +271,7 @@
     asserting them and the requesters who currently know them. As
     statements come and go, we make patches to send to requesters.
     """
+
     def __init__(self):
         self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
         self.handlers: Set[PatchSink] = set()
@@ -283,10 +281,8 @@
 
     def state(self) -> Dict:
         return {
-            'clients': sorted([ps.state() for ps in self.clients.values()],
-                              key=lambda r: r['reconnectedPatchSource']['url']),
-            'sseHandlers': sorted([h.state() for h in self.handlers],
-                                  key=lambda r: (r['streamId'],  r['created'])),
+            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']),
+            'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
             'statements': self.statements.state(),
         }
 
@@ -295,11 +291,10 @@
         matches = [s for s in config['streams'] if s['id'] == streamId]
         if len(matches) != 1:
             raise ValueError("%s matches for %r" % (len(matches), streamId))
-        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [
-            COLLECTOR]
+        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]
 
-    @STATS.onPatch.time()
-    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False):
+    @ON_PATCH_CALLS.time()
+    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
         if fullGraph:
             # a reconnect may need to resend the full graph even
             # though we've already sent some statements
@@ -313,13 +308,10 @@
             self.statements.pprintTable()
 
         if source != COLLECTOR:
-            self._localStatements.setSourceState(
-                source,
-                ROOM['fullGraphReceived'] if fullGraph else
-                ROOM['patchesReceived'])
+            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])
 
-    @STATS.sendUpdatePatch.time()
-    def _sendUpdatePatch(self, handler: Optional[PatchSink]=None):
+    @SEND_UPDATE_PATCH_CALLS.time()
+    def _sendUpdatePatch(self, handler: Optional[PatchSink] = None):
         """
         send a patch event out this handler to bring it up to date with
         self.statements
@@ -347,8 +339,7 @@
                 # it up into multiple sends, although there's no
                 # guarantee at all since any single stmt could be any
                 # length.
-                h.sendEvent(message=jsonFromPatch(p).encode('utf8'),
-                            event=b'patch')
+                h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch')
                 h.lastPatchSentTime = now
             else:
                 log.debug('nothing to send to %s', h)
@@ -365,11 +356,9 @@
             if source not in self.clients and source != COLLECTOR:
                 log.debug('connect to patch source %s', source)
                 self._localStatements.setSourceState(source, ROOM['connect'])
-                self.clients[source] = ReconnectingPatchSource(
-                    source,
-                    listener=lambda p, fullGraph, source=source: self._onPatch(
-                        source, p, fullGraph),
-                    reconnectSecs=10)
+                self.clients[source] = ReconnectingPatchSource(source,
+                                                               listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
+                                                               reconnectSecs=10)
         log.debug('bring new client up to date')
 
         self._sendUpdatePatch(handler)
@@ -379,8 +368,7 @@
         self.statements.discardHandler(handler)
         for source in self._sourcesForHandler(handler):
             for otherHandler in self.handlers:
-                if (otherHandler != handler and
-                    source in self._sourcesForHandler(otherHandler)):
+                if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)):
                     # still in use
                     break
             else:
@@ -414,20 +402,24 @@
 
 
 class State(cyclone.web.RequestHandler):
-    @STATS.getState.time()
+
+    @GET_STATE_CALLS.time()
     def get(self) -> None:
         try:
             state = self.settings.graphClients.state()
-            self.write(json.dumps({'graphClients': state}, indent=2,
-                                  default=lambda obj: '<unserializable>'))
+            self.write(json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>'))
         except Exception:
-            import traceback; traceback.print_exc()
+            import traceback
+            traceback.print_exc()
             raise
 
+
 class GraphList(cyclone.web.RequestHandler):
+
     def get(self) -> None:
         self.write(json.dumps(config['streams']))
 
+
 if __name__ == '__main__':
     arg = docopt("""
     Usage: sse_collector.py [options]
@@ -441,21 +433,19 @@
         log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
         defer.setDebugging(True)
 
-
     graphClients = GraphClients()
-    #exporter = InfluxExporter(... to export some stats values
 
-    reactor.listenTCP(
-        9072,
-        cyclone.web.Application(
-            handlers=[
-                (r"/()", cyclone.web.StaticFileHandler, {
-                    "path": ".", "default_filename": "index.html"}),
-                (r'/state', State),
-                (r'/graph/', GraphList),
-                (r'/graph/(.+)', PatchSink),
-                (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
-            ],
-            graphClients=graphClients),
-        interface='::')
+    reactor.listenTCP(9072,
+                      cyclone.web.Application(handlers=[
+                          (r"/()", cyclone.web.StaticFileHandler, {
+                              "path": ".",
+                              "default_filename": "index.html"
+                          }),
+                          (r'/state', State),
+                          (r'/graph/', GraphList),
+                          (r'/graph/(.+)', PatchSink),
+                          (r'/metrics', Metrics),
+                      ],
+                                              graphClients=graphClients),
+                      interface='::')
     reactor.run()
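
Below is a minimal standalone sketch (not part of the changeset) of the prometheus_client pattern the patch adopts in place of greplin scales: a Summary's .time() decorator records call count and duration, and generate_latest() renders the default registry in the text exposition format that the new /metrics handler serves. The metric name demo_calls is hypothetical.

    import time

    from prometheus_client import Summary
    from prometheus_client.exposition import generate_latest
    from prometheus_client.registry import REGISTRY

    DEMO_CALLS = Summary('demo_calls', 'calls')  # hypothetical metric, mirrors e.g. ON_PATCH_CALLS

    @DEMO_CALLS.time()  # each call increments demo_calls_count and adds to demo_calls_sum
    def do_work():
        time.sleep(0.01)

    do_work()
    print(generate_latest(REGISTRY).decode('utf8'))  # text/plain exposition, as served by Metrics.get
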
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/deploy.yaml	Sat Dec 26 17:00:08 2020 -0800
@@ -0,0 +1,38 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: collector
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: collector
+  template:
+    metadata:
+      labels:
+        app: collector
+    spec:
+      containers:
+        - name: collector
+          image: bang5:5000/collector_image
+          imagePullPolicy: "Always"
+          ports:
+          - containerPort: 9072
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+            - matchExpressions:
+              - key: "kubernetes.io/hostname"
+                operator: In
+                values: ["bang"]
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: collector
+spec:
+  ports:
+  - {port: 9072, targetPort: 9072}
+  selector:
+    app: collector
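
A hypothetical in-cluster smoke test (not from the changeset), assuming the Service name 'collector' resolves via cluster DNS and the pod is up:

    import urllib.request

    # The Service forwards port 9072 to the container's EXPOSEd 9072;
    # /metrics is the handler added in collector.py above.
    body = urllib.request.urlopen('http://collector:9072/metrics', timeout=5).read()
    print(body.decode('utf8')[:200])
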
--- a/service/collector/mypy.ini	Mon Nov 30 23:40:38 2020 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,7 +0,0 @@
-[mypy]
-no_implicit_optional = True
-#ignore_missing_imports = True
-python_executable = /usr/bin/python3
-warn_unused_configs = True
-warn_return_any = True
-mypy_path = /opt/stubs:/usr/local/lib/python3.6/dist-packages/
\ No newline at end of file
--- a/service/collector/requirements.txt	Mon Nov 30 23:40:38 2020 -0800
+++ b/service/collector/requirements.txt	Sat Dec 26 17:00:08 2020 -0800
@@ -1,18 +1,16 @@
 docopt
 ipdb
+mypy
+prometheus_client==0.9.0
+py-spy
+rdflib-jsonld==0.5.0
+rdflib==5.0.0
 service_identity==18.1.0
-twisted==19.2.0
-py-spy
-mypy
+twisted==20.3.0
 
-rdflib==4.2.2
-rdflib-jsonld==0.4.0
-
-git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
 https://github.com/drewp/cyclone/archive/python3.zip
-influxdb==5.2.2
 
 cycloneerr
-patchablegraph==0.7.0
-rdfdb==0.8.0
-standardservice==0.4.0
+patchablegraph==0.9.0
+rdfdb==0.21.0
+standardservice==0.6.0
--- a/service/collector/serv.n3	Mon Nov 30 23:40:38 2020 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,23 +0,0 @@
-@prefix : <http://bigasterisk.com/ns/serv#> .
-@prefix auth: <http://bigasterisk.com/ns/serv/auth#> .
-@prefix serv: <http://bigasterisk.com/services/> .
-
-
-serv:collector a :Service;
-      :path "/collector/";
-      :openid auth:admin;
-      :serverHost "bang";
-      :internalPort 9072;
-      :prodDockerFlags (
-      "-p" "9072:9072"
-      "--net=host"
-      );
-      :localDockerFlags (
-        "-v" "`pwd`:/opt"
-      );
-      :localRunCmdline (
-        "python3" "collector.py" "-v"
-      );
-      :dockerFile "Dockerfile"
-.
-
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/skaffold.yaml	Sat Dec 26 17:00:08 2020 -0800
@@ -0,0 +1,15 @@
+apiVersion: skaffold/v2beta5
+kind: Config
+metadata:
+  name: collector
+build:
+  tagPolicy:
+    dateTime:
+      format: "2006-01-02_15-04-05"
+      timezone: "Local"
+  artifacts:
+  - image: bang5:5000/collector_image
+deploy:
+  kubectl:
+    manifests:
+    - deploy.yaml
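
With this config, a plain "skaffold run" builds bang5:5000/collector_image, tags it with a local timestamp (the format string above is Go's reference layout, so a build at the commit date would be tagged 2020-12-26_17-00-08), and applies deploy.yaml with kubectl; "skaffold dev" would do the same in a rebuild-on-change loop.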
--- a/service/collector/tasks.py	Mon Nov 30 23:40:38 2020 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,10 +0,0 @@
-from invoke import task
-
-
-@task(pre=[build_image])
-def shell(ctx):
-    ctx.run(f'docker run --rm --name={JOB}_shell -v `pwd`/.mypy_cache:/opt/.mypy_cache -v `pwd`/../../stubs:/opt/stubs -v `pwd`/sse_collector.py:/opt/sse_collector.py  --net=host {TAG_x86} /bin/bash', pty=True)
-
-@task(pre=[build_image])
-def local_run(ctx):
-    ctx.run(f'docker run --rm -it -p {PORT}:{PORT} --net=host --cap-add SYS_PTRACE --dns 10.2.0.1 --dns-search bigasterisk.com -v `pwd`/static:/opt/static {TAG_x86} python3 sse_collector.py -i', pty=True)