changeset 1254:666f9a2198a7

add types to sse_collector.py. Surprisingly few bugs found. Ignore-this: df20acbf7ec27226f3060f3b5a4c710b darcs-hash:7c699d51556f20c785dfc9190e9ce8e38c0fa1e6
author drewp <drewp@bigasterisk.com>
date Fri, 19 Apr 2019 01:08:01 -0700
parents 0e7044b23dad
children 9e1f067010b3
files service/collector/Dockerfile service/collector/makefile service/collector/mypy.ini service/collector/requirements.txt service/collector/sse_collector.py
diffstat 5 files changed, 95 insertions(+), 65 deletions(-) [+]
line wrap: on
line diff
--- a/service/collector/Dockerfile	Fri Apr 19 01:07:29 2019 -0700
+++ b/service/collector/Dockerfile	Fri Apr 19 01:08:01 2019 -0700
@@ -8,10 +8,11 @@
 RUN pip3 install -Ur requirements.txt
 # not sure why this doesn't work from inside requirements.txt
 RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip'
+RUN touch /usr/local/lib/python3.6/dist-packages/greplin/__init__.py
 
 COPY stubs ./stubs
 COPY twisted_sse_demo ./twisted_sse_demo
-COPY *.py req* ./
+COPY *.py req* *.ini ./
 
 EXPOSE 9072
 
--- a/service/collector/makefile	Fri Apr 19 01:07:29 2019 -0700
+++ b/service/collector/makefile	Fri Apr 19 01:08:01 2019 -0700
@@ -6,14 +6,19 @@
 build_image:
 	rm -rf tmp_ctx
 	mkdir -p tmp_ctx
-	cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* stubs tmp_ctx
+	cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* *.ini stubs tmp_ctx
 	docker build --network=host -t ${TAG} tmp_ctx
 
 push_image: build_image
 	docker push ${TAG}
 
 shell: build_image
-	docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG} /bin/bash
+	docker run --rm -it --cap-add SYS_PTRACE \
+	  --name $(JOB)_shell \
+	  --net=host \
+	  -v `pwd`/.mypy_cache:/opt/.mypy_cache \
+	  -v `pwd`/sse_collector.py:/opt/sse_collector.py \
+	  ${TAG} /bin/bash
 
 local_run: build_image
 	docker run --rm -it -p ${PORT}:${PORT} \
@@ -40,9 +45,9 @@
 typecheck: build_image
 	docker run --rm -it -p ${PORT}:${PORT} \
           --net=host \
-	  -e=MYPY=/usr/local/lib/python3.6/dist-packages:stubs \
+	  -v `pwd`/.mypy_cache:/opt/.mypy_cache \
           ${TAG} \
-           /usr/local/bin/mypy --python-executable /usr/bin/python3 --no-implicit-optional --ignore-missing-imports sse_collector.py
+           /usr/local/bin/mypy -m sse_collector -m export_to_influxdb -m logsetup -m patchablegraph -m patchsource -m rdfdb.patch
 
 
 redeploy: push_image
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/mypy.ini	Fri Apr 19 01:08:01 2019 -0700
@@ -0,0 +1,7 @@
+[mypy]
+no_implicit_optional = True
+#ignore_missing_imports = True
+python_executable = /usr/bin/python3
+warn_unused_configs = True
+warn_return_any = True
+mypy_path = /opt/stubs:/usr/local/lib/python3.6/dist-packages/
\ No newline at end of file
--- a/service/collector/requirements.txt	Fri Apr 19 01:07:29 2019 -0700
+++ b/service/collector/requirements.txt	Fri Apr 19 01:08:01 2019 -0700
@@ -1,7 +1,7 @@
 docopt
 ipdb
-service_identity
-twisted
+service_identity==18.1.0
+twisted==19.2.0
 py-spy
 mypy
 
@@ -11,3 +11,4 @@
 git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
 https://projects.bigasterisk.com/rdfdb/rdfdb-0.8.0.tar.gz
 https://github.com/drewp/cyclone/archive/python3.zip
+influxdb==5.2.2
--- a/service/collector/sse_collector.py	Fri Apr 19 01:07:29 2019 -0700
+++ b/service/collector/sse_collector.py	Fri Apr 19 01:08:01 2019 -0700
@@ -7,17 +7,19 @@
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
-import logging, collections, json, time
-from twisted.internet import reactor, defer
-import cyclone.web, cyclone.sse
-from rdflib import URIRef, Namespace
 from docopt import docopt
 from greplin import scales
 from greplin.scales.cyclonehandler import StatsHandler
+from rdflib import Namespace, URIRef, StatementType
+from rdflib.term import Node
+from twisted.internet import reactor, defer
+from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional
+import cyclone.web, cyclone.sse
+import logging, collections, json, time
+
 from logsetup import log, enableTwistedLog
 from patchablegraph import jsonFromPatch
 from rdfdb.patch import Patch
-from typing import Callable, Dict, NewType
 
 # workaround for broken import in twisted_sse_demo/eventsourcee.py
 import sys; sys.path.append('twisted_sse_demo')
@@ -25,11 +27,12 @@
 
 from sse_collector_config import config
 
-SourceUri = NewType('SourceUri', URIRef)
+#SourceUri = NewType('SourceUri', URIRef) # doesn't work
+class SourceUri(URIRef): pass
 
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
-COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
+COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
 
 STATS = scales.collection('/root',
                           scales.PmfStat('getState'),
@@ -44,12 +47,12 @@
     """
     functions that make statements originating from sse_collector itself
     """
-    def __init__(self, applyPatch: Callable[[Patch], None]):
+    def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
         self.applyPatch = applyPatch
         self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef
 
     @STATS.localStatementsPatch.time()
-    def setSourceState(self, source, state):
+    def setSourceState(self, source: SourceUri, state: URIRef):
         """
         add a patch to the COLLECTOR graph about the state of this
         source. state=None to remove the source.
@@ -80,44 +83,54 @@
                     (source, ROOM['state'], oldState, COLLECTOR),
                 ]))
 
-def abbrevTerm(t):
+def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
     if isinstance(t, URIRef):
         return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
                 .replace('http://projects.bigasterisk.com/device/', 'dev:')
                 .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
     return t
 
-def abbrevStmt(stmt):
-    return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))
-    
+def abbrevStmt(stmt: StatementType) -> str:
+    t = tuple(map(abbrevTerm, stmt))  # shorten each URIRef term; other terms pass through unchanged
+    return '(%s %s %s %s)' % (t[0], t[1], t[2], t[3])  # explicit indexing: reads only the first four terms
+
+StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set['SomeGraph']]]  # quoted: SomeGraph is defined further down
+
+
+class PostDeleter(object):
+    """Queue statement keys via add(); delete them from the table on clean exit."""
+    def __init__(self, statements: StatementTable):
+        self.statements = statements
+
+    def __enter__(self):
+        self._garbage: List[StatementType] = []
+        return self
+
+    def add(self, stmt: StatementType):
+        self._garbage.append(stmt)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:  # on error, skip the deletes; returning None propagates it
+            for stmt in self._garbage:
+                del self.statements[stmt]
+
+            
 class ActiveStatements(object):
     def __init__(self):
         # This table holds statements asserted by any of our sources
         # plus local statements that we introduce (source is
         # http://bigasterisk.com/sse_collector/).
-        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)
+        self.statements: StatementTable = collections.defaultdict(lambda: (set(), set()))
 
-    def state(self):
+    def state(self) -> Dict:
         return {
             'len': len(self.statements),
             }
         
-    def _postDeleteStatements(self):
-        statements = self.statements
-        class PostDeleter(object):
-            def __enter__(self):
-                self._garbage = []
-                return self
-            def add(self, stmt):
-                self._garbage.append(stmt)
-            def __exit__(self, type, value, traceback):
-                if type is not None:
-                    raise
-                for stmt in self._garbage:
-                    del statements[stmt]
-        return PostDeleter()
+    def _postDeleteStatements(self) -> PostDeleter:
+        return PostDeleter(self.statements)  # deletes queued via .add() run only when the with-block exits cleanly
         
-    def pprintTable(self):
+    def pprintTable(self) -> None:
         for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
             print("%03d. %-80s from %s to %s" % (
                 i,
@@ -126,15 +139,14 @@
                 handlers))
 
     @STATS.makeSyncPatch.time()
-    def makeSyncPatch(self, handler, sources):
+    def makeSyncPatch(self, handler: SomeGraph, sources: Set[SourceUri]):
         # todo: this could run all handlers at once, which is how we use it anyway
         adds = []
         dels = []
 
-        sources_set = set(sources)
         with self._postDeleteStatements() as garbage:
             for stmt, (stmtSources, handlers) in self.statements.items():
-                belongsInHandler = not sources_set.isdisjoint(stmtSources)
+                belongsInHandler = not sources.isdisjoint(stmtSources)
                 handlerHasIt = handler in handlers
                 log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                 if belongsInHandler and not handlerHasIt:
@@ -148,7 +160,7 @@
 
         return Patch(addQuads=adds, delQuads=dels)
         
-    def applySourcePatch(self, source, p):
+    def applySourcePatch(self, source: SourceUri, p: Patch):
         for stmt in p.addQuads:
             sourceUrls, handlers = self.statements[stmt]
             if source in sourceUrls:
@@ -170,7 +182,7 @@
                     garbage.add(stmt)
 
     @STATS.replaceSourceStatements.time()
-    def replaceSourceStatements(self, source, stmts):
+    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[StatementType]):
         log.debug('replaceSourceStatements with %s stmts', len(stmts))
         newStmts = set(stmts)
 
@@ -188,19 +200,20 @@
 
         self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
 
-    def discardHandler(self, handler):
+    def discardHandler(self, handler: SomeGraph):
         with self._postDeleteStatements() as garbage:
             for stmt, (sources, handlers) in self.statements.items():
                 handlers.discard(handler)
                 if not sources and not handlers:
                     garbage.add(stmt)
 
-    def discardSource(self, source):
+    def discardSource(self, source: SourceUri):
         with self._postDeleteStatements() as garbage:
             for stmt, (sources, handlers) in self.statements.items():
                 sources.discard(source)
                 if not sources and not handlers:
                     garbage.add(stmt)
+
                     
 class GraphClients(object):
     """
@@ -212,28 +225,28 @@
     statements come and go, we make patches to send to requesters.
     """
     def __init__(self):
-        self.clients = {}  # url: PatchSource (COLLECTOR is not listed)
-        self.handlers = set()  # handler
-        self.statements = ActiveStatements()
+        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
+        self.handlers: Set[SomeGraph] = set()
+        self.statements: ActiveStatements = ActiveStatements()
         
         self._localStatements = LocalStatements(self._onPatch)
 
-    def state(self):
+    def state(self) -> Dict:
         return {
             'clients': [ps.state() for ps in self.clients.values()],
             'sseHandlers': [h.state() for h in self.handlers],
             'statements': self.statements.state(),
         }
 
-    def _sourcesForHandler(self, handler):
+    def _sourcesForHandler(self, handler: SomeGraph) -> List[SourceUri]:
         streamId = handler.streamId
         matches = [s for s in config['streams'] if s['id'] == streamId]
         if len(matches) != 1:
             raise ValueError("%s matches for %r" % (len(matches), streamId))
-        return list(map(URIRef, matches[0]['sources'])) + [COLLECTOR]
+        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]
 
     @STATS.onPatch.time()
-    def _onPatch(self, source, p, fullGraph=False):
+    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False):
         if fullGraph:
             # a reconnect may need to resend the full graph even
             # though we've already sent some statements
@@ -253,7 +266,7 @@
                 ROOM['patchesReceived'])
 
     @STATS.sendUpdatePatch.time()
-    def _sendUpdatePatch(self, handler=None):
+    def _sendUpdatePatch(self, handler: Optional[SomeGraph]=None):
         """
         send a patch event out this handler to bring it up to date with
         self.statements
@@ -279,7 +292,7 @@
             else:
                 log.debug('nothing to send to %s', h)
                 
-    def addSseHandler(self, handler):
+    def addSseHandler(self, handler: SomeGraph):
         log.info('addSseHandler %r %r', handler, handler.streamId)
 
         # fail early if id doesn't match
@@ -301,7 +314,7 @@
 
         self._sendUpdatePatch(handler)
         
-    def removeSseHandler(self, handler):
+    def removeSseHandler(self, handler: SomeGraph):
         log.info('removeSseHandler %r', handler)
         self.statements.discardHandler(handler)
         for source in self._sourcesForHandler(handler):
@@ -314,7 +327,7 @@
             
         self.handlers.remove(handler)
 
-    def _stopClient(self, url):
+    def _stopClient(self, url: SourceUri):
         if url == COLLECTOR:
             return
             
@@ -329,7 +342,7 @@
 
 class SomeGraph(cyclone.sse.SSEHandler):
     _handlerSerial = 0
-    def __init__(self, application, request):
+    def __init__(self, application: cyclone.web.Application, request):
         cyclone.sse.SSEHandler.__init__(self, application, request)
         self.bound = False
         self.created = time.time()
@@ -337,12 +350,12 @@
         
         self._serial = SomeGraph._handlerSerial
         SomeGraph._handlerSerial += 1
-        self.lastPatchSentTime = 0
+        self.lastPatchSentTime: float = 0.0
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return '<Handler #%s>' % self._serial
 
-    def state(self):
+    def state(self) -> Dict:
         return {
             'created': round(self.created, 2),
             'ageHours': round((time.time() - self.created) / 3600, 2),
@@ -351,21 +364,22 @@
             'userAgent': self.request.headers.get('user-agent'),
         }
         
-    def bind(self, graphPath):
-        self.streamId = graphPath
+    def bind(self, *args, **kwargs):
+        self.streamId = args[0]
 
         self.graphClients.addSseHandler(self)
         # If something goes wrong with addSseHandler, I don't want to
         # try removeSseHandler.
         self.bound = True
         
-    def unbind(self):
+    def unbind(self) -> None:
         if self.bound:
             self.graphClients.removeSseHandler(self)
 
+            
 class State(cyclone.web.RequestHandler):
     @STATS.getState.time()
-    def get(self):
+    def get(self) -> None:
         try:
             state = self.settings.graphClients.state()
         except:
@@ -374,9 +388,11 @@
         
         self.write(json.dumps({'graphClients': state}, indent=2))
 
+        
 class Root(cyclone.web.RequestHandler):
-    def get(self):
+    def get(self) -> None:
         self.write('<html><body>sse_collector</body></html>')
+
         
 if __name__ == '__main__':
     arg = docopt("""
@@ -395,7 +411,7 @@
     #exporter = InfluxExporter(... to export some stats values
     
     reactor.listenTCP(
-        19072,
+        9072,
         cyclone.web.Application(
             handlers=[
                 (r'/', Root),