Mercurial > code > home > repos > homeauto
changeset 795:c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
author | drewp@bigasterisk.com |
---|---|
date | Sun, 27 Dec 2020 03:29:18 -0800 |
parents | fafe86ae0b03 |
children | fc74ae6d5d68 |
files | service/reasoning/.flake8 service/reasoning/.style.yapf service/reasoning/Dockerfile service/reasoning/actions.py service/reasoning/deploy.yaml service/reasoning/inference.py service/reasoning/inputgraph.py service/reasoning/oneShot service/reasoning/reasoning.py service/reasoning/requirements.txt service/reasoning/serv.n3 service/reasoning/skaffold.yaml service/reasoning/tasks.py |
diffstat | 13 files changed, 310 insertions(+), 243 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/.flake8 Sun Dec 27 03:29:18 2020 -0800 @@ -0,0 +1,3 @@ +[flake8] +ignore=W504 +max-line-length=160 \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/.style.yapf Sun Dec 27 03:29:18 2020 -0800 @@ -0,0 +1,4 @@ +# overwritten by /home/drewp/bin/setup_home_venv +[style] +based_on_style = google +column_limit = 160
--- a/service/reasoning/Dockerfile Sat Dec 26 17:00:08 2020 -0800 +++ b/service/reasoning/Dockerfile Sun Dec 27 03:29:18 2020 -0800 @@ -1,4 +1,4 @@ -FROM bang6:5000/base_x86 +FROM bang5:5000/base_x86 WORKDIR /opt @@ -7,14 +7,9 @@ # not sure why this doesn't work from inside requirements.txt RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -U 'https://github.com/drewp/cyclone/archive/python3.zip?v2' -COPY FuXi/ FuXi -RUN pip3 install ./FuXi - -RUN pip3 install pytype - COPY *.n3 *.py *.html req* ./ COPY input ./input EXPOSE 9071 -CMD [ "python3", "./reasoning.py" ] +CMD [ "python3", "./reasoning.py","-irv" ]
--- a/service/reasoning/actions.py Sat Dec 26 17:00:08 2020 -0800 +++ b/service/reasoning/actions.py Sun Dec 27 03:29:18 2020 -0800 @@ -2,9 +2,10 @@ import logging import urllib -from rdflib import URIRef, Namespace, RDF, Literal +import cyclone.sse +import treq +from rdflib import RDF, Literal, Namespace, URIRef from twisted.internet import reactor -import treq from httpputoutputs import HttpPutOutputs from inputgraph import InputGraph @@ -15,20 +16,25 @@ DEV = Namespace("http://projects.bigasterisk.com/device/") REASONING = Namespace("http://projects.bigasterisk.com/ns/reasoning/") + def secsFromLiteral(v): if v[-1] != 's': raise NotImplementedError(v) return float(v[:-1]) + def ntStatement(stmt): + def compact(u): if isinstance(u, URIRef) and u.startswith(ROOM): return 'room:' + u[len(ROOM):] return u.n3() + return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2])) class Actions(object): + def __init__(self, inputGraph: InputGraph, sendToLiveClients, mockOutput=False): self.inputGraph = inputGraph self.mockOutput = mockOutput @@ -64,21 +70,18 @@ log.debug('inferred stmt we might PUT: %s', ntStatement(stmt)) putUrl = deviceGraph.value(stmt[0], ROOM['putUrl']) putPred = deviceGraph.value(stmt[0], ROOM['putPredicate']) - matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'], - default=putPred) + matchPred = deviceGraph.value(stmt[0], ROOM['matchPredicate'], default=putPred) if putUrl and matchPred == stmt[1]: - log.debug('putDevices: stmt %s leads to putting at %s', - ntStatement(stmt), putUrl.n3()) - self._put(putUrl + '?' + urllib.parse.urlencode([ - ('s', str(stmt[0])), - ('p', str(putPred))]), + log.debug('putDevices: stmt %s leads to putting at %s', ntStatement(stmt), putUrl.n3()) + self._put(putUrl + '?' + urllib.parse.urlencode([('s', str(stmt[0])), ('p', str(putPred))]), payload=str(stmt[2].toPython()), agent=agentFor.get(stmt[0], None), refreshSecs=self._getRefreshSecs(stmt[0])) - activated.add((stmt[0], - # didn't test that this should be - # stmt[1] and not putPred - stmt[1])) + activated.add(( + stmt[0], + # didn't test that this should be + # stmt[1] and not putPred + stmt[1])) return activated def _oneShotPostActions(self, deviceGraph, inferred): @@ -97,18 +100,20 @@ # nothing in this actually makes them one-shot yet. they'll # just fire as often as we get in here, which is not desirable log.debug("_oneShotPostActions") + def err(e): log.warn("post %s failed", postTarget) + for osp in deviceGraph.subjects(RDF.type, ROOM['OneShotPost']): s = deviceGraph.value(osp, ROOM['subject']) p = deviceGraph.value(osp, ROOM['predicate']) if s is None or p is None: continue - #log.info("checking for %s %s", s, p) + # log.info("checking for %s %s", s, p) for postTarget in inferred.objects(s, p): log.debug("post target %r", postTarget) # this packet ought to have 'oneShot' in it somewhere - self.sendToLiveClients({"s":s, "p":p, "o":postTarget}) + self.sendToLiveClients({"s": s, "p": p, "o": postTarget}) log.debug(" POST %s", postTarget) if not self.mockOutput: @@ -129,8 +134,7 @@ """ defaultStmts = set() - for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'], - REASONING['default']): + for defaultDesc in deviceGraph.objects(REASONING['defaultOutput'], REASONING['default']): s = deviceGraph.value(defaultDesc, ROOM['subject']) p = deviceGraph.value(defaultDesc, ROOM['predicate']) if (s, p) not in activated: @@ -143,15 +147,14 @@ def _getRefreshSecs(self, target): # should be able to map(secsFromLiteral) in here somehow and # remove the workaround in httpputoutputs.currentRefreshSecs - return self.inputGraph.rxValue(target, ROOM['refreshPutValue'], - default=Literal('30s'))#.map(secsFromLiteral) + return self.inputGraph.rxValue(target, ROOM['refreshPutValue'], default=Literal('30s')) # .map(secsFromLiteral) def _put(self, url, payload: str, refreshSecs, agent=None): self.putOutputs.put(url, payload, agent, refreshSecs) -import cyclone.sse class PutOutputsTable(cyclone.sse.SSEHandler): + def __init__(self, application, request): cyclone.sse.SSEHandler.__init__(self, application, request) self.actions = self.settings.reasoning.actions @@ -167,8 +170,7 @@ if not self.bound: return puts = { - 'puts': [row.report() for _, row in - sorted(self.actions.putOutputs.state.items())], + 'puts': [row.report() for _, row in sorted(self.actions.putOutputs.state.items())], } self.sendEvent(message=json.dumps(puts).encode('utf8'), event=b'update') reactor.callLater(1, self.loop)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/deploy.yaml Sun Dec 27 03:29:18 2020 -0800 @@ -0,0 +1,38 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: reasoning +spec: + replicas: 1 + selector: + matchLabels: + app: reasoning + template: + metadata: + labels: + app: reasoning + spec: + containers: + - name: reasoning + image: bang5:5000/reasoning_image + imagePullPolicy: "Always" + ports: + - containerPort: 9071 + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "kubernetes.io/hostname" + operator: In + values: ["bang"] +--- +apiVersion: v1 +kind: Service +metadata: + name: reasoning +spec: + ports: + - {port: 9071, targetPort: 9071} + selector: + app: reasoning
--- a/service/reasoning/inference.py Sat Dec 26 17:00:08 2020 -0800 +++ b/service/reasoning/inference.py Sun Dec 27 03:29:18 2020 -0800 @@ -2,27 +2,25 @@ see ./reasoning for usage """ -import os, contextlib -try: - from rdflib.Graph import Graph -except ImportError: - from rdflib import Graph +import contextlib +import os -from rdflib import Namespace +from prometheus_client import Summary +from rdflib import Graph, Namespace +from rdflib.graph import ConjunctiveGraph from rdflib.parser import StringInputSource -from FuXi.Rete.Util import generateTokenSet -from FuXi.Rete import ReteNetwork -from FuXi.Rete.RuleStore import N3RuleStore +from escapeoutputstatements import escapeOutputStatements + +READ_RULES_CALLS = Summary('read_rules_calls', 'calls') -from greplin import scales -STATS = scales.collection('/web', - scales.PmfStat('readRules')) +ROOM = Namespace("http://projects.bigasterisk.com/room/") +LOG = Namespace('http://www.w3.org/2000/10/swap/log#') + -from escapeoutputstatements import escapeOutputStatements -ROOM = Namespace("http://projects.bigasterisk.com/room/") - -def _loadAndEscape(ruleStore, n3, outputPatterns): +def _loadAndEscape(ruleStore: ConjunctiveGraph, n3: bytes, outputPatterns): + ruleStore.parse(StringInputSource(n3), format='n3') + return ruleGraph = Graph(ruleStore) # Can't escapeOutputStatements in the ruleStore since it @@ -32,13 +30,16 @@ # of my rules. This serialize/parse version is very slow (400ms), # but it only runs when the file changes. plainGraph = Graph() - plainGraph.parse(StringInputSource(n3.encode('utf8')), format='n3') # for inference + plainGraph.parse(StringInputSource(n3.encode('utf8')), format='n3') # for inference escapeOutputStatements(plainGraph, outputPatterns=outputPatterns) expandedN3 = plainGraph.serialize(format='n3') ruleGraph.parse(StringInputSource(expandedN3), format='n3') + _rulesCache = (None, None, None, None) + + def readRules(rulesPath, outputPatterns): """ returns (rulesN3, ruleStore) @@ -49,27 +50,59 @@ """ global _rulesCache - with STATS.readRules.time(): + with READ_RULES_CALLS.time(): mtime = os.path.getmtime(rulesPath) key = (rulesPath, mtime) if _rulesCache[:2] == key: _, _, rulesN3, ruleStore = _rulesCache else: - rulesN3 = open(rulesPath).read() # for web display + rulesN3 = open(rulesPath, 'rb').read() # for web display - ruleStore = N3RuleStore() + ruleStore = ConjunctiveGraph() _loadAndEscape(ruleStore, rulesN3, outputPatterns) - log.debug('%s rules' % len(ruleStore.rules)) + log.debug('%s rules' % len(ruleStore)) _rulesCache = key + (rulesN3, ruleStore) return rulesN3, ruleStore -def infer(graph, rules): + +def infer(graph: ConjunctiveGraph, rules: ConjunctiveGraph): + """ + returns new graph of inferred statements. """ - returns new graph of inferred statements. Plain rete api seems to - alter rules.formulae and rules.rules, but this function does not - alter the incoming rules object, so you can cache it. - """ + log.info(f'Begin inference of graph len={len(graph)} with rules len={len(rules)}:') + + workingSet = ConjunctiveGraph() + workingSet.addN(graph.quads()) + + implied = ConjunctiveGraph() + + delta = 1 + while delta > 0: + delta = -len(implied) + + for r in rules: + if r[1] == LOG['implies']: + containsSetup = all(st in workingSet for st in r[0]) + if containsSetup: + log.info(f' Rule {r[0]} -> present={containsSetup}') + for st in r[0]: + log.info(f' {st[0].n3()} {st[1].n3()} {st[2].n3()}') + + log.info(f' ...implies {len(r[2])} statements') + if containsSetup: + for st in r[2]: + workingSet.add(st) + implied.add(st) + else: + log.info(f' {r}') + delta += len(implied) + log.info(f' this inference round added {delta} more implied stmts') + log.info(f'{len(implied)} stmts implied:') + for st in implied: + log.info(f' {st}') + return implied + # based on fuxi/tools/rdfpipe.py target = Graph() tokenSet = generateTokenSet(graph) @@ -79,6 +112,7 @@ return target + @contextlib.contextmanager def _dontChangeRulesStore(rules): if not hasattr(rules, '_stashOriginalRules'): @@ -89,15 +123,21 @@ del rules.formulae[k] rules.rules = rules._stashOriginalRules[:] -import time, logging + +import logging +import time + log = logging.getLogger() + + def logTime(func): + def inner(*args, **kw): t1 = time.time() try: ret = func(*args, **kw) finally: - log.info("Call to %s took %.1f ms" % ( - func.__name__, 1000 * (time.time() - t1))) + log.info("Call to %s took %.1f ms" % (func.__name__, 1000 * (time.time() - t1))) return ret + return inner
--- a/service/reasoning/inputgraph.py Sat Dec 26 17:00:08 2020 -0800 +++ b/service/reasoning/inputgraph.py Sun Dec 27 03:29:18 2020 -0800 @@ -1,39 +1,36 @@ -import logging, time +import logging +import time import weakref from typing import Callable -from greplin import scales -from rdflib import Graph, ConjunctiveGraph -from rdflib import Namespace, URIRef, RDFS +from patchablegraph.patchsource import ReconnectingPatchSource +from prometheus_client import Summary +from rdfdb.patch import Patch +from rdfdb.rdflibpatch import patchQuads +from rdflib import RDFS, ConjunctiveGraph, Graph, Namespace, URIRef from rdflib.parser import StringInputSource from rx.subjects import BehaviorSubject +from twisted.internet import reactor from twisted.python.filepath import FilePath -from twisted.internet import reactor - -from patchablegraph.patchsource import ReconnectingPatchSource -from rdfdb.rdflibpatch import patchQuads -from rdfdb.patch import Patch log = logging.getLogger('fetch') ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") - -STATS = scales.collection('/web', - scales.PmfStat('combineGraph'), -) +COMBINE_GRAPH_CALLS = Summary('combine_graph_calls', 'calls') def parseRdf(text: str, contentType: str): g = Graph() g.parse(StringInputSource(text), format={ 'text/n3': 'n3', - }[contentType]) + }[contentType]) return g class RemoteData(object): + def __init__(self, onChange: Callable[[], None]): """we won't fire onChange during init""" self.onChange = onChange @@ -42,17 +39,16 @@ def _finishInit(self): self.patchSource = ReconnectingPatchSource( - URIRef('http://bang:9072/graph/home'), - #URIRef('http://frontdoor:10012/graph/events'), - self.onPatch, reconnectSecs=10, agent='reasoning') + URIRef('http://collector.default.svc.cluster.local:9072/graph/home'), + # URIRef('http://frontdoor:10012/graph/events'), + self.onPatch, + reconnectSecs=10, + agent='reasoning') def onPatch(self, p: Patch, fullGraph: bool): if fullGraph: self.graph = ConjunctiveGraph() - patchQuads(self.graph, - deleteQuads=p.delQuads, - addQuads=p.addQuads, - perfect=True) + patchQuads(self.graph, deleteQuads=p.delQuads, addQuads=p.addQuads, perfect=True) ignorePredicates = [ ROOM['signalStrength'], @@ -73,10 +69,9 @@ ] ignoreContexts = [ URIRef('http://bigasterisk.com/sse_collector/'), - ] + ] for affected in p.addQuads + p.delQuads: - if (affected[1] not in ignorePredicates and - affected[3] not in ignoreContexts): + if (affected[1] not in ignorePredicates and affected[3] not in ignoreContexts): log.debug(" remote graph changed") self.onChange() break @@ -85,6 +80,7 @@ class InputGraph(object): + def __init__(self, inputDirs, onChange): """ this has one Graph that's made of: @@ -118,7 +114,7 @@ def _rxUpdate(self, subj, pred, default, rxv): rxv.on_next(self.getGraph().value(subj, pred, default=default)) - def rxValue(self, subj, pred, default):# -> BehaviorSubject: + def rxValue(self, subj, pred, default): # -> BehaviorSubject: value = BehaviorSubject(default) self._rxValues[value] = (subj, pred, default) self._rxUpdate(subj, pred, default, value) @@ -166,13 +162,13 @@ self.addOneShot(g) return time.time() - t1 - @STATS.combineGraph.time() - def getGraph(self): + @COMBINE_GRAPH_CALLS.time() + def getGraph(self) -> ConjunctiveGraph: """rdflib Graph with the file+remote contents of the input graph""" # this could be much faster with the combined readonly graph # view from rdflib if self._combinedGraph is None: - self._combinedGraph = Graph() + self._combinedGraph = ConjunctiveGraph() if self._fileGraph: for s in self._fileGraph: self._combinedGraph.add(s)
--- a/service/reasoning/oneShot Sat Dec 26 17:00:08 2020 -0800 +++ b/service/reasoning/oneShot Sun Dec 27 03:29:18 2020 -0800 @@ -1,18 +1,26 @@ -#!/usr/bin/python +#!/usr/bin/python3 """ send a statement to the reasoning server for one update cycle. Args are s/p/o in n3 notation, with many prefixes predefined here. """ -import sys, requests, time, os +import json +import os +import subprocess +import sys +import time + +import requests + s, p, o = sys.argv[1:] prefixes = { '': 'http://projects.bigasterisk.com/room/', - 'room' : 'http://projects.bigasterisk.com/room/', + 'room': 'http://projects.bigasterisk.com/room/', 'shuttle': 'http://bigasterisk.com/room/livingRoom/shuttlepro/', 'sensor': 'http://bigasterisk.com/homeauto/sensor/', } + def expand(term): if ':' not in term or term.startswith(('<', '"', "'")): return term @@ -21,13 +29,16 @@ return '<%s%s>' % (prefixes[left], right) return term + +pod = json.loads(subprocess.check_output(["kubectl", "get", "pod", "--selector=app=reasoning", "-o", "json"])) +ip = pod['items'][0]['status']['podIP'] + stmt = '%s %s %s .' % (expand(s), expand(p), expand(o)) -print "Sending: %s" % stmt +print("Sending: %s" % stmt) t1 = time.time() -ret = requests.post( - 'http://%s/oneShot' % os.environ.get('REASONING', 'bang:9071'), - headers={"content-type": "text/n3"}, - data=stmt.encode('ascii')) +ret = requests.post('http://%s/oneShot' % os.environ.get('REASONING', f'{ip}:9071'), + headers={"content-type": "text/n3"}, + data=stmt.encode('ascii')) g = float(ret.headers['x-graph-ms']) -print "%.1f ms for graph update; %.1f ms other overhead" % (g, 1000 * (time.time() - t1) - g) +print("%.1f ms for graph update; %.1f ms other overhead" % (g, 1000 * (time.time() - t1) - g))
--- a/service/reasoning/reasoning.py Sat Dec 26 17:00:08 2020 -0800 +++ b/service/reasoning/reasoning.py Sun Dec 27 03:29:18 2020 -0800 @@ -1,4 +1,3 @@ -#!bin/python """ Graph consists of: input/* (read at startup) @@ -14,70 +13,73 @@ When do we gather? The services should be able to trigger us, perhaps with PSHB, that their graph has changed. """ -from crochet import no_setup -no_setup() - -import json, time, traceback, sys -from logging import getLogger, DEBUG, WARN +import json +import sys +import time +import traceback +from logging import DEBUG, WARN, getLogger from typing import Dict, Optional, Set, Tuple +import cyclone.web +import cyclone.websocket from colorlog import ColoredFormatter from docopt import docopt -from rdflib import Namespace, Literal, RDF, Graph, URIRef +from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) +from prometheus_client import Counter, Gauge, Histogram, Summary +from prometheus_client.exposition import generate_latest +from prometheus_client.registry import REGISTRY +from rdflib import RDF, Graph, Literal, Namespace, URIRef +from rdflib.graph import ConjunctiveGraph from rdflib.term import Node -from twisted.internet import reactor, defer -import cyclone.web, cyclone.websocket -from FuXi.Rete.RuleStore import N3RuleStore - -from greplin import scales -from greplin.scales.cyclonehandler import StatsHandler +from standardservice.logsetup import log, verboseLogging +from twisted.internet import defer, reactor -from inference import infer, readRules from actions import Actions, PutOutputsTable -from inputgraph import InputGraph from escapeoutputstatements import unquoteOutputStatements - -from standardservice.logsetup import log, verboseLogging -from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler - +from inference import infer, readRules +from inputgraph import InputGraph ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") NS = {'': ROOM, 'dev': DEV} -STATS = scales.collection('/web', - scales.PmfStat('graphChanged'), - scales.PmfStat('updateRules'), -) +GRAPH_CHANGED_CALLS = Summary('graph_changed_calls', 'calls') +UPDATE_RULES_CALLS = Summary('update_rules_calls', 'calls') + def ntStatement(stmt: Tuple[Node, Node, Node]): + def compact(u): if isinstance(u, URIRef) and u.startswith(ROOM): return 'room:' + u[len(ROOM):] return u.n3() + return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2])) + class Reasoning(object): - ruleStore: N3RuleStore + ruleStore: ConjunctiveGraph + def __init__(self, mockOutput=False): self.prevGraph = None self.rulesN3 = "(not read yet)" - self.inferred = Graph() # gets replaced in each graphChanged call - self.outputGraph = PatchableGraph() # copy of inferred, for now + self.inferred = Graph() # gets replaced in each graphChanged call + self.outputGraph = PatchableGraph() # copy of inferred, for now self.inputGraph = InputGraph([], self.graphChanged) self.actions = Actions(self.inputGraph, sendToLiveClients, mockOutput=mockOutput) self.inputGraph.updateFileData() - @STATS.updateRules.time() + @UPDATE_RULES_CALLS.time() def updateRules(self): rulesPath = 'rules.n3' try: t1 = time.time() self.rulesN3, self.ruleStore = readRules( - rulesPath, outputPatterns=[ + rulesPath, + outputPatterns=[ # Incomplete. See escapeoutputstatements.py for # explanation. (None, ROOM['brightness'], None), @@ -90,22 +92,19 @@ # this is so if you're just watching the inferred output, # you'll see the error too self.inferred = Graph() - self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], - Literal(traceback.format_exc()))) + self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], Literal(traceback.format_exc()))) self.copyOutput() raise - return [(ROOM['reasoner'], ROOM['ruleParseTime'], - Literal(ruleParseTime))], ruleParseTime + return [(ROOM['reasoner'], ROOM['ruleParseTime'], Literal(ruleParseTime))], ruleParseTime - @STATS.graphChanged.time() + @GRAPH_CHANGED_CALLS.time() def graphChanged(self, inputGraph: InputGraph, oneShot=False, oneShotGraph: Graph = None): """ If we're getting called for a oneShot event, the oneShotGraph statements are already in inputGraph.getGraph(). """ log.info("----------------------") - log.info("graphChanged (oneShot=%s %s stmts):", - oneShot, len(oneShotGraph) if oneShotGraph is not None else 0) + log.info("graphChanged (oneShot=%s %s stmts):", oneShot, len(oneShotGraph) if oneShotGraph is not None else 0) if oneShotGraph: for stmt in oneShotGraph: log.info(" oneshot -> %s", ntStatement(stmt)) @@ -135,17 +134,14 @@ if oneShot: self.inferred = oldInferred log.info("graphChanged took %.1f ms (rule parse %.1f ms, infer %.1f ms, putResults %.1f ms)" % - ((time.time() - t1) * 1000, - ruleParseSec * 1000, - inferSec * 1000, - putResultsTime * 1000)) + ((time.time() - t1) * 1000, ruleParseSec * 1000, inferSec * 1000, putResultsTime * 1000)) if not oneShot: self.copyOutput() def copyOutput(self): - self.outputGraph.setToGraph((s,p,o,ROOM['inferred']) for s,p,o in self.inferred) + self.outputGraph.setToGraph((s, p, o, ROOM['inferred']) for s, p, o in self.inferred) - def _makeInferred(self, inputGraph: InputGraph): + def _makeInferred(self, inputGraph: ConjunctiveGraph): t1 = time.time() out = infer(inputGraph, self.ruleStore) @@ -153,18 +149,19 @@ out.bind(p, n, override=True) inferenceTime = time.time() - t1 - out.add((ROOM['reasoner'], ROOM['inferenceTime'], - Literal(inferenceTime))) + out.add((ROOM['reasoner'], ROOM['inferenceTime'], Literal(inferenceTime))) return out, inferenceTime +class Index(cyclone.web.RequestHandler): -class Index(cyclone.web.RequestHandler): def get(self): self.set_header("Content-Type", "text/html") self.write(open('index.html').read()) + class ImmediateUpdate(cyclone.web.RequestHandler): + def put(self): """ request an immediate load of the remote graphs; the thing we @@ -176,12 +173,13 @@ todo: this should do the right thing when many requests come in very quickly """ - log.warn("immediateUpdate from %s %s - ignored", - self.request.headers.get('User-Agent', '?'), + log.warn("immediateUpdate from %s %s - ignored", self.request.headers.get('User-Agent', '?'), self.request.headers['Host']) self.set_status(202) + class OneShot(cyclone.web.RequestHandler): + def post(self): """ payload is an rdf graph. The statements are momentarily added @@ -195,69 +193,79 @@ """ try: log.info('POST to oneShot, headers=%s', self.request.headers) - ct = self.request.headers.get( - 'Content-Type', - self.request.headers.get('content-type', '')) - dt = self.settings.reasoning.inputGraph.addOneShotFromString( - self.request.body, ct) + ct = self.request.headers.get('Content-Type', self.request.headers.get('content-type', '')) + dt = self.settings.reasoning.inputGraph.addOneShotFromString(self.request.body, ct) self.set_header('x-graph-ms', str(1000 * dt)) except Exception as e: traceback.print_exc() log.error(e) raise + # for reuse class GraphResource(cyclone.web.RequestHandler): + def get(self, which: str): self.set_header("Content-Type", "application/json") r = self.settings.reasoning - g = {'lastInput': r.inputGraph.getGraph(), - 'lastOutput': r.inferred, - }[which] + g = { + 'lastInput': r.inputGraph.getGraph(), + 'lastOutput': r.inferred, + }[which] self.write(self.jsonRdf(g)) def jsonRdf(self, g): return json.dumps(sorted(list(g))) + class NtGraphs(cyclone.web.RequestHandler): """same as what gets posted above""" + def get(self): r = self.settings.reasoning inputGraphNt = r.inputGraph.getGraph().serialize(format="nt") inferredNt = r.inferred.serialize(format="nt") self.set_header("Content-Type", "application/json") - self.write(json.dumps({"input": inputGraphNt, - "inferred": inferredNt})) + self.write(json.dumps({"input": inputGraphNt, "inferred": inferredNt})) + class Rules(cyclone.web.RequestHandler): + def get(self): self.set_header("Content-Type", "text/plain") self.write(self.settings.reasoning.rulesN3) + class Status(cyclone.web.RequestHandler): + def get(self): self.set_header("Content-Type", "text/plain") g = self.settings.reasoning.inputGraph.getGraph() msg = "" for badSource in g.subjects(RDF.type, ROOM['FailedGraphLoad']): - msg += "GET %s failed (%s). " % ( - badSource, g.value(badSource, ROOM['graphLoadError'])) + msg += "GET %s failed (%s). " % (badSource, g.value(badSource, ROOM['graphLoadError'])) if not msg: self.finish("all inputs ok") return self.set_status(500) self.finish(msg) + class Static(cyclone.web.RequestHandler): + def get(self, p: str): self.write(open(p).read()) + liveClients: Set[cyclone.websocket.WebSocketHandler] = set() -def sendToLiveClients(d: Dict[str, object]=None, asJson: Optional[str]=None): + + +def sendToLiveClients(d: Dict[str, object] = None, asJson: Optional[str] = None): j = asJson or json.dumps(d) for c in liveClients: c.sendMessage(j) + class Events(cyclone.websocket.WebSocketHandler): def connectionMade(self, *args, **kwargs): @@ -271,7 +279,16 @@ def messageReceived(self, message): log.info("got message %s" % message) + +class Metrics(cyclone.web.RequestHandler): + + def get(self): + self.add_header('content-type', 'text/plain') + self.write(generate_latest(REGISTRY)) + + class Application(cyclone.web.Application): + def __init__(self, reasoning): handlers = [ (r"/", Index), @@ -280,38 +297,39 @@ (r'/putOutputs', PutOutputsTable), (r'/(jquery.min.js)', Static), (r'/(lastInput|lastOutput)Graph', GraphResource), - - (r"/graph/reasoning", CycloneGraphHandler, - {'masterGraph': reasoning.outputGraph}), - (r"/graph/reasoning/events", CycloneGraphEventsHandler, - {'masterGraph': reasoning.outputGraph}), - + (r"/graph/reasoning", CycloneGraphHandler, { + 'masterGraph': reasoning.outputGraph + }), + (r"/graph/reasoning/events", CycloneGraphEventsHandler, { + 'masterGraph': reasoning.outputGraph + }), (r'/ntGraphs', NtGraphs), (r'/rules', Rules), (r'/status', Status), (r'/events', Events), - (r'/stats/(.*)', StatsHandler, {'serverName': 'reasoning'}), + (r'/metrics', Metrics), ] cyclone.web.Application.__init__(self, handlers, reasoning=reasoning) + def configLogging(arg): log.setLevel(WARN) if arg['-i'] or arg['-r'] or arg['-o'] or arg['-v']: - log.handlers[0].setFormatter(ColoredFormatter( - "%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s", - datefmt=None, - reset=True, - log_colors={ - 'DEBUG': 'cyan', - 'INFO': 'green', - 'WARNING': 'yellow', - 'ERROR': 'red', - 'CRITICAL': 'red,bg_white', - }, - secondary_log_colors={}, - style='%' - )) + log.handlers[0].setFormatter( + ColoredFormatter( + "%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s", + datefmt=None, + reset=True, + log_colors={ + 'DEBUG': 'cyan', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red,bg_white', + }, + secondary_log_colors={}, + style='%')) defer.setDebugging(True) if arg['-i']:
--- a/service/reasoning/requirements.txt Sat Dec 26 17:00:08 2020 -0800 +++ b/service/reasoning/requirements.txt Sun Dec 27 03:29:18 2020 -0800 @@ -1,19 +1,16 @@ colorlog==2.6.0 -crochet==1.12.0 - docopt +prometheus_client==0.9.0 +rdflib_jsonld==0.5.0 +rdflib==5.0.0 rx==1.6.1 -service_identity -treq==18.6.0 -twisted - -rdflib==4.2.2 -rdflib_jsonld==0.4.0 -git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales +service_identity==18.1.0 +six==1.15.0 +treq==20.9.0 +twisted==20.3.0 https://github.com/drewp/cyclone/archive/python3.zip cycloneerr -export_to_influxdb==0.4.0 -patchablegraph==0.11.0 +patchablegraph==0.9.0 rdfdb==0.21.0 standardservice==0.6.0
--- a/service/reasoning/serv.n3 Sat Dec 26 17:00:08 2020 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,23 +0,0 @@ -@prefix : <http://bigasterisk.com/ns/serv#> . -@prefix auth: <http://bigasterisk.com/ns/serv/auth#> . -@prefix serv: <http://bigasterisk.com/services/> . - - -serv:reasoning a :Service; - :path "/reasoning/"; - :openid auth:admin; - :serverHost "bang"; - :internalPort 9071; - :prodDockerFlags ( - "-p" "9071:9071" - "--net=host" - ); - :localDockerFlags ( - "-v" "`pwd`:/opt" - ); - :localRunCmdline ( - "python3" "reasoning.py" "-iro" - ); - :dockerFile "Dockerfile" -. -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/skaffold.yaml Sun Dec 27 03:29:18 2020 -0800 @@ -0,0 +1,20 @@ +apiVersion: skaffold/v2beta5 +kind: Config +metadata: + name: reasoning +build: + local: + useDockerCLI: true + tagPolicy: + dateTime: + format: "2006-01-02_15-04-05" + timezone: "Local" + artifacts: + - image: bang5:5000/reasoning_image + sync: + infer: + - rules.n3 +deploy: + kubectl: + manifests: + - deploy.yaml
--- a/service/reasoning/tasks.py Sat Dec 26 17:00:08 2020 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,34 +0,0 @@ -from invoke import Collection, task -import sys -sys.path.append('/my/proj/release') -from serv_tasks import serv_tasks - -ns = Collection() -serv_tasks(ns, 'serv.n3', 'reasoning') - -@ns.add_task -@task(pre=[ns['build']]) -def local_run_mock(ctx): - ctx.run(f'docker run --name reasoning_local_run_mock --rm -it -p 9071:9071 -v `pwd`:/opt --dns 10.2.0.1 --dns-search bigasterisk.com --net=host bang6:5000/reasoning:latest python3 reasoning.py -iro --mockoutput', pty=True) - -@ns.add_task -@task(pre=[ns['build']]) -def pytype(ctx): - ctx.run(f'docker run ' - f'--name reasoning_pytype ' - f'--rm -it ' - f'-v `pwd`:/opt ' - f'--dns 10.2.0.1 ' - f'--dns-search bigasterisk.com ' - f'--net=host bang6:5000/reasoning:latest ' - f'pytype --pythonpath /usr/local/lib/python3.6/dist-packages:. ' - f'--jobs 4 ' - f'actions.py ' - f'escapeoutputstatements.py ' - f'graphop.py ' - f'httpputoutputs.py ' - f'inference.py ' - f'inputgraph.py ' - f'private_ipv6_addresses.py ' - f'rdflibtrig.py ' - f'reasoning.py', pty=True)