changeset 449:ef7eba0551f2

collector partial py3+types update. WIP Ignore-this: 3fe8cc7b09bbfc8bec7f5d6a5e1630b
author drewp@bigasterisk.com
date Thu, 18 Apr 2019 22:00:06 -0700
parents 8cd163e0e50c
children 5595c447c630
files service/collector/Dockerfile service/collector/makefile service/collector/requirements.txt service/collector/sse_collector.py
diffstat 4 files changed, 58 insertions(+), 36 deletions(-) [+]
line wrap: on
line diff
--- a/service/collector/Dockerfile	Thu Apr 18 21:59:47 2019 -0700
+++ b/service/collector/Dockerfile	Thu Apr 18 22:00:06 2019 -0700
@@ -2,13 +2,17 @@
 
 WORKDIR /opt
 
+RUN apt-get install -y vim
+
 COPY requirements.txt ./
-RUN pip install -r requirements.txt
-RUN pip install py-spy
+RUN pip3 install -Ur requirements.txt
+# not sure why this doesn't work from inside requirements.txt
+RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip'
 
+COPY stubs ./stubs
 COPY twisted_sse_demo ./twisted_sse_demo
 COPY *.py req* ./
 
 EXPOSE 9072
 
-CMD [ "python", "./sse_collector.py" ]
+CMD [ "python3", "./sse_collector.py" ]
--- a/service/collector/makefile	Thu Apr 18 21:59:47 2019 -0700
+++ b/service/collector/makefile	Thu Apr 18 22:00:06 2019 -0700
@@ -6,18 +6,20 @@
 build_image:
 	rm -rf tmp_ctx
 	mkdir -p tmp_ctx
-	cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* tmp_ctx
+	cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* stubs tmp_ctx
 	docker build --network=host -t ${TAG} tmp_ctx
+
+push_image: build_image
 	docker push ${TAG}
 
-shell:
+shell: build_image
 	docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG} /bin/bash
 
 local_run: build_image
 	docker run --rm -it -p ${PORT}:${PORT} \
           --net=host \
           ${TAG} \
-          python sse_collector.py -v
+          python3 sse_collector.py -v
 
 local_run_strace: build_image
 	docker run --rm -it -p ${PORT}:${PORT} \
@@ -25,7 +27,7 @@
           --net=host \
           --cap-add SYS_PTRACE \
           ${TAG} \
-          strace -f -tts 200 python /mnt/sse_collector.py -v
+          strace -f -tts 200 python3 /mnt/sse_collector.py -v
 
 local_run_pyspy: build_image
 	docker run --rm -it -p ${PORT}:${PORT} \
@@ -33,9 +35,16 @@
           --net=host \
           --cap-add SYS_PTRACE \
           ${TAG} \
-          py-spy -- python /mnt/sse_collector.py
+          py-spy -- python3 /mnt/sse_collector.py
+
+# Type-check sse_collector.py in the image; mypy reads MYPYPATH (not MYPY) to find ./stubs.
+typecheck: build_image
+	docker run --rm -it -p ${PORT}:${PORT} --net=host \
+	  -e=MYPYPATH=stubs \
+          ${TAG} \
+           /usr/local/bin/mypy --python-executable /usr/bin/python3 --no-implicit-optional --ignore-missing-imports sse_collector.py
 
 
-redeploy: build_image
+redeploy: push_image
 	supervisorctl restart sse_collector_9072
 
--- a/service/collector/requirements.txt	Thu Apr 18 21:59:47 2019 -0700
+++ b/service/collector/requirements.txt	Thu Apr 18 22:00:06 2019 -0700
@@ -1,15 +1,13 @@
-cyclone
 docopt
 ipdb
 service_identity
 twisted
-
-#rdflib==4.2.2
-git+http://github.com/drewp/rdflib.git@5fa18be1231a5e4dfc86ec28f2f754158c6f6f0b#egg=rdflib
+py-spy
+mypy
 
-#rdflib-jsonld==0.4.0
-#git+http://github.com/RDFLib/rdflib-jsonld@cc5f005b222105724cd59c6069df9982fbd28c98#egg=rdflib_jsonld
-git+http://github.com/drewp/rdflib-jsonld.git@0a560c9f1aa7c7bbb80fea389e1f5fa51d1287f8#egg=rdflib_jsonld
+rdflib==4.2.2
+rdflib-jsonld==0.4.0
 
 git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
 https://projects.bigasterisk.com/rdfdb/rdfdb-0.8.0.tar.gz
+https://github.com/drewp/cyclone/archive/python3.zip
--- a/service/collector/sse_collector.py	Thu Apr 18 21:59:47 2019 -0700
+++ b/service/collector/sse_collector.py	Thu Apr 18 22:00:06 2019 -0700
@@ -1,4 +1,3 @@
-from __future__ import division
 """
 requesting /graph/foo returns an SSE patch stream that's the
 result of fetching multiple other SSE patch streams. The result stream
@@ -8,7 +7,7 @@
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
-import sys, logging, collections, json, time
+import logging, collections, json, time
 from twisted.internet import reactor, defer
 import cyclone.web, cyclone.sse
 from rdflib import URIRef, Namespace
@@ -16,12 +15,19 @@
 from greplin import scales
 from greplin.scales.cyclonehandler import StatsHandler
 from logsetup import log, enableTwistedLog
-from logsetup import log
 from patchablegraph import jsonFromPatch
 from rdfdb.patch import Patch
+from typing import Callable, Dict, NewType
+
+# workaround for broken import in twisted_sse_demo/eventsource.py
+import sys; sys.path.append('twisted_sse_demo')
 from patchsource import ReconnectingPatchSource
+
 from sse_collector_config import config
 
+SourceUri = NewType('SourceUri', URIRef)
+
+
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
 
@@ -38,9 +44,9 @@
     """
     functions that make statements originating from sse_collector itself
     """
-    def __init__(self, applyPatch):
+    def __init__(self, applyPatch: Callable[[Patch], None]):
         self.applyPatch = applyPatch
-        self._sourceState = {} # source: state URIRef
+        self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef
 
     @STATS.localStatementsPatch.time()
     def setSourceState(self, source, state):
@@ -113,11 +119,11 @@
         
     def pprintTable(self):
         for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
-            print "%03d. %-80s from %s to %s" % (
+            print("%03d. %-80s from %s to %s" % (
                 i,
                 abbrevStmt(stmt),
                 [abbrevTerm(s) for s in sources],
-                handlers)
+                handlers))
 
     @STATS.makeSyncPatch.time()
     def makeSyncPatch(self, handler, sources):
@@ -127,7 +133,7 @@
 
         sources_set = set(sources)
         with self._postDeleteStatements() as garbage:
-            for stmt, (stmtSources, handlers) in self.statements.iteritems():
+            for stmt, (stmtSources, handlers) in self.statements.items():
                 belongsInHandler = not sources_set.isdisjoint(stmtSources)
                 handlerHasIt = handler in handlers
                 log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
@@ -169,7 +175,7 @@
         newStmts = set(stmts)
 
         with self._postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.statements.iteritems():
+            for stmt, (sources, handlers) in self.statements.items():
                 if source in sources:
                     if stmt not in stmts:
                         sources.remove(source)
@@ -184,14 +190,14 @@
 
     def discardHandler(self, handler):
         with self._postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.statements.iteritems():
+            for stmt, (sources, handlers) in self.statements.items():
                 handlers.discard(handler)
                 if not sources and not handlers:
                     garbage.add(stmt)
 
     def discardSource(self, source):
         with self._postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.statements.iteritems():
+            for stmt, (sources, handlers) in self.statements.items():
                 sources.discard(source)
                 if not sources and not handlers:
                     garbage.add(stmt)
@@ -224,7 +230,7 @@
         matches = [s for s in config['streams'] if s['id'] == streamId]
         if len(matches) != 1:
             raise ValueError("%s matches for %r" % (len(matches), streamId))
-        return map(URIRef, matches[0]['sources']) + [COLLECTOR]
+        return list(map(URIRef, matches[0]['sources'])) + [COLLECTOR]
 
     @STATS.onPatch.time()
     def _onPatch(self, source, p, fullGraph=False):
@@ -256,7 +262,7 @@
         # reduce loops here- prepare all patches at once
         for h in (self.handlers if handler is None else [handler]):
             period = 1
-            if 'Raspbian' in h.request.headers.get('user-agent'):
+            if 'Raspbian' in h.request.headers.get('user-agent', ''):
                 period = 5
             if h.lastPatchSentTime > now - period:
                 continue
@@ -268,7 +274,7 @@
                 # nice for this service to try to break it up into multiple sends,
                 # although there's no guarantee at all since any single stmt 
                 # could be any length.
-                h.sendEvent(message=jsonFromPatch(p), event='patch')
+                h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch')
                 h.lastPatchSentTime = now
             else:
                 log.debug('nothing to send to %s', h)
@@ -325,9 +331,9 @@
     _handlerSerial = 0
     def __init__(self, application, request):
         cyclone.sse.SSEHandler.__init__(self, application, request)
-        self.streamId = request.uri[len('/graph/'):]
+        self.bound = False
+        self.created = time.time()
         self.graphClients = self.settings.graphClients
-        self.created = time.time()
         
         self._serial = SomeGraph._handlerSerial
         SomeGraph._handlerSerial += 1
@@ -345,11 +351,17 @@
             'userAgent': self.request.headers.get('user-agent'),
         }
         
-    def bind(self):
+    def bind(self, graphPath):
+        self.streamId = graphPath
+
         self.graphClients.addSseHandler(self)
+        # If something goes wrong with addSseHandler, I don't want to
+        # try removeSseHandler.
+        self.bound = True
         
     def unbind(self):
-        self.graphClients.removeSseHandler(self)
+        if self.bound:
+            self.graphClients.removeSseHandler(self)
 
 class State(cyclone.web.RequestHandler):
     @STATS.getState.time()
@@ -374,8 +386,7 @@
     """)
     
     if arg['-v']:
-        import twisted.python.log
-        twisted.python.log.startLogging(sys.stdout)
+        enableTwistedLog()
         log.setLevel(logging.DEBUG)
         defer.setDebugging(True)