changeset 756:f3f667769aef
python 3! and some types and cleanups
author     drewp@bigasterisk.com
date       Fri, 14 Feb 2020 00:07:23 -0800
parents    ffcad6bf9c57
children   cee91fc85b03
files      service/reasoning/Dockerfile service/reasoning/actions.py service/reasoning/graphop.py service/reasoning/httpputoutputs.py service/reasoning/inference.py service/reasoning/inputgraph.py service/reasoning/reasoning.py service/reasoning/serv.n3 service/reasoning/tasks.py
diffstat   9 files changed, 107 insertions(+), 81 deletions(-)
--- a/service/reasoning/Dockerfile Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/Dockerfile Fri Feb 14 00:07:23 2020 -0800
@@ -3,14 +3,16 @@
 WORKDIR /opt
 COPY requirements.txt ./
-RUN pip install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt
+RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt
 COPY FuXi/ FuXi
-RUN pip install ./FuXi
+RUN pip3 install ./FuXi
+
+RUN pip3 install pytype
 COPY *.n3 *.py *.html req* ./
 COPY input ./input
 EXPOSE 9071
-CMD [ "python", "./reasoning.py" ]
+CMD [ "python3", "./reasoning.py" ]
--- a/service/reasoning/actions.py Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/actions.py Fri Feb 14 00:07:23 2020 -0800
@@ -7,6 +7,7 @@
 import treq
 from httpputoutputs import HttpPutOutputs
+from inputgraph import InputGraph
 log = logging.getLogger('output')
@@ -28,7 +29,7 @@
 class Actions(object):
-    def __init__(self, inputGraph, sendToLiveClients, mockOutput=False):
+    def __init__(self, inputGraph: InputGraph, sendToLiveClients, mockOutput=False):
         self.inputGraph = inputGraph
         self.mockOutput = mockOutput
         self.putOutputs = HttpPutOutputs(mockOutput=mockOutput)
@@ -68,7 +69,7 @@
             if putUrl and matchPred == stmt[1]:
                 log.debug('putDevices: stmt %s leads to putting at %s',
                           ntStatement(stmt), putUrl.n3())
-                self._put(putUrl + '?' + urllib.urlencode([
+                self._put(putUrl + '?' + urllib.parse.urlencode([
                     ('s', str(stmt[0])),
                     ('p', str(putPred))]),
                     str(stmt[2].toPython()),
@@ -146,6 +147,8 @@
                              default=Literal('30s'))#.map(secsFromLiteral)
     def _put(self, url, payload, refreshSecs, agent=None):
+        if isinstance(payload, str):
+            payload = payload.encode('utf8')
         assert isinstance(payload, bytes)
         self.putOutputs.put(url, payload, agent, refreshSecs)
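Two Python 3 details drive the actions.py hunks above: urlencode now lives in urllib.parse, and HTTP payloads must be bytes rather than str. A minimal sketch of the same pattern, with invented function name, URL, and parameters:

    from urllib.parse import urlencode

    def buildPut(putUrl: str, subj: str, pred: str, value: str):
        # Python 3: urlencode moved from urllib to urllib.parse
        url = putUrl + '?' + urlencode([('s', subj), ('p', pred)])
        payload = value
        # network payloads must be bytes in Python 3; encode str before sending
        if isinstance(payload, str):
            payload = payload.encode('utf8')
        assert isinstance(payload, bytes)
        return url, payload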
--- a/service/reasoning/graphop.py Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/graphop.py Fri Feb 14 00:07:23 2020 -0800
@@ -1,8 +1,10 @@
 import logging
 from rdflib import URIRef, ConjunctiveGraph
+from typing import List
 log = logging.getLogger()
-def graphWithoutMetadata(g, ignorePredicates=[]):
+
+def graphWithoutMetadata(g: ConjunctiveGraph, ignorePredicates=[]):
     """
     graph filter that removes any statements whose subjects are
     contexts in the graph and also any statements with the given
@@ -17,7 +19,9 @@
             out.addN([stmt])
     return out
-def graphEqual(a, b, ignorePredicates=[]):
+
+def graphEqual(a: ConjunctiveGraph, b: ConjunctiveGraph,
+               ignorePredicates: List[URIRef]=[]):
     """
     compare graphs, omitting any metadata statements about contexts
     (especially modification times) and also any statements using the
@@ -27,7 +31,7 @@
     stmtsB = set(graphWithoutMetadata(b, ignorePredicates))
     if stmtsA == stmtsB:
         return True
-
+
     if log.getEffectiveLevel() <= logging.INFO:
         lost = stmtsA - stmtsB
         if lost:
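For context, graphEqual is what lets the reasoner skip work when a regenerated graph is unchanged. A usage sketch, assuming graphop is importable from the working directory; the example URIs and predicate are invented:

    from rdflib import ConjunctiveGraph, Literal, URIRef

    from graphop import graphEqual  # the module patched above

    EX = 'http://example.com/'
    ctx = URIRef(EX + 'ctx')

    a = ConjunctiveGraph()
    b = ConjunctiveGraph()
    a.get_context(ctx).add((URIRef(EX + 's'), URIRef(EX + 'p'), Literal(1)))
    b.get_context(ctx).add((URIRef(EX + 's'), URIRef(EX + 'p'), Literal(1)))

    # statements using the ignored predicates (e.g. modification times)
    # are dropped before the comparison
    same = graphEqual(a, b, ignorePredicates=[URIRef(EX + 'lastModified')])
    print(same)  # True: both graphs hold the same non-metadata statements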
--- a/service/reasoning/httpputoutputs.py Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/httpputoutputs.py Fri Feb 14 00:07:23 2020 -0800
@@ -1,24 +1,31 @@
 import logging
 import time
+from rdflib import URIRef
 from rx.subjects import BehaviorSubject
 from twisted.internet import reactor
+from twisted.python.failure import Failure
+from twisted.internet.interfaces import IDelayedCall
 import treq
+from typing import Optional
 log = logging.getLogger('httpputoutputs')
+
 class HttpPutOutput(object):
-    def __init__(self, url,
-                 refreshSecs,#: BehaviorSubject,
+    lastChangeTime: float
+
+    def __init__(self, url: str,
+                 refreshSecs: BehaviorSubject,
                  mockOutput=False):
         self.url = url
         self.mockOutput = mockOutput
-        self.payload = None
-        self.foafAgent = None
-        self.nextCall = None
-        self.lastErr = None
-        self.numRequests = 0
-        self.refreshSecs = refreshSecs
+        self.payload: Optional[str] = None
+        self.foafAgent: Optional[URIRef] = None
+        self.nextCall: IDelayedCall = None
+        self.lastErr: Optional[Failure] = None
+        self.numRequests: int = 0
+        self.refreshSecs: float = refreshSecs
     def report(self):
         return {
@@ -33,7 +40,7 @@
             'lastErr': str(self.lastErr) if self.lastErr is not None else None,
         }
-    def setPayload(self, payload, foafAgent):
+    def setPayload(self, payload: str, foafAgent: URIRef):
         if self.numRequests > 0 and (self.payload == payload and
                                      self.foafAgent == foafAgent):
             return
@@ -101,6 +108,7 @@
         self.nextCall = reactor.callLater(self.currentRefreshSecs(),
                                           self.makeRequest)
+
 class HttpPutOutputs(object):
     """these grow forever"""
     def __init__(self, mockOutput=False):
@@ -108,6 +116,7 @@
         self.state = {} # url: HttpPutOutput
     def put(self, url, payload, foafAgent, refreshSecs):
+        assert isinstance(url, str)
         if url not in self.state:
             self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput,
                                             refreshSecs=refreshSecs)
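The httpputoutputs.py hunks switch to PEP 526 variable annotations, which is what makes the new pytype step in the Dockerfile useful. A small sketch of the two annotation styles used above; the class name and attributes here are illustrative only:

    from typing import Optional

    class ExampleOutput:
        # bare class-level annotation: declares the attribute's type for
        # checkers like pytype/mypy without creating a class attribute
        lastChangeTime: float

        def __init__(self) -> None:
            # instance attributes annotated where they are first assigned
            self.payload: Optional[str] = None
            self.numRequests: int = 0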
--- a/service/reasoning/inference.py Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/inference.py Fri Feb 14 00:07:23 2020 -0800
@@ -7,7 +7,7 @@
     from rdflib.Graph import Graph
 except ImportError:
     from rdflib import Graph
-
+
 from rdflib import Namespace
 from rdflib.parser import StringInputSource
@@ -15,7 +15,7 @@
 from FuXi.Rete import ReteNetwork
 from FuXi.Rete.RuleStore import N3RuleStore
-from greplin import scales
+from greplin import scales
 STATS = scales.collection('/web', scales.PmfStat('readRules'))
@@ -32,7 +32,7 @@
     # of my rules. This serialize/parse version is very slow (400ms),
     # but it only runs when the file changes.
     plainGraph = Graph()
-    plainGraph.parse(StringInputSource(n3), format='n3') # for inference
+    plainGraph.parse(StringInputSource(n3.encode('utf8')), format='n3') # for inference
     escapeOutputStatements(plainGraph, outputPatterns=outputPatterns)
     expandedN3 = plainGraph.serialize(format='n3')
@@ -60,7 +60,7 @@
     ruleStore = N3RuleStore()
     _loadAndEscape(ruleStore, rulesN3, outputPatterns)
     log.debug('%s rules' % len(ruleStore.rules))
-
+
     _rulesCache = key + (rulesN3, ruleStore)
     return rulesN3, ruleStore
@@ -76,7 +76,7 @@
     with _dontChangeRulesStore(rules):
         network = ReteNetwork(rules, inferredTarget=target)
         network.feedFactsToAdd(tokenSet)
-
+
     return target
 @contextlib.contextmanager
@@ -84,11 +84,11 @@
     if not hasattr(rules, '_stashOriginalRules'):
         rules._stashOriginalRules = rules.rules[:]
     yield
-    for k in rules.formulae.keys():
+    for k in list(rules.formulae.keys()):
         if not k.startswith('_:Formula'):
             del rules.formulae[k]
     rules.rules = rules._stashOriginalRules[:]
-
+
 import time, logging
 log = logging.getLogger()
 def logTime(func):
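The `list(rules.formulae.keys())` change is the standard Python 3 fix for deleting dict entries while iterating. A self-contained sketch with made-up keys:

    formulae = {'_:Formula12': 'keep', 'http://example.com/x': 'drop'}

    # Python 3: dict.keys() is a live view; deleting entries while iterating
    # over it raises RuntimeError, so snapshot the keys with list() first
    for k in list(formulae.keys()):
        if not k.startswith('_:Formula'):
            del formulae[k]

    print(formulae)  # {'_:Formula12': 'keep'}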
--- a/service/reasoning/inputgraph.py Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/inputgraph.py Fri Feb 14 00:07:23 2020 -0800
@@ -1,5 +1,6 @@
 import logging, time
 import weakref
+from typing import Callable
 from greplin import scales
 from rdflib import Graph, ConjunctiveGraph
@@ -11,6 +12,7 @@
 from patchablegraph.patchsource import ReconnectingPatchSource
 from rdfdb.rdflibpatch import patchQuads
+from rdfdb.patch import Patch
 log = logging.getLogger('fetch')
@@ -21,7 +23,9 @@
 STATS = scales.collection('/web',
                           scales.PmfStat('combineGraph'),
                           )
-def parseRdf(text, contentType):
+
+
+def parseRdf(text: str, contentType: str):
     g = Graph()
     g.parse(StringInputSource(text), format={
         'text/n3': 'n3',
@@ -30,7 +34,7 @@
 class RemoteData(object):
-    def __init__(self, onChange):
+    def __init__(self, onChange: Callable[[], None]):
         """we won't fire onChange during init"""
         self.onChange = onChange
         self.graph = ConjunctiveGraph()
@@ -42,7 +46,7 @@
             #URIRef('http://frontdoor:10012/graph/events'),
             self.onPatch, reconnectSecs=10, agent='reasoning')
-    def onPatch(self, p, fullGraph):
+    def onPatch(self, p: Patch, fullGraph: bool):
         if fullGraph:
             self.graph = ConjunctiveGraph()
         patchQuads(self.graph,
@@ -79,6 +83,7 @@
         else:
             log.debug(" remote graph has no changes to trigger rules")
+
 class InputGraph(object):
     def __init__(self, inputDirs, onChange):
         """
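`Callable[[], None]` in the inputgraph.py hunk is the typing spelling for a zero-argument callback that returns nothing, which is how RemoteData uses its onChange argument. A tiny sketch with a hypothetical helper name:

    from typing import Callable

    def notifyLater(onChange: Callable[[], None]) -> None:
        # a no-argument callback returning nothing; checkers verify that
        # callers pass something matching this shape
        onChange()

    notifyLater(lambda: print('remote graph changed'))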
--- a/service/reasoning/reasoning.py Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/reasoning.py Fri Feb 14 00:07:23 2020 -0800
@@ -19,12 +19,15 @@
 import json, time, traceback, sys
 from logging import getLogger, DEBUG, WARN
+from typing import Dict, Optional, Set, Tuple
 from colorlog import ColoredFormatter
 from docopt import docopt
 from rdflib import Namespace, Literal, RDF, Graph, URIRef
+from rdflib.term import Node
 from twisted.internet import reactor, defer
 import cyclone.web, cyclone.websocket
+from FuXi.Rete.RuleStore import N3RuleStore
 from greplin import scales
 from greplin.scales.cyclonehandler import StatsHandler
@@ -48,7 +51,7 @@
                           scales.PmfStat('updateRules'),
                           )
-def ntStatement(stmt):
+def ntStatement(stmt: Tuple[Node, Node, Node]):
     def compact(u):
         if isinstance(u, URIRef) and u.startswith(ROOM):
             return 'room:' + u[len(ROOM):]
@@ -56,6 +59,7 @@
     return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2]))
 class Reasoning(object):
+    ruleStore: N3RuleStore
     def __init__(self, mockOutput=False):
         self.prevGraph = None
@@ -94,7 +98,7 @@
                  Literal(ruleParseTime))], ruleParseTime
     @STATS.graphChanged.time()
-    def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None):
+    def graphChanged(self, inputGraph: InputGraph, oneShot=False, oneShotGraph: Graph = None):
         """
         If we're getting called for a oneShot event, the oneShotGraph
         statements are already in inputGraph.getGraph().
@@ -141,11 +145,11 @@
     def copyOutput(self):
         self.outputGraph.setToGraph((s,p,o,ROOM['inferred']) for s,p,o in self.inferred)
-    def _makeInferred(self, inputGraph):
+    def _makeInferred(self, inputGraph: InputGraph):
         t1 = time.time()
         out = infer(inputGraph, self.ruleStore)
-        for p, n in NS.iteritems():
+        for p, n in NS.items():
             out.bind(p, n, override=True)
         inferenceTime = time.time() - t1
@@ -201,7 +205,7 @@
 # for reuse
 class GraphResource(cyclone.web.RequestHandler):
-    def get(self, which):
+    def get(self, which: str):
         self.set_header("Content-Type", "application/json")
         r = self.settings.reasoning
         g = {'lastInput': r.inputGraph.getGraph(),
@@ -242,11 +246,11 @@
         self.finish(msg)
 class Static(cyclone.web.RequestHandler):
-    def get(self, p):
+    def get(self, p: str):
         self.write(open(p).read())
-liveClients = set()
-def sendToLiveClients(d=None, asJson=None):
+liveClients: Set[cyclone.websocket.WebSocketHandler] = set()
+def sendToLiveClients(d: Dict[str, object]=None, asJson: Optional[str]=None):
     j = asJson or json.dumps(d)
     for c in liveClients:
         c.sendMessage(j)
@@ -274,8 +278,10 @@
         (r'/(jquery.min.js)', Static),
         (r'/(lastInput|lastOutput)Graph', GraphResource),
-        (r"/graph/reasoning", CycloneGraphHandler, {'masterGraph': reasoning.outputGraph}),
-        (r"/graph/reasoning/events", CycloneGraphEventsHandler, {'masterGraph': reasoning.outputGraph}),
+        (r"/graph/reasoning", CycloneGraphHandler,
+         {'masterGraph': reasoning.outputGraph}),
+        (r"/graph/reasoning/events", CycloneGraphEventsHandler,
+         {'masterGraph': reasoning.outputGraph}),
         (r'/ntGraphs', NtGraphs),
         (r'/rules', Rules),
@@ -289,22 +295,22 @@
         log.setLevel(WARN)
     if arg['-i'] or arg['-r'] or arg['-o'] or arg['-v']:
-        log.handlers[0].setFormatter(ColoredFormatter("%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s",
-        datefmt=None,
-        reset=True,
-        log_colors={
+        log.handlers[0].setFormatter(ColoredFormatter(
+            "%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s",
+            datefmt=None,
+            reset=True,
+            log_colors={
                 'DEBUG': 'cyan',
                 'INFO': 'green',
                 'WARNING': 'yellow',
                 'ERROR': 'red',
                 'CRITICAL': 'red,bg_white',
-        },
-        secondary_log_colors={},
-        style='%'
-))
+            },
+            secondary_log_colors={},
+            style='%'
+        ))
         defer.setDebugging(True)
-
     if arg['-i']:
         import twisted.python.log
         twisted.python.log.startLogging(sys.stdout)
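The `NS.iteritems()` → `NS.items()` change in `_makeInferred` is the usual Python 3 port: `iteritems()` is gone and `items()` returns a view, which is fine for a single pass like this bind loop. A sketch with an illustrative prefix map; the real `NS` dict is defined elsewhere in reasoning.py:

    from rdflib import Graph, Namespace

    # illustrative prefixes only
    NS = {'room': Namespace('http://example.com/room/')}

    g = Graph()
    # Python 3: dict.items() replaces dict.iteritems()
    for prefix, ns in NS.items():
        g.bind(prefix, ns, override=True)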
--- a/service/reasoning/serv.n3 Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/serv.n3 Fri Feb 14 00:07:23 2020 -0800
@@ -16,7 +16,7 @@
       "-v" "`pwd`:/opt"
     );
     :localRunCmdline (
-      "python3" "reasoning.py" "-v"
+      "python3" "reasoning.py" "-iro"
     );
     :dockerFile "Dockerfile"
 .
--- a/service/reasoning/tasks.py Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/tasks.py Fri Feb 14 00:07:23 2020 -0800
@@ -1,37 +1,34 @@
-from invoke import task
-
-JOB='reasoning'
-PORT=9071
-
-TAG=f'bang6:5000/{JOB}_x86:latest'
+from invoke import Collection, task
+import sys
+sys.path.append('/my/proj/release')
+from serv_tasks import serv_tasks
-@task
-def build_image(ctx):
-    ctx.run(f'docker build --network=host -t {TAG} .')
+ns = Collection()
+serv_tasks(ns, 'serv.n3', 'reasoning')
-@task(pre=[build_image])
-def push_image(ctx):
-    ctx.run(f'docker push {TAG}')
-
-@task
-def shell(ctx):
-    ctx.run(f'docker run --name {JOB}_shell --rm -it --cap-add SYS_PTRACE -v `pwd`:/mnt --dns 10.2.0.1 --dns-search bigasterisk.com --net=host {TAG} /bin/bash', pty=True)
+@ns.add_task
+@task(pre=[ns['build']])
+def local_run_mock(ctx):
+    ctx.run(f'docker run --name reasoning_local_run_mock --rm -it -p 9071:9071 -v `pwd`:/opt --dns 10.2.0.1 --dns-search bigasterisk.com --net=host bang6:5000/reasoning:latest python3 reasoning.py -iro --mockoutput', pty=True)
-@task(pre=[build_image])
-def local_run(ctx):
-    ctx.run(f'docker run --name {JOB}_local --rm -it '
-            f'-p {PORT}:{PORT} '
-            f'-v `pwd`:/mnt '
-            f'-v `pwd`/index.html:/opt/index.html '
-            f'--dns 10.2.0.1 --dns-search bigasterisk.com '
-            f'--net=host '
-            f'{TAG} '
-            f'python /mnt/{JOB}.py -iro', pty=True)
-
-@task(pre=[build_image])
-def local_run_mock(ctx):
-    ctx.run(f'docker run --name {JOB}_local_run_mock --rm -it -p {PORT}:{PORT} -v `pwd`:/mnt --dns 10.2.0.1 --dns-search bigasterisk.com --net=host {TAG} python /mnt/{JOB}.py -iro --mockoutput', pty=True)
-
-@task(pre=[push_image])
-def redeploy(ctx):
-    ctx.run(f'supervisorctl -s http://bang:9001/ restart {JOB}_{PORT}')
+@ns.add_task
+@task(pre=[ns['build']])
+def pytype(ctx):
+    ctx.run(f'docker run '
+            f'--name reasoning_pytype '
+            f'--rm -it '
+            f'-v `pwd`:/opt '
+            f'--dns 10.2.0.1 '
+            f'--dns-search bigasterisk.com '
+            f'--net=host bang6:5000/reasoning:latest '
+            f'pytype --pythonpath /usr/local/lib/python3.6/dist-packages:. '
+            f'--jobs 4 '
+            f'actions.py '
+            f'escapeoutputstatements.py '
+            f'graphop.py '
+            f'httpputoutputs.py '
+            f'inference.py '
+            f'inputgraph.py '
+            f'private_ipv6_addresses.py '
+            f'rdflibtrig.py '
+            f'reasoning.py', pty=True)
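tasks.py now delegates the shared build/push/deploy tasks to the author's local serv_tasks helper (loaded from /my/proj/release), leaving only project-specific tasks here. A sketch of the underlying invoke Collection pattern, with hypothetical task names and image tag:

    from invoke import Collection, task

    ns = Collection()

    @task
    def build(ctx):
        # stand-in for the build task that serv_tasks registers
        ctx.run('docker build --network=host -t example:latest .')

    ns.add_task(build)

    # later tasks can depend on ones already in the collection via ns['name']
    @task(pre=[ns['build']])
    def local_run(ctx):
        ctx.run('docker run --rm -it example:latest')

    ns.add_task(local_run)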