changeset 795:c8562ace4917

big updates for k8s, py3, drop FuXi, use prometheus for metrics.
author drewp@bigasterisk.com
date Sun, 27 Dec 2020 03:29:18 -0800
parents fafe86ae0b03
children fc74ae6d5d68
files service/reasoning/.flake8 service/reasoning/.style.yapf service/reasoning/Dockerfile service/reasoning/actions.py service/reasoning/deploy.yaml service/reasoning/inference.py service/reasoning/inputgraph.py service/reasoning/oneShot service/reasoning/reasoning.py service/reasoning/requirements.txt service/reasoning/serv.n3 service/reasoning/skaffold.yaml service/reasoning/tasks.py
diffstat 13 files changed, 310 insertions(+), 243 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/.flake8	Sun Dec 27 03:29:18 2020 -0800
@@ -0,0 +1,3 @@
+[flake8]
+ignore=W504
+max-line-length=160
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/.style.yapf	Sun Dec 27 03:29:18 2020 -0800
@@ -0,0 +1,4 @@
+# overwritten by /home/drewp/bin/setup_home_venv
+[style]
+based_on_style = google
+column_limit = 160
--- a/service/reasoning/Dockerfile	Sat Dec 26 17:00:08 2020 -0800
+++ b/service/reasoning/Dockerfile	Sun Dec 27 03:29:18 2020 -0800
@@ -1,4 +1,4 @@
-FROM bang6:5000/base_x86
+FROM bang5:5000/base_x86
 
 WORKDIR /opt
 
@@ -7,14 +7,9 @@
 # not sure why this doesn't work from inside requirements.txt
 RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -U 'https://github.com/drewp/cyclone/archive/python3.zip?v2'
 
-COPY FuXi/ FuXi
-RUN pip3 install ./FuXi
-
-RUN pip3 install pytype
-
 COPY *.n3 *.py *.html req* ./
 COPY input ./input
 
 EXPOSE 9071
 
-CMD [ "python3", "./reasoning.py" ]
+CMD [ "python3", "./reasoning.py","-irv" ]
--- a/service/reasoning/actions.py	Sat Dec 26 17:00:08 2020 -0800
+++ b/service/reasoning/actions.py	Sun Dec 27 03:29:18 2020 -0800
@@ -2,9 +2,10 @@
 import logging
 import urllib
 
-from rdflib import URIRef, Namespace, RDF, Literal
+import cyclone.sse
+import treq
+from rdflib import RDF, Literal, Namespace, URIRef
 from twisted.internet import reactor
-import treq
 
 from httpputoutputs import HttpPutOutputs
 from inputgraph import InputGraph
@@ -15,20 +16,25 @@
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/")
 
+
 def secsFromLiteral(v):
     if v[-1] != 's':
         raise NotImplementedError(v)
     return float(v[:-1])
 
+
 def ntStatement(stmt):
+
     def compact(u):
         if isinstance(u, URIRef) and u.startswith(ROOM):
             return 'room:' + u[len(ROOM):]
         return u.n3()
+
     return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2]))
 
 
 class Actions(object):
+
     def __init__(self, inputGraph: InputGraph, sendToLiveClients, mockOutput=False):
         self.inputGraph = inputGraph
         self.mockOutput = mockOutput
@@ -64,21 +70,18 @@
             log.debug('inferred stmt we might PUT: %s', ntStatement(stmt))
             putUrl = deviceGraph.value(stmt[0], ROOM['putUrl'])
             putPred = deviceGraph.value(stmt[0], ROOM['putPredicate'])
-            matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'],
-                                          default=putPred)
+            matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'], default=putPred)
             if putUrl and matchPred == stmt[1]:
-                log.debug('putDevices: stmt %s leads to putting at %s',
-                          ntStatement(stmt), putUrl.n3())
-                self._put(putUrl + '?' + urllib.parse.urlencode([
-                    ('s', str(stmt[0])),
-                    ('p', str(putPred))]),
+                log.debug('putDevices: stmt %s leads to putting at %s', ntStatement(stmt), putUrl.n3())
+                self._put(putUrl + '?' + urllib.parse.urlencode([('s', str(stmt[0])), ('p', str(putPred))]),
                           payload=str(stmt[2].toPython()),
                           agent=agentFor.get(stmt[0], None),
                           refreshSecs=self._getRefreshSecs(stmt[0]))
-                activated.add((stmt[0],
-                               # didn't test that this should be
-                               # stmt[1] and not putPred
-                               stmt[1]))
+                activated.add((
+                    stmt[0],
+                    # didn't test that this should be
+                    # stmt[1] and not putPred
+                    stmt[1]))
         return activated
 
     def _oneShotPostActions(self, deviceGraph, inferred):
@@ -97,18 +100,20 @@
         # nothing in this actually makes them one-shot yet. they'll
         # just fire as often as we get in here, which is not desirable
         log.debug("_oneShotPostActions")
+
         def err(e):
             log.warn("post %s failed", postTarget)
+
         for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']):
             s = deviceGraph.value(osp, ROOM['subject'])
             p = deviceGraph.value(osp, ROOM['predicate'])
             if s is None or p is None:
                 continue
-            #log.info("checking for %s %s", s, p)
+            # log.info("checking for %s %s", s, p)
             for postTarget in inferred.objects(s, p):
                 log.debug("post target %r", postTarget)
                 # this packet ought to have 'oneShot' in it somewhere
-                self.sendToLiveClients({"s":s, "p":p, "o":postTarget})
+                self.sendToLiveClients({"s": s, "p": p, "o": postTarget})
 
                 log.debug("    POST %s", postTarget)
                 if not self.mockOutput:
@@ -129,8 +134,7 @@
         """
 
         defaultStmts = set()
-        for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'],
-                                               REASONING['default']):
+        for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'], REASONING['default']):
             s = deviceGraph.value(defaultDesc, ROOM['subject'])
             p = deviceGraph.value(defaultDesc, ROOM['predicate'])
             if (s, p) not in activated:
@@ -143,15 +147,14 @@
     def _getRefreshSecs(self, target):
         # should be able to map(secsFromLiteral) in here somehow and
         # remove the workaround in httpputoutputs.currentRefreshSecs
-        return self.inputGraph.rxValue(target, ROOM['refreshPutValue'],
-                                       default=Literal('30s'))#.map(secsFromLiteral)
+        return self.inputGraph.rxValue(target, ROOM['refreshPutValue'], default=Literal('30s'))  # .map(secsFromLiteral)
 
     def _put(self, url, payload: str, refreshSecs, agent=None):
         self.putOutputs.put(url, payload, agent, refreshSecs)
 
-import cyclone.sse
 
 class PutOutputsTable(cyclone.sse.SSEHandler):
+
     def __init__(self, application, request):
         cyclone.sse.SSEHandler.__init__(self, application, request)
         self.actions = self.settings.reasoning.actions
@@ -167,8 +170,7 @@
         if not self.bound:
             return
         puts = {
-            'puts': [row.report() for _, row in
-                     sorted(self.actions.putOutputs.state.items())],
+            'puts': [row.report() for _, row in sorted(self.actions.putOutputs.state.items())],
         }
         self.sendEvent(message=json.dumps(puts).encode('utf8'), event=b'update')
         reactor.callLater(1, self.loop)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/deploy.yaml	Sun Dec 27 03:29:18 2020 -0800
@@ -0,0 +1,38 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: reasoning
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: reasoning
+  template:
+    metadata:
+      labels:
+        app: reasoning
+    spec:
+      containers:
+        - name: reasoning
+          image: bang5:5000/reasoning_image
+          imagePullPolicy: "Always"
+          ports:
+          - containerPort: 9071
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+            - matchExpressions:
+              - key: "kubernetes.io/hostname"
+                operator: In
+                values: ["bang"]
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: reasoning
+spec:
+  ports:
+  - {port: 9071, targetPort: 9071}
+  selector:
+    app: reasoning
--- a/service/reasoning/inference.py	Sat Dec 26 17:00:08 2020 -0800
+++ b/service/reasoning/inference.py	Sun Dec 27 03:29:18 2020 -0800
@@ -2,27 +2,25 @@
 see ./reasoning for usage
 """
 
-import os, contextlib
-try:
-    from rdflib.Graph import Graph
-except ImportError:
-    from rdflib import Graph
+import contextlib
+import os
 
-from rdflib import Namespace
+from prometheus_client import Summary
+from rdflib import Graph, Namespace
+from rdflib.graph import ConjunctiveGraph
 from rdflib.parser import StringInputSource
 
-from FuXi.Rete.Util import generateTokenSet
-from FuXi.Rete import ReteNetwork
-from FuXi.Rete.RuleStore import N3RuleStore
+from escapeoutputstatements import escapeOutputStatements
+
+READ_RULES_CALLS = Summary('read_rules_calls', 'calls')
 
-from greplin import scales
-STATS = scales.collection('/web',
-                          scales.PmfStat('readRules'))
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+LOG = Namespace('http://www.w3.org/2000/10/swap/log#')
+
 
-from escapeoutputstatements import escapeOutputStatements
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-
-def _loadAndEscape(ruleStore, n3, outputPatterns):
+def _loadAndEscape(ruleStore: ConjunctiveGraph, n3: bytes, outputPatterns):
+    ruleStore.parse(StringInputSource(n3), format='n3')
+    return
     ruleGraph = Graph(ruleStore)
 
     # Can't escapeOutputStatements in the ruleStore since it
@@ -32,13 +30,16 @@
     # of my rules. This serialize/parse version is very slow (400ms),
     # but it only runs when the file changes.
     plainGraph = Graph()
-    plainGraph.parse(StringInputSource(n3.encode('utf8')), format='n3') # for inference
+    plainGraph.parse(StringInputSource(n3.encode('utf8')), format='n3')  # for inference
     escapeOutputStatements(plainGraph, outputPatterns=outputPatterns)
     expandedN3 = plainGraph.serialize(format='n3')
 
     ruleGraph.parse(StringInputSource(expandedN3), format='n3')
 
+
 _rulesCache = (None, None, None, None)
+
+
 def readRules(rulesPath, outputPatterns):
     """
     returns (rulesN3, ruleStore)
@@ -49,27 +50,59 @@
     """
     global _rulesCache
 
-    with STATS.readRules.time():
+    with READ_RULES_CALLS.time():
         mtime = os.path.getmtime(rulesPath)
         key = (rulesPath, mtime)
         if _rulesCache[:2] == key:
             _, _, rulesN3, ruleStore = _rulesCache
         else:
-            rulesN3 = open(rulesPath).read() # for web display
+            rulesN3 = open(rulesPath, 'rb').read()  # for web display
 
-            ruleStore = N3RuleStore()
+            ruleStore = ConjunctiveGraph()
             _loadAndEscape(ruleStore, rulesN3, outputPatterns)
-            log.debug('%s rules' % len(ruleStore.rules))
+            log.debug('%s rules' % len(ruleStore))
 
             _rulesCache = key + (rulesN3, ruleStore)
         return rulesN3, ruleStore
 
-def infer(graph, rules):
+
+def infer(graph: ConjunctiveGraph, rules: ConjunctiveGraph):
+    """
+    returns new graph of inferred statements.
     """
-    returns new graph of inferred statements. Plain rete api seems to
-    alter rules.formulae and rules.rules, but this function does not
-    alter the incoming rules object, so you can cache it.
-    """
+    log.info(f'Begin inference of graph len={len(graph)} with rules len={len(rules)}:')
+
+    workingSet = ConjunctiveGraph()
+    workingSet.addN(graph.quads())
+
+    implied = ConjunctiveGraph()
+
+    delta = 1
+    while delta > 0:
+        delta = -len(implied)
+
+        for r in rules:
+            if r[1] == LOG['implies']:
+                containsSetup = all(st in workingSet for st in r[0])
+                if containsSetup:
+                    log.info(f'  Rule {r[0]} -> present={containsSetup}')
+                    for st in r[0]:
+                        log.info(f'     {st[0].n3()} {st[1].n3()} {st[2].n3()}')
+
+                    log.info(f'  ...implies {len(r[2])} statements')
+                if containsSetup:
+                    for st in r[2]:
+                        workingSet.add(st)
+                        implied.add(st)
+            else:
+                log.info(f'  {r}')
+        delta += len(implied)
+        log.info(f'  this inference round added {delta} more implied stmts')
+    log.info(f'{len(implied)} stmts implied:')
+    for st in implied:
+        log.info(f'  {st}')
+    return implied
+
     # based on fuxi/tools/rdfpipe.py
     target = Graph()
     tokenSet = generateTokenSet(graph)
@@ -79,6 +112,7 @@
 
     return target
 
+
 @contextlib.contextmanager
 def _dontChangeRulesStore(rules):
     if not hasattr(rules, '_stashOriginalRules'):
@@ -89,15 +123,21 @@
             del rules.formulae[k]
     rules.rules = rules._stashOriginalRules[:]
 
-import time, logging
+
+import logging
+import time
+
 log = logging.getLogger()
+
+
 def logTime(func):
+
     def inner(*args, **kw):
         t1 = time.time()
         try:
             ret = func(*args, **kw)
         finally:
-            log.info("Call to %s took %.1f ms" % (
-                func.__name__, 1000 * (time.time() - t1)))
+            log.info("Call to %s took %.1f ms" % (func.__name__, 1000 * (time.time() - t1)))
         return ret
+
     return inner
--- a/service/reasoning/inputgraph.py	Sat Dec 26 17:00:08 2020 -0800
+++ b/service/reasoning/inputgraph.py	Sun Dec 27 03:29:18 2020 -0800
@@ -1,39 +1,36 @@
-import logging, time
+import logging
+import time
 import weakref
 from typing import Callable
 
-from greplin import scales
-from rdflib import Graph, ConjunctiveGraph
-from rdflib import Namespace, URIRef, RDFS
+from patchablegraph.patchsource import ReconnectingPatchSource
+from prometheus_client import Summary
+from rdfdb.patch import Patch
+from rdfdb.rdflibpatch import patchQuads
+from rdflib import RDFS, ConjunctiveGraph, Graph, Namespace, URIRef
 from rdflib.parser import StringInputSource
 from rx.subjects import BehaviorSubject
+from twisted.internet import reactor
 from twisted.python.filepath import FilePath
-from twisted.internet import reactor
-
-from patchablegraph.patchsource import ReconnectingPatchSource
-from rdfdb.rdflibpatch import patchQuads
-from rdfdb.patch import Patch
 
 log = logging.getLogger('fetch')
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 
-
-STATS = scales.collection('/web',
-                          scales.PmfStat('combineGraph'),
-)
+COMBINE_GRAPH_CALLS = Summary('combine_graph_calls', 'calls')
 
 
 def parseRdf(text: str, contentType: str):
     g = Graph()
     g.parse(StringInputSource(text), format={
         'text/n3': 'n3',
-        }[contentType])
+    }[contentType])
     return g
 
 
 class RemoteData(object):
+
     def __init__(self, onChange: Callable[[], None]):
         """we won't fire onChange during init"""
         self.onChange = onChange
@@ -42,17 +39,16 @@
 
     def _finishInit(self):
         self.patchSource = ReconnectingPatchSource(
-            URIRef('http://bang:9072/graph/home'),
-            #URIRef('http://frontdoor:10012/graph/events'),
-            self.onPatch, reconnectSecs=10, agent='reasoning')
+            URIRef('http://collector.default.svc.cluster.local:9072/graph/home'),
+            # URIRef('http://frontdoor:10012/graph/events'),
+            self.onPatch,
+            reconnectSecs=10,
+            agent='reasoning')
 
     def onPatch(self, p: Patch, fullGraph: bool):
         if fullGraph:
             self.graph = ConjunctiveGraph()
-        patchQuads(self.graph,
-                   deleteQuads=p.delQuads,
-                   addQuads=p.addQuads,
-                   perfect=True)
+        patchQuads(self.graph, deleteQuads=p.delQuads, addQuads=p.addQuads, perfect=True)
 
         ignorePredicates = [
             ROOM['signalStrength'],
@@ -73,10 +69,9 @@
         ]
         ignoreContexts = [
             URIRef('http://bigasterisk.com/sse_collector/'),
-            ]
+        ]
         for affected in p.addQuads + p.delQuads:
-            if (affected[1] not in ignorePredicates and
-                affected[3] not in ignoreContexts):
+            if (affected[1] not in ignorePredicates and affected[3] not in ignoreContexts):
                 log.debug("  remote graph changed")
                 self.onChange()
                 break
@@ -85,6 +80,7 @@
 
 
 class InputGraph(object):
+
     def __init__(self, inputDirs, onChange):
         """
         this has one Graph that's made of:
@@ -118,7 +114,7 @@
     def _rxUpdate(self, subj, pred, default, rxv):
         rxv.on_next(self.getGraph().value(subj, pred, default=default))
 
-    def rxValue(self, subj, pred, default):# -> BehaviorSubject:
+    def rxValue(self, subj, pred, default):  # -> BehaviorSubject:
         value = BehaviorSubject(default)
         self._rxValues[value] = (subj, pred, default)
         self._rxUpdate(subj, pred, default, value)
@@ -166,13 +162,13 @@
         self.addOneShot(g)
         return time.time() - t1
 
-    @STATS.combineGraph.time()
-    def getGraph(self):
+    @COMBINE_GRAPH_CALLS.time()
+    def getGraph(self) -> ConjunctiveGraph:
         """rdflib Graph with the file+remote contents of the input graph"""
         # this could be much faster with the combined readonly graph
         # view from rdflib
         if self._combinedGraph is None:
-            self._combinedGraph = Graph()
+            self._combinedGraph = ConjunctiveGraph()
             if self._fileGraph:
                 for s in self._fileGraph:
                     self._combinedGraph.add(s)
--- a/service/reasoning/oneShot	Sat Dec 26 17:00:08 2020 -0800
+++ b/service/reasoning/oneShot	Sun Dec 27 03:29:18 2020 -0800
@@ -1,18 +1,26 @@
-#!/usr/bin/python
+#!/usr/bin/python3
 """
 send a statement to the reasoning server for one update cycle. Args
 are s/p/o in n3 notation, with many prefixes predefined here.
 """
-import sys, requests, time, os
+import json
+import os
+import subprocess
+import sys
+import time
+
+import requests
+
 s, p, o = sys.argv[1:]
 
 prefixes = {
     '': 'http://projects.bigasterisk.com/room/',
-    'room' : 'http://projects.bigasterisk.com/room/',
+    'room': 'http://projects.bigasterisk.com/room/',
     'shuttle': 'http://bigasterisk.com/room/livingRoom/shuttlepro/',
     'sensor': 'http://bigasterisk.com/homeauto/sensor/',
 }
 
+
 def expand(term):
     if ':' not in term or term.startswith(('<', '"', "'")):
         return term
@@ -21,13 +29,16 @@
         return '<%s%s>' % (prefixes[left], right)
     return term
 
+
+pod = json.loads(subprocess.check_output(["kubectl", "get", "pod", "--selector=app=reasoning", "-o", "json"]))
+ip = pod['items'][0]['status']['podIP']
+
 stmt = '%s %s %s .' % (expand(s), expand(p), expand(o))
-print "Sending: %s" % stmt
+print("Sending: %s" % stmt)
 
 t1 = time.time()
-ret = requests.post(
-    'http://%s/oneShot' % os.environ.get('REASONING', 'bang:9071'),
-    headers={"content-type": "text/n3"},
-    data=stmt.encode('ascii'))
+ret = requests.post('http://%s/oneShot' % os.environ.get('REASONING', f'{ip}:9071'),
+                    headers={"content-type": "text/n3"},
+                    data=stmt.encode('ascii'))
 g = float(ret.headers['x-graph-ms'])
-print "%.1f ms for graph update; %.1f ms other overhead" % (g, 1000 * (time.time() - t1) - g)
+print("%.1f ms for graph update; %.1f ms other overhead" % (g, 1000 * (time.time() - t1) - g))
--- a/service/reasoning/reasoning.py	Sat Dec 26 17:00:08 2020 -0800
+++ b/service/reasoning/reasoning.py	Sun Dec 27 03:29:18 2020 -0800
@@ -1,4 +1,3 @@
-#!bin/python
 """
 Graph consists of:
   input/* (read at startup)
@@ -14,70 +13,73 @@
 When do we gather? The services should be able to trigger us, perhaps
 with PSHB, that their graph has changed.
 """
-from crochet import no_setup
-no_setup()
-
-import json, time, traceback, sys
-from logging import getLogger, DEBUG, WARN
+import json
+import sys
+import time
+import traceback
+from logging import DEBUG, WARN, getLogger
 from typing import Dict, Optional, Set, Tuple
 
+import cyclone.web
+import cyclone.websocket
 from colorlog import ColoredFormatter
 from docopt import docopt
-from rdflib import Namespace, Literal, RDF, Graph, URIRef
+from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
+from prometheus_client import Counter, Gauge, Histogram, Summary
+from prometheus_client.exposition import generate_latest
+from prometheus_client.registry import REGISTRY
+from rdflib import RDF, Graph, Literal, Namespace, URIRef
+from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node
-from twisted.internet import reactor, defer
-import cyclone.web, cyclone.websocket
-from FuXi.Rete.RuleStore import N3RuleStore
-
-from greplin import scales
-from greplin.scales.cyclonehandler import StatsHandler
+from standardservice.logsetup import log, verboseLogging
+from twisted.internet import defer, reactor
 
-from inference import infer, readRules
 from actions import Actions, PutOutputsTable
-from inputgraph import InputGraph
 from escapeoutputstatements import unquoteOutputStatements
-
-from standardservice.logsetup import log, verboseLogging
-from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
-
+from inference import infer, readRules
+from inputgraph import InputGraph
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 DEV = Namespace("http://projects.bigasterisk.com/device/")
 
 NS = {'': ROOM, 'dev': DEV}
 
-STATS = scales.collection('/web',
-                          scales.PmfStat('graphChanged'),
-                          scales.PmfStat('updateRules'),
-)
+GRAPH_CHANGED_CALLS = Summary('graph_changed_calls', 'calls')
+UPDATE_RULES_CALLS = Summary('update_rules_calls', 'calls')
+
 
 def ntStatement(stmt: Tuple[Node, Node, Node]):
+
     def compact(u):
         if isinstance(u, URIRef) and u.startswith(ROOM):
             return 'room:' + u[len(ROOM):]
         return u.n3()
+
     return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2]))
 
+
 class Reasoning(object):
-    ruleStore: N3RuleStore
+    ruleStore: ConjunctiveGraph
+
     def __init__(self, mockOutput=False):
         self.prevGraph = None
 
         self.rulesN3 = "(not read yet)"
-        self.inferred = Graph() # gets replaced in each graphChanged call
-        self.outputGraph = PatchableGraph() # copy of inferred, for now
+        self.inferred = Graph()  # gets replaced in each graphChanged call
+        self.outputGraph = PatchableGraph()  # copy of inferred, for now
 
         self.inputGraph = InputGraph([], self.graphChanged)
         self.actions = Actions(self.inputGraph, sendToLiveClients, mockOutput=mockOutput)
         self.inputGraph.updateFileData()
 
-    @STATS.updateRules.time()
+    @UPDATE_RULES_CALLS.time()
     def updateRules(self):
         rulesPath = 'rules.n3'
         try:
             t1 = time.time()
             self.rulesN3, self.ruleStore = readRules(
-                rulesPath, outputPatterns=[
+                rulesPath,
+                outputPatterns=[
                     # Incomplete. See escapeoutputstatements.py for
                     # explanation.
                     (None, ROOM['brightness'], None),
@@ -90,22 +92,19 @@
             # this is so if you're just watching the inferred output,
             # you'll see the error too
             self.inferred = Graph()
-            self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'],
-                               Literal(traceback.format_exc())))
+            self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], Literal(traceback.format_exc())))
             self.copyOutput()
             raise
-        return [(ROOM['reasoner'], ROOM['ruleParseTime'],
-                 Literal(ruleParseTime))], ruleParseTime
+        return [(ROOM['reasoner'], ROOM['ruleParseTime'], Literal(ruleParseTime))], ruleParseTime
 
-    @STATS.graphChanged.time()
+    @GRAPH_CHANGED_CALLS.time()
     def graphChanged(self, inputGraph: InputGraph, oneShot=False, oneShotGraph: Graph = None):
         """
         If we're getting called for a oneShot event, the oneShotGraph
         statements are already in inputGraph.getGraph().
         """
         log.info("----------------------")
-        log.info("graphChanged (oneShot=%s %s stmts):",
-                 oneShot, len(oneShotGraph) if oneShotGraph is not None else 0)
+        log.info("graphChanged (oneShot=%s %s stmts):", oneShot, len(oneShotGraph) if oneShotGraph is not None else 0)
         if oneShotGraph:
             for stmt in oneShotGraph:
                 log.info(" oneshot -> %s", ntStatement(stmt))
@@ -135,17 +134,14 @@
             if oneShot:
                 self.inferred = oldInferred
         log.info("graphChanged took %.1f ms (rule parse %.1f ms, infer %.1f ms, putResults %.1f ms)" %
-                 ((time.time() - t1) * 1000,
-                  ruleParseSec * 1000,
-                  inferSec * 1000,
-                  putResultsTime * 1000))
+                 ((time.time() - t1) * 1000, ruleParseSec * 1000, inferSec * 1000, putResultsTime * 1000))
         if not oneShot:
             self.copyOutput()
 
     def copyOutput(self):
-        self.outputGraph.setToGraph((s,p,o,ROOM['inferred']) for s,p,o in self.inferred)
+        self.outputGraph.setToGraph((s, p, o, ROOM['inferred']) for s, p, o in self.inferred)
 
-    def _makeInferred(self, inputGraph: InputGraph):
+    def _makeInferred(self, inputGraph: ConjunctiveGraph):
         t1 = time.time()
 
         out = infer(inputGraph, self.ruleStore)
@@ -153,18 +149,19 @@
             out.bind(p, n, override=True)
 
         inferenceTime = time.time() - t1
-        out.add((ROOM['reasoner'], ROOM['inferenceTime'],
-                 Literal(inferenceTime)))
+        out.add((ROOM['reasoner'], ROOM['inferenceTime'], Literal(inferenceTime)))
         return out, inferenceTime
 
 
+class Index(cyclone.web.RequestHandler):
 
-class Index(cyclone.web.RequestHandler):
     def get(self):
         self.set_header("Content-Type", "text/html")
         self.write(open('index.html').read())
 
+
 class ImmediateUpdate(cyclone.web.RequestHandler):
+
     def put(self):
         """
         request an immediate load of the remote graphs; the thing we
@@ -176,12 +173,13 @@
         todo: this should do the right thing when many requests come
         in very quickly
         """
-        log.warn("immediateUpdate from %s %s - ignored",
-                 self.request.headers.get('User-Agent', '?'),
+        log.warn("immediateUpdate from %s %s - ignored", self.request.headers.get('User-Agent', '?'),
                  self.request.headers['Host'])
         self.set_status(202)
 
+
 class OneShot(cyclone.web.RequestHandler):
+
     def post(self):
         """
         payload is an rdf graph. The statements are momentarily added
@@ -195,69 +193,79 @@
         """
         try:
             log.info('POST to oneShot, headers=%s', self.request.headers)
-            ct = self.request.headers.get(
-                'Content-Type',
-                self.request.headers.get('content-type', ''))
-            dt = self.settings.reasoning.inputGraph.addOneShotFromString(
-                self.request.body, ct)
+            ct = self.request.headers.get('Content-Type', self.request.headers.get('content-type', ''))
+            dt = self.settings.reasoning.inputGraph.addOneShotFromString(self.request.body, ct)
             self.set_header('x-graph-ms', str(1000 * dt))
         except Exception as e:
             traceback.print_exc()
             log.error(e)
             raise
 
+
 # for reuse
 class GraphResource(cyclone.web.RequestHandler):
+
     def get(self, which: str):
         self.set_header("Content-Type", "application/json")
         r = self.settings.reasoning
-        g = {'lastInput': r.inputGraph.getGraph(),
-             'lastOutput': r.inferred,
-             }[which]
+        g = {
+            'lastInput': r.inputGraph.getGraph(),
+            'lastOutput': r.inferred,
+        }[which]
         self.write(self.jsonRdf(g))
 
     def jsonRdf(self, g):
         return json.dumps(sorted(list(g)))
 
+
 class NtGraphs(cyclone.web.RequestHandler):
     """same as what gets posted above"""
+
     def get(self):
         r = self.settings.reasoning
         inputGraphNt = r.inputGraph.getGraph().serialize(format="nt")
         inferredNt = r.inferred.serialize(format="nt")
         self.set_header("Content-Type", "application/json")
-        self.write(json.dumps({"input": inputGraphNt,
-                               "inferred": inferredNt}))
+        self.write(json.dumps({"input": inputGraphNt, "inferred": inferredNt}))
+
 
 class Rules(cyclone.web.RequestHandler):
+
     def get(self):
         self.set_header("Content-Type", "text/plain")
         self.write(self.settings.reasoning.rulesN3)
 
+
 class Status(cyclone.web.RequestHandler):
+
     def get(self):
         self.set_header("Content-Type", "text/plain")
         g = self.settings.reasoning.inputGraph.getGraph()
         msg = ""
         for badSource in g.subjects(RDF.type, ROOM['FailedGraphLoad']):
-            msg += "GET %s failed (%s). " % (
-                badSource, g.value(badSource, ROOM['graphLoadError']))
+            msg += "GET %s failed (%s). " % (badSource, g.value(badSource, ROOM['graphLoadError']))
         if not msg:
             self.finish("all inputs ok")
             return
         self.set_status(500)
         self.finish(msg)
 
+
 class Static(cyclone.web.RequestHandler):
+
     def get(self, p: str):
         self.write(open(p).read())
 
+
 liveClients: Set[cyclone.websocket.WebSocketHandler] = set()
-def sendToLiveClients(d: Dict[str, object]=None, asJson: Optional[str]=None):
+
+
+def sendToLiveClients(d: Dict[str, object] = None, asJson: Optional[str] = None):
     j = asJson or json.dumps(d)
     for c in liveClients:
         c.sendMessage(j)
 
+
 class Events(cyclone.websocket.WebSocketHandler):
 
     def connectionMade(self, *args, **kwargs):
@@ -271,7 +279,16 @@
     def messageReceived(self, message):
         log.info("got message %s" % message)
 
+
+class Metrics(cyclone.web.RequestHandler):
+
+    def get(self):
+        self.add_header('content-type', 'text/plain')
+        self.write(generate_latest(REGISTRY))
+
+
 class Application(cyclone.web.Application):
+
     def __init__(self, reasoning):
         handlers = [
             (r"/", Index),
@@ -280,38 +297,39 @@
             (r'/putOutputs', PutOutputsTable),
             (r'/(jquery.min.js)', Static),
             (r'/(lastInput|lastOutput)Graph', GraphResource),
-
-            (r"/graph/reasoning", CycloneGraphHandler,
-             {'masterGraph': reasoning.outputGraph}),
-            (r"/graph/reasoning/events", CycloneGraphEventsHandler,
-             {'masterGraph': reasoning.outputGraph}),
-
+            (r"/graph/reasoning", CycloneGraphHandler, {
+                'masterGraph': reasoning.outputGraph
+            }),
+            (r"/graph/reasoning/events", CycloneGraphEventsHandler, {
+                'masterGraph': reasoning.outputGraph
+            }),
             (r'/ntGraphs', NtGraphs),
             (r'/rules', Rules),
             (r'/status', Status),
             (r'/events', Events),
-            (r'/stats/(.*)', StatsHandler, {'serverName': 'reasoning'}),
+            (r'/metrics', Metrics),
         ]
         cyclone.web.Application.__init__(self, handlers, reasoning=reasoning)
 
+
 def configLogging(arg):
     log.setLevel(WARN)
 
     if arg['-i'] or arg['-r'] or arg['-o'] or arg['-v']:
-        log.handlers[0].setFormatter(ColoredFormatter(
-            "%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s",
-            datefmt=None,
-            reset=True,
-            log_colors={
-                'DEBUG':    'cyan',
-                'INFO':     'green',
-                'WARNING':  'yellow',
-                'ERROR':    'red',
-                'CRITICAL': 'red,bg_white',
-            },
-            secondary_log_colors={},
-            style='%'
-        ))
+        log.handlers[0].setFormatter(
+            ColoredFormatter(
+                "%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s",
+                datefmt=None,
+                reset=True,
+                log_colors={
+                    'DEBUG': 'cyan',
+                    'INFO': 'green',
+                    'WARNING': 'yellow',
+                    'ERROR': 'red',
+                    'CRITICAL': 'red,bg_white',
+                },
+                secondary_log_colors={},
+                style='%'))
         defer.setDebugging(True)
 
     if arg['-i']:
--- a/service/reasoning/requirements.txt	Sat Dec 26 17:00:08 2020 -0800
+++ b/service/reasoning/requirements.txt	Sun Dec 27 03:29:18 2020 -0800
@@ -1,19 +1,16 @@
 colorlog==2.6.0
-crochet==1.12.0
-
 docopt
+prometheus_client==0.9.0
+rdflib_jsonld==0.5.0
+rdflib==5.0.0
 rx==1.6.1
-service_identity
-treq==18.6.0
-twisted
-
-rdflib==4.2.2
-rdflib_jsonld==0.4.0
-git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
+service_identity==18.1.0
+six==1.15.0
+treq==20.9.0
+twisted==20.3.0
 https://github.com/drewp/cyclone/archive/python3.zip
 
 cycloneerr
-export_to_influxdb==0.4.0
-patchablegraph==0.11.0
+patchablegraph==0.9.0
 rdfdb==0.21.0
 standardservice==0.6.0
--- a/service/reasoning/serv.n3	Sat Dec 26 17:00:08 2020 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,23 +0,0 @@
-@prefix : <http://bigasterisk.com/ns/serv#> .
-@prefix auth: <http://bigasterisk.com/ns/serv/auth#> .
-@prefix serv: <http://bigasterisk.com/services/> .
-
-
-serv:reasoning a :Service;
-      :path "/reasoning/";
-      :openid auth:admin;
-      :serverHost "bang";
-      :internalPort 9071;
-      :prodDockerFlags (
-        "-p" "9071:9071"
-        "--net=host"
-      );
-      :localDockerFlags (
-        "-v" "`pwd`:/opt"
-      );
-      :localRunCmdline (
-        "python3" "reasoning.py" "-iro"
-      );
-      :dockerFile "Dockerfile"
-.
-
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/skaffold.yaml	Sun Dec 27 03:29:18 2020 -0800
@@ -0,0 +1,20 @@
+apiVersion: skaffold/v2beta5
+kind: Config
+metadata:
+  name: reasoning
+build:
+  local: 
+    useDockerCLI: true
+  tagPolicy:
+    dateTime:
+      format: "2006-01-02_15-04-05"
+      timezone: "Local"
+  artifacts:
+    - image: bang5:5000/reasoning_image
+      sync:
+        infer:
+          - rules.n3
+deploy:
+  kubectl:
+    manifests:
+    - deploy.yaml
--- a/service/reasoning/tasks.py	Sat Dec 26 17:00:08 2020 -0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,34 +0,0 @@
-from invoke import Collection, task
-import sys
-sys.path.append('/my/proj/release')
-from serv_tasks import serv_tasks
-
-ns = Collection()
-serv_tasks(ns, 'serv.n3', 'reasoning')
-
-@ns.add_task
-@task(pre=[ns['build']])
-def local_run_mock(ctx):
-    ctx.run(f'docker run --name reasoning_local_run_mock --rm -it -p 9071:9071 -v `pwd`:/opt --dns 10.2.0.1 --dns-search bigasterisk.com --net=host bang6:5000/reasoning:latest python3 reasoning.py -iro --mockoutput', pty=True)
-
-@ns.add_task
-@task(pre=[ns['build']])
-def pytype(ctx):
-    ctx.run(f'docker run '
-            f'--name reasoning_pytype '
-            f'--rm -it '
-            f'-v `pwd`:/opt '
-            f'--dns 10.2.0.1 '
-            f'--dns-search bigasterisk.com '
-            f'--net=host bang6:5000/reasoning:latest '
-            f'pytype --pythonpath /usr/local/lib/python3.6/dist-packages:. '
-            f'--jobs 4 '
-            f'actions.py '
-            f'escapeoutputstatements.py '
-            f'graphop.py '
-            f'httpputoutputs.py '
-            f'inference.py '
-            f'inputgraph.py '
-            f'private_ipv6_addresses.py '
-            f'rdflibtrig.py '
-            f'reasoning.py', pty=True)