changeset 756:f3f667769aef

python 3! and some types and cleanups Ignore-this: 3453a547ee745fa83668f36956c835cd
author drewp@bigasterisk.com
date Fri, 14 Feb 2020 00:07:23 -0800
parents ffcad6bf9c57
children cee91fc85b03
files service/reasoning/Dockerfile service/reasoning/actions.py service/reasoning/graphop.py service/reasoning/httpputoutputs.py service/reasoning/inference.py service/reasoning/inputgraph.py service/reasoning/reasoning.py service/reasoning/serv.n3 service/reasoning/tasks.py
diffstat 9 files changed, 107 insertions(+), 81 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/Dockerfile	Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/Dockerfile	Fri Feb 14 00:07:23 2020 -0800
@@ -3,14 +3,16 @@
 WORKDIR /opt
 
 COPY requirements.txt ./
-RUN pip install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt
+RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt
 
 COPY FuXi/ FuXi
-RUN pip install ./FuXi
+RUN pip3 install ./FuXi
+
+RUN pip3 install pytype
 
 COPY *.n3 *.py *.html req* ./
 COPY input ./input
 
 EXPOSE 9071
 
-CMD [ "python", "./reasoning.py" ]
+CMD [ "python3", "./reasoning.py" ]
--- a/service/reasoning/actions.py	Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/actions.py	Fri Feb 14 00:07:23 2020 -0800
@@ -7,6 +7,7 @@
 import treq
 
 from httpputoutputs import HttpPutOutputs
+from inputgraph import InputGraph
 
 log = logging.getLogger('output')
 
@@ -28,7 +29,7 @@
 
 
 class Actions(object):
-    def __init__(self, inputGraph, sendToLiveClients, mockOutput=False):
+    def __init__(self, inputGraph: InputGraph, sendToLiveClients, mockOutput=False):
         self.inputGraph = inputGraph
         self.mockOutput = mockOutput
         self.putOutputs = HttpPutOutputs(mockOutput=mockOutput)
@@ -68,7 +69,7 @@
             if putUrl and matchPred == stmt[1]:
                 log.debug('putDevices: stmt %s leads to putting at %s',
                           ntStatement(stmt), putUrl.n3())
-                self._put(putUrl + '?' + urllib.urlencode([
+                self._put(putUrl + '?' + urllib.parse.urlencode([
                     ('s', str(stmt[0])),
                     ('p', str(putPred))]),
                           str(stmt[2].toPython()),
@@ -146,6 +147,8 @@
                                        default=Literal('30s'))#.map(secsFromLiteral)
 
     def _put(self, url, payload, refreshSecs, agent=None):
+        if isinstance(payload, str):
+            payload = payload.encode('utf8')
         assert isinstance(payload, bytes)
         self.putOutputs.put(url, payload, agent, refreshSecs)
 
--- a/service/reasoning/graphop.py	Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/graphop.py	Fri Feb 14 00:07:23 2020 -0800
@@ -1,8 +1,10 @@
 import logging
 from rdflib import URIRef, ConjunctiveGraph
+from typing import List
 log = logging.getLogger()
 
-def graphWithoutMetadata(g, ignorePredicates=[]):
+
+def graphWithoutMetadata(g: ConjunctiveGraph, ignorePredicates=[]):
     """
     graph filter that removes any statements whose subjects are
     contexts in the graph and also any statements with the given
@@ -17,7 +19,9 @@
             out.addN([stmt])
     return out
 
-def graphEqual(a, b, ignorePredicates=[]):
+
+def graphEqual(a: ConjunctiveGraph, b: ConjunctiveGraph,
+               ignorePredicates: List[URIRef]=[]):
     """
     compare graphs, omitting any metadata statements about contexts
     (especially modification times) and also any statements using the
@@ -27,7 +31,7 @@
     stmtsB = set(graphWithoutMetadata(b, ignorePredicates))
     if stmtsA == stmtsB:
         return True
-    
+
     if log.getEffectiveLevel() <= logging.INFO:
         lost = stmtsA - stmtsB
         if lost:
--- a/service/reasoning/httpputoutputs.py	Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/httpputoutputs.py	Fri Feb 14 00:07:23 2020 -0800
@@ -1,24 +1,31 @@
 import logging
 import time
 
+from rdflib import URIRef
 from rx.subjects import BehaviorSubject
 from twisted.internet import reactor
+from twisted.python.failure import Failure
+from twisted.internet.interfaces import IDelayedCall
 import treq
+from typing import Optional
 
 log = logging.getLogger('httpputoutputs')
 
+
 class HttpPutOutput(object):
-    def __init__(self, url,
-                 refreshSecs,#: BehaviorSubject,
+    lastChangeTime: float
+
+    def __init__(self, url: str,
+                 refreshSecs: BehaviorSubject,
                  mockOutput=False):
         self.url = url
         self.mockOutput = mockOutput
-        self.payload = None
-        self.foafAgent = None
-        self.nextCall = None
-        self.lastErr = None
-        self.numRequests = 0
-        self.refreshSecs = refreshSecs
+        self.payload: Optional[str] = None
+        self.foafAgent: Optional[URIRef] = None
+        self.nextCall: IDelayedCall = None
+        self.lastErr: Optional[Failure] = None
+        self.numRequests: int = 0
+        self.refreshSecs: float = refreshSecs
 
     def report(self):
         return {
@@ -33,7 +40,7 @@
             'lastErr': str(self.lastErr) if self.lastErr is not None else None,
             }
 
-    def setPayload(self, payload, foafAgent):
+    def setPayload(self, payload: str, foafAgent: URIRef):
         if self.numRequests > 0 and (self.payload == payload and
                                      self.foafAgent == foafAgent):
             return
@@ -101,6 +108,7 @@
         self.nextCall = reactor.callLater(self.currentRefreshSecs(),
                                           self.makeRequest)
 
+
 class HttpPutOutputs(object):
     """these grow forever"""
     def __init__(self, mockOutput=False):
@@ -108,6 +116,7 @@
         self.state = {} # url: HttpPutOutput
 
     def put(self, url, payload, foafAgent, refreshSecs):
+        assert isinstance(url, str)
         if url not in self.state:
             self.state[url] = HttpPutOutput(url, mockOutput=self.mockOutput,
                                             refreshSecs=refreshSecs)
--- a/service/reasoning/inference.py	Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/inference.py	Fri Feb 14 00:07:23 2020 -0800
@@ -7,7 +7,7 @@
     from rdflib.Graph import Graph
 except ImportError:
     from rdflib import Graph
-    
+
 from rdflib import Namespace
 from rdflib.parser import StringInputSource
 
@@ -15,7 +15,7 @@
 from FuXi.Rete import ReteNetwork
 from FuXi.Rete.RuleStore import N3RuleStore
 
-from greplin import scales 
+from greplin import scales
 STATS = scales.collection('/web',
                           scales.PmfStat('readRules'))
 
@@ -32,7 +32,7 @@
     # of my rules. This serialize/parse version is very slow (400ms),
     # but it only runs when the file changes.
     plainGraph = Graph()
-    plainGraph.parse(StringInputSource(n3), format='n3') # for inference
+    plainGraph.parse(StringInputSource(n3.encode('utf8')), format='n3') # for inference
     escapeOutputStatements(plainGraph, outputPatterns=outputPatterns)
     expandedN3 = plainGraph.serialize(format='n3')
 
@@ -60,7 +60,7 @@
             ruleStore = N3RuleStore()
             _loadAndEscape(ruleStore, rulesN3, outputPatterns)
             log.debug('%s rules' % len(ruleStore.rules))
-            
+
             _rulesCache = key + (rulesN3, ruleStore)
         return rulesN3, ruleStore
 
@@ -76,7 +76,7 @@
     with _dontChangeRulesStore(rules):
         network = ReteNetwork(rules, inferredTarget=target)
         network.feedFactsToAdd(tokenSet)
-    
+
     return target
 
 @contextlib.contextmanager
@@ -84,11 +84,11 @@
     if not hasattr(rules, '_stashOriginalRules'):
         rules._stashOriginalRules = rules.rules[:]
     yield
-    for k in rules.formulae.keys():
+    for k in list(rules.formulae.keys()):
         if not k.startswith('_:Formula'):
             del rules.formulae[k]
     rules.rules = rules._stashOriginalRules[:]
-    
+
 import time, logging
 log = logging.getLogger()
 def logTime(func):
--- a/service/reasoning/inputgraph.py	Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/inputgraph.py	Fri Feb 14 00:07:23 2020 -0800
@@ -1,5 +1,6 @@
 import logging, time
 import weakref
+from typing import Callable
 
 from greplin import scales
 from rdflib import Graph, ConjunctiveGraph
@@ -11,6 +12,7 @@
 
 from patchablegraph.patchsource import ReconnectingPatchSource
 from rdfdb.rdflibpatch import patchQuads
+from rdfdb.patch import Patch
 
 log = logging.getLogger('fetch')
 
@@ -21,7 +23,9 @@
 STATS = scales.collection('/web',
                           scales.PmfStat('combineGraph'),
 )
-def parseRdf(text, contentType):
+
+
+def parseRdf(text: str, contentType: str):
     g = Graph()
     g.parse(StringInputSource(text), format={
         'text/n3': 'n3',
@@ -30,7 +34,7 @@
 
 
 class RemoteData(object):
-    def __init__(self, onChange):
+    def __init__(self, onChange: Callable[[], None]):
         """we won't fire onChange during init"""
         self.onChange = onChange
         self.graph = ConjunctiveGraph()
@@ -42,7 +46,7 @@
             #URIRef('http://frontdoor:10012/graph/events'),
             self.onPatch, reconnectSecs=10, agent='reasoning')
 
-    def onPatch(self, p, fullGraph):
+    def onPatch(self, p: Patch, fullGraph: bool):
         if fullGraph:
             self.graph = ConjunctiveGraph()
         patchQuads(self.graph,
@@ -79,6 +83,7 @@
         else:
             log.debug("  remote graph has no changes to trigger rules")
 
+
 class InputGraph(object):
     def __init__(self, inputDirs, onChange):
         """
--- a/service/reasoning/reasoning.py	Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/reasoning.py	Fri Feb 14 00:07:23 2020 -0800
@@ -19,12 +19,15 @@
 
 import json, time, traceback, sys
 from logging import getLogger, DEBUG, WARN
+from typing import Dict, Optional, Set, Tuple
 
 from colorlog import ColoredFormatter
 from docopt import docopt
 from rdflib import Namespace, Literal, RDF, Graph, URIRef
+from rdflib.term import Node
 from twisted.internet import reactor, defer
 import cyclone.web, cyclone.websocket
+from FuXi.Rete.RuleStore import N3RuleStore
 
 from greplin import scales
 from greplin.scales.cyclonehandler import StatsHandler
@@ -48,7 +51,7 @@
                           scales.PmfStat('updateRules'),
 )
 
-def ntStatement(stmt):
+def ntStatement(stmt: Tuple[Node, Node, Node]):
     def compact(u):
         if isinstance(u, URIRef) and u.startswith(ROOM):
             return 'room:' + u[len(ROOM):]
@@ -56,6 +59,7 @@
     return '%s %s %s .' % (compact(stmt[0]), compact(stmt[1]), compact(stmt[2]))
 
 class Reasoning(object):
+    ruleStore: N3RuleStore
     def __init__(self, mockOutput=False):
         self.prevGraph = None
 
@@ -94,7 +98,7 @@
                  Literal(ruleParseTime))], ruleParseTime
 
     @STATS.graphChanged.time()
-    def graphChanged(self, inputGraph, oneShot=False, oneShotGraph=None):
+    def graphChanged(self, inputGraph: InputGraph, oneShot=False, oneShotGraph: Graph = None):
         """
         If we're getting called for a oneShot event, the oneShotGraph
         statements are already in inputGraph.getGraph().
@@ -141,11 +145,11 @@
     def copyOutput(self):
         self.outputGraph.setToGraph((s,p,o,ROOM['inferred']) for s,p,o in self.inferred)
 
-    def _makeInferred(self, inputGraph):
+    def _makeInferred(self, inputGraph: InputGraph):
         t1 = time.time()
 
         out = infer(inputGraph, self.ruleStore)
-        for p, n in NS.iteritems():
+        for p, n in NS.items():
             out.bind(p, n, override=True)
 
         inferenceTime = time.time() - t1
@@ -201,7 +205,7 @@
 
 # for reuse
 class GraphResource(cyclone.web.RequestHandler):
-    def get(self, which):
+    def get(self, which: str):
         self.set_header("Content-Type", "application/json")
         r = self.settings.reasoning
         g = {'lastInput': r.inputGraph.getGraph(),
@@ -242,11 +246,11 @@
         self.finish(msg)
 
 class Static(cyclone.web.RequestHandler):
-    def get(self, p):
+    def get(self, p: str):
         self.write(open(p).read())
 
-liveClients = set()
-def sendToLiveClients(d=None, asJson=None):
+liveClients: Set[cyclone.websocket.WebSocketHandler] = set()
+def sendToLiveClients(d: Dict[str, object]=None, asJson: Optional[str]=None):
     j = asJson or json.dumps(d)
     for c in liveClients:
         c.sendMessage(j)
@@ -274,8 +278,10 @@
             (r'/(jquery.min.js)', Static),
             (r'/(lastInput|lastOutput)Graph', GraphResource),
 
-            (r"/graph/reasoning", CycloneGraphHandler, {'masterGraph': reasoning.outputGraph}),
-            (r"/graph/reasoning/events", CycloneGraphEventsHandler, {'masterGraph': reasoning.outputGraph}),
+            (r"/graph/reasoning", CycloneGraphHandler,
+             {'masterGraph': reasoning.outputGraph}),
+            (r"/graph/reasoning/events", CycloneGraphEventsHandler,
+             {'masterGraph': reasoning.outputGraph}),
 
             (r'/ntGraphs', NtGraphs),
             (r'/rules', Rules),
@@ -289,22 +295,22 @@
     log.setLevel(WARN)
 
     if arg['-i'] or arg['-r'] or arg['-o'] or arg['-v']:
-        log.handlers[0].setFormatter(ColoredFormatter("%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s",
-        datefmt=None,
-        reset=True,
-        log_colors={
+        log.handlers[0].setFormatter(ColoredFormatter(
+            "%(log_color)s%(levelname)-8s %(name)-6s %(filename)-12s:%(lineno)-3s %(funcName)-20s%(reset)s %(white)s%(message)s",
+            datefmt=None,
+            reset=True,
+            log_colors={
                 'DEBUG':    'cyan',
                 'INFO':     'green',
                 'WARNING':  'yellow',
                 'ERROR':    'red',
                 'CRITICAL': 'red,bg_white',
-        },
-        secondary_log_colors={},
-        style='%'
-))
+            },
+            secondary_log_colors={},
+            style='%'
+        ))
         defer.setDebugging(True)
 
-
     if arg['-i']:
         import twisted.python.log
         twisted.python.log.startLogging(sys.stdout)
--- a/service/reasoning/serv.n3	Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/serv.n3	Fri Feb 14 00:07:23 2020 -0800
@@ -16,7 +16,7 @@
         "-v" "`pwd`:/opt"
       );
       :localRunCmdline (
-        "python3" "reasoning.py" "-v"
+        "python3" "reasoning.py" "-iro"
       );
       :dockerFile "Dockerfile"
 .
--- a/service/reasoning/tasks.py	Thu Feb 13 23:00:06 2020 -0800
+++ b/service/reasoning/tasks.py	Fri Feb 14 00:07:23 2020 -0800
@@ -1,37 +1,34 @@
-from invoke import task
-
-JOB='reasoning'
-PORT=9071
-
-TAG=f'bang6:5000/{JOB}_x86:latest'
+from invoke import Collection, task
+import sys
+sys.path.append('/my/proj/release')
+from serv_tasks import serv_tasks
 
-@task
-def build_image(ctx):
-    ctx.run(f'docker build --network=host -t {TAG} .')
+ns = Collection()
+serv_tasks(ns, 'serv.n3', 'reasoning')
 
-@task(pre=[build_image])
-def push_image(ctx):
-    ctx.run(f'docker push {TAG}')
-
-@task
-def shell(ctx):
-    ctx.run(f'docker run --name {JOB}_shell --rm -it --cap-add SYS_PTRACE -v `pwd`:/mnt --dns 10.2.0.1 --dns-search bigasterisk.com --net=host {TAG}  /bin/bash', pty=True)
+@ns.add_task
+@task(pre=[ns['build']])
+def local_run_mock(ctx):
+    ctx.run(f'docker run --name reasoning_local_run_mock --rm -it -p 9071:9071 -v `pwd`:/opt --dns 10.2.0.1 --dns-search bigasterisk.com --net=host bang6:5000/reasoning:latest python3 reasoning.py -iro --mockoutput', pty=True)
 
-@task(pre=[build_image])
-def local_run(ctx):
-    ctx.run(f'docker run --name {JOB}_local --rm -it '
-            f'-p {PORT}:{PORT} '
-            f'-v `pwd`:/mnt '
-            f'-v `pwd`/index.html:/opt/index.html '
-            f'--dns 10.2.0.1 --dns-search bigasterisk.com '
-            f'--net=host '
-            f'{TAG} '
-            f'python /mnt/{JOB}.py -iro', pty=True)
-
-@task(pre=[build_image])
-def local_run_mock(ctx):
-    ctx.run(f'docker run --name {JOB}_local_run_mock --rm -it -p {PORT}:{PORT} -v `pwd`:/mnt  --dns 10.2.0.1 --dns-search bigasterisk.com --net=host {TAG} python /mnt/{JOB}.py -iro --mockoutput', pty=True)
-
-@task(pre=[push_image])
-def redeploy(ctx):
-    ctx.run(f'supervisorctl -s http://bang:9001/ restart {JOB}_{PORT}')
+@ns.add_task
+@task(pre=[ns['build']])
+def pytype(ctx):
+    ctx.run(f'docker run '
+            f'--name reasoning_pytype '
+            f'--rm -it '
+            f'-v `pwd`:/opt '
+            f'--dns 10.2.0.1 '
+            f'--dns-search bigasterisk.com '
+            f'--net=host bang6:5000/reasoning:latest '
+            f'pytype --pythonpath /usr/local/lib/python3.6/dist-packages:. '
+            f'--jobs 4 '
+            f'actions.py '
+            f'escapeoutputstatements.py '
+            f'graphop.py '
+            f'httpputoutputs.py '
+            f'inference.py '
+            f'inputgraph.py '
+            f'private_ipv6_addresses.py '
+            f'rdflibtrig.py '
+            f'reasoning.py', pty=True)