Mercurial > code > home > repos > homeauto
diff service/reasoning/reasoning.py @ 795:c8562ace4917
big updates for k8s, py3, drop FuXi, use prometheus for metrics.
author | drewp@bigasterisk.com |
---|---|
date | Sun, 27 Dec 2020 03:29:18 -0800 |
parents | 3c18b4b3b72c |
children |
line wrap: on
line diff
--- a/service/reasoning/reasoning.py Sat Dec 26 17:00:08 2020 -0800 +++ b/service/reasoning/reasoning.py Sun Dec 27 03:29:18 2020 -0800 @@ -1,4 +1,3 @@ -#!bin/python """ Graph consists of: input/* (read at startup) @@ -14,70 +13,73 @@ When do we gather? The services should be able to trigger us, perhaps with PSHB, that their graph has changed. """ -from crochet import no_setup -no_setup() - -import json, time, traceback, sys -from logging import getLogger, DEBUG, WARN +import json +import sys +import time +import traceback +from logging import DEBUG, WARN, getLogger from typing import Dict, Optional, Set, Tuple +import cyclone.web +import cyclone.websocket from colorlog import ColoredFormatter from docopt import docopt -from rdflib import Namespace, Literal, RDF, Graph, URIRef +from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) +from prometheus_client import Counter, Gauge, Histogram, Summary +from prometheus_client.exposition import generate_latest +from prometheus_client.registry import REGISTRY +from rdflib import RDF, Graph, Literal, Namespace, URIRef +from rdflib.graph import ConjunctiveGraph from rdflib.term import Node -from twisted.internet import reactor, defer -import cyclone.web, cyclone.websocket -from FuXi.Rete.RuleStore import N3RuleStore - -from greplin import scales -from greplin.scales.cyclonehandler import StatsHandler +from standardservice.logsetup import log, verboseLogging +from twisted.internet import defer, reactor -from inference import infer, readRules from actions import Actions, PutOutputsTable -from inputgraph import InputGraph from escapeoutputstatements import unquoteOutputStatements - -from standardservice.logsetup import log, verboseLogging -from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler - +from inference import infer, readRules +from inputgraph import InputGraph ROOM = Namespace("http://projects.bigasterisk.com/room/") DEV = Namespace("http://projects.bigasterisk.com/device/") NS = {'': ROOM, 'dev': DEV} -STATS = scales.collection('/web', - scales.PmfStat('graphChanged'), - scales.PmfStat('updateRules'), -) +GRAPH_CHANGED_CALLS = Summary('graph_changed_calls', 'calls') +UPDATE_RULES_CALLS = Summary('update_rules_calls', 'calls') + def ntStatement(stmt: Tuple[Node, Node, Node]): + def compact(u): if isinstance(u, URIRef) and u.startswith(ROOM): return 'room:' + u[len(ROOM):] return u.n3() + return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2])) + class Reasoning(object): - ruleStore: N3RuleStore + ruleStore: ConjunctiveGraph + def __init__(self, mockOutput=False): self.prevGraph = None self.rulesN3 = "(not read yet)" - self.inferred = Graph() # gets replaced in each graphChanged call - self.outputGraph = PatchableGraph() # copy of inferred, for now + self.inferred = Graph() # gets replaced in each graphChanged call + self.outputGraph = PatchableGraph() # copy of inferred, for now self.inputGraph = InputGraph([], self.graphChanged) self.actions = Actions(self.inputGraph, sendToLiveClients, mockOutput=mockOutput) self.inputGraph.updateFileData() - @STATS.updateRules.time() + @UPDATE_RULES_CALLS.time() def updateRules(self): rulesPath = 'rules.n3' try: t1 = time.time() self.rulesN3, self.ruleStore = readRules( - rulesPath, outputPatterns=[ + rulesPath, + outputPatterns=[ # Incomplete. See escapeoutputstatements.py for # explanation. (None, ROOM['brightness'], None), @@ -90,22 +92,19 @@ # this is so if you're just watching the inferred output, # you'll see the error too self.inferred = Graph() - self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], - Literal(traceback.format_exc()))) + self.inferred.add((ROOM['reasoner'], ROOM['ruleParseError'], Literal(traceback.format_exc()))) self.copyOutput() raise - return [(ROOM['reasoner'], ROOM['ruleParseTime'], - Literal(ruleParseTime))], ruleParseTime + return [(ROOM['reasoner'], ROOM['ruleParseTime'], Literal(ruleParseTime))], ruleParseTime - @STATS.graphChanged.time() + @GRAPH_CHANGED_CALLS.time() def graphChanged(self, inputGraph: InputGraph, oneShot=False, oneShotGraph: Graph = None): """ If we're getting called for a oneShot event, the oneShotGraph statements are already in inputGraph.getGraph(). """ log.info("----------------------") - log.info("graphChanged (oneShot=%s %s stmts):", - oneShot, len(oneShotGraph) if oneShotGraph is not None else 0) + log.info("graphChanged (oneShot=%s %s stmts):", oneShot, len(oneShotGraph) if oneShotGraph is not None else 0) if oneShotGraph: for stmt in oneShotGraph: log.info(" oneshot -> %s", ntStatement(stmt)) @@ -135,17 +134,14 @@ if oneShot: self.inferred = oldInferred log.info("graphChanged took %.1f ms (rule parse %.1f ms, infer %.1f ms, putResults %.1f ms)" % - ((time.time() - t1) * 1000, - ruleParseSec * 1000, - inferSec * 1000, - putResultsTime * 1000)) + ((time.time() - t1) * 1000, ruleParseSec * 1000, inferSec * 1000, putResultsTime * 1000)) if not oneShot: self.copyOutput() def copyOutput(self): - self.outputGraph.setToGraph((s,p,o,ROOM['inferred']) for s,p,o in self.inferred) + self.outputGraph.setToGraph((s, p, o, ROOM['inferred']) for s, p, o in self.inferred) - def _makeInferred(self, inputGraph: InputGraph): + def _makeInferred(self, inputGraph: ConjunctiveGraph): t1 = time.time() out = infer(inputGraph, self.ruleStore) @@ -153,18 +149,19 @@ out.bind(p, n, override=True) inferenceTime = time.time() - t1 - out.add((ROOM['reasoner'], ROOM['inferenceTime'], - Literal(inferenceTime))) + out.add((ROOM['reasoner'], ROOM['inferenceTime'], Literal(inferenceTime))) return out, inferenceTime +class Index(cyclone.web.RequestHandler): -class Index(cyclone.web.RequestHandler): def get(self): self.set_header("Content-Type", "text/html") self.write(open('index.html').read()) + class ImmediateUpdate(cyclone.web.RequestHandler): + def put(self): """ request an immediate load of the remote graphs; the thing we @@ -176,12 +173,13 @@ todo: this should do the right thing when many requests come in very quickly """ - log.warn("immediateUpdate from %s %s - ignored", - self.request.headers.get('User-Agent', '?'), + log.warn("immediateUpdate from %s %s - ignored", self.request.headers.get('User-Agent', '?'), self.request.headers['Host']) self.set_status(202) + class OneShot(cyclone.web.RequestHandler): + def post(self): """ payload is an rdf graph. The statements are momentarily added @@ -195,69 +193,79 @@ """ try: log.info('POST to oneShot, headers=%s', self.request.headers) - ct = self.request.headers.get( - 'Content-Type', - self.request.headers.get('content-type', '')) - dt = self.settings.reasoning.inputGraph.addOneShotFromString( - self.request.body, ct) + ct = self.request.headers.get('Content-Type', self.request.headers.get('content-type', '')) + dt = self.settings.reasoning.inputGraph.addOneShotFromString(self.request.body, ct) self.set_header('x-graph-ms', str(1000 * dt)) except Exception as e: traceback.print_exc() log.error(e) raise + # for reuse class GraphResource(cyclone.web.RequestHandler): + def get(self, which: str): self.set_header("Content-Type", "application/json") r = self.settings.reasoning - g = {'lastInput': r.inputGraph.getGraph(), - 'lastOutput': r.inferred, - }[which] + g = { + 'lastInput': r.inputGraph.getGraph(), + 'lastOutput': r.inferred, + }[which] self.write(self.jsonRdf(g)) def jsonRdf(self, g): return json.dumps(sorted(list(g))) + class NtGraphs(cyclone.web.RequestHandler): """same as what gets posted above""" + def get(self): r = self.settings.reasoning inputGraphNt = r.inputGraph.getGraph().serialize(format="nt") inferredNt = r.inferred.serialize(format="nt") self.set_header("Content-Type", "application/json") - self.write(json.dumps({"input": inputGraphNt, - "inferred": inferredNt})) + self.write(json.dumps({"input": inputGraphNt, "inferred": inferredNt})) + class Rules(cyclone.web.RequestHandler): + def get(self): self.set_header("Content-Type", "text/plain") self.write(self.settings.reasoning.rulesN3) + class Status(cyclone.web.RequestHandler): + def get(self): self.set_header("Content-Type", "text/plain") g = self.settings.reasoning.inputGraph.getGraph() msg = "" for badSource in g.subjects(RDF.type, ROOM['FailedGraphLoad']): - msg += "GET %s failed (%s). " % ( - badSource, g.value(badSource, ROOM['graphLoadError'])) + msg += "GET %s failed (%s). " % (badSource, g.value(badSource, ROOM['graphLoadError'])) if not msg: self.finish("all inputs ok") return self.set_status(500) self.finish(msg) + class Static(cyclone.web.RequestHandler): + def get(self, p: str): self.write(open(p).read()) + liveClients: Set[cyclone.websocket.WebSocketHandler] = set() -def sendToLiveClients(d: Dict[str, object]=None, asJson: Optional[str]=None): + + +def sendToLiveClients(d: Dict[str, object] = None, asJson: Optional[str] = None): j = asJson or json.dumps(d) for c in liveClients: c.sendMessage(j) + class Events(cyclone.websocket.WebSocketHandler): def connectionMade(self, *args, **kwargs): @@ -271,7 +279,16 @@ def messageReceived(self, message): log.info("got message %s" % message) + +class Metrics(cyclone.web.RequestHandler): + + def get(self): + self.add_header('content-type', 'text/plain') + self.write(generate_latest(REGISTRY)) + + class Application(cyclone.web.Application): + def __init__(self, reasoning): handlers = [ (r"/", Index), @@ -280,38 +297,39 @@ (r'/putOutputs', PutOutputsTable), (r'/(jquery.min.js)', Static), (r'/(lastInput|lastOutput)Graph', GraphResource), - - (r"/graph/reasoning", CycloneGraphHandler, - {'masterGraph': reasoning.outputGraph}), - (r"/graph/reasoning/events", CycloneGraphEventsHandler, - {'masterGraph': reasoning.outputGraph}), - + (r"/graph/reasoning", CycloneGraphHandler, { + 'masterGraph': reasoning.outputGraph + }), + (r"/graph/reasoning/events", CycloneGraphEventsHandler, { + 'masterGraph': reasoning.outputGraph + }), (r'/ntGraphs', NtGraphs), (r'/rules', Rules), (r'/status', Status), (r'/events', Events), - (r'/stats/(.*)', StatsHandler, {'serverName': 'reasoning'}), + (r'/metrics', Metrics), ] cyclone.web.Application.__init__(self, handlers, reasoning=reasoning) + def configLogging(arg): log.setLevel(WARN) if arg['-i'] or arg['-r'] or arg['-o'] or arg['-v']: - log.handlers[0].setFormatter(ColoredFormatter( - "%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s", - datefmt=None, - reset=True, - log_colors={ - 'DEBUG': 'cyan', - 'INFO': 'green', - 'WARNING': 'yellow', - 'ERROR': 'red', - 'CRITICAL': 'red,bg_white', - }, - secondary_log_colors={}, - style='%' - )) + log.handlers[0].setFormatter( + ColoredFormatter( + "%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s", + datefmt=None, + reset=True, + log_colors={ + 'DEBUG': 'cyan', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red,bg_white', + }, + secondary_log_colors={}, + style='%')) defer.setDebugging(True) if arg['-i']: