# HG changeset patch # User drewp@bigasterisk.com # Date 1555650006 25200 # Node ID ef7eba0551f26669dcd8a0871742de09b00ce3c5 # Parent 8cd163e0e50ccf9549fae69b3f9913c38e690c42 collector partial py3+types update. WIP Ignore-this: 3fe8cc7b09bbfc8bec7f5d6a5e1630b diff -r 8cd163e0e50c -r ef7eba0551f2 service/collector/Dockerfile --- a/service/collector/Dockerfile Thu Apr 18 21:59:47 2019 -0700 +++ b/service/collector/Dockerfile Thu Apr 18 22:00:06 2019 -0700 @@ -2,13 +2,17 @@ WORKDIR /opt +RUN apt-get install -y vim + COPY requirements.txt ./ -RUN pip install -r requirements.txt -RUN pip install py-spy +RUN pip3 install -Ur requirements.txt +# not sure why this doesn't work from inside requirements.txt +RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip' +COPY stubs ./stubs COPY twisted_sse_demo ./twisted_sse_demo COPY *.py req* ./ EXPOSE 9072 -CMD [ "python", "./sse_collector.py" ] +CMD [ "python3", "./sse_collector.py" ] diff -r 8cd163e0e50c -r ef7eba0551f2 service/collector/makefile --- a/service/collector/makefile Thu Apr 18 21:59:47 2019 -0700 +++ b/service/collector/makefile Thu Apr 18 22:00:06 2019 -0700 @@ -6,18 +6,20 @@ build_image: rm -rf tmp_ctx mkdir -p tmp_ctx - cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* tmp_ctx + cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* stubs tmp_ctx docker build --network=host -t ${TAG} tmp_ctx + +push_image: build_image docker push ${TAG} -shell: +shell: build_image docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG} /bin/bash local_run: build_image docker run --rm -it -p ${PORT}:${PORT} \ --net=host \ ${TAG} \ - python sse_collector.py -v + python3 sse_collector.py -v local_run_strace: build_image docker run --rm -it -p ${PORT}:${PORT} \ @@ -25,7 +27,7 @@ --net=host \ --cap-add SYS_PTRACE \ ${TAG} \ - strace -f -tts 200 python /mnt/sse_collector.py -v + strace -f -tts 200 python3 /mnt/sse_collector.py -v local_run_pyspy: build_image docker run --rm -it -p ${PORT}:${PORT} \ @@ -33,9 +35,16 @@ --net=host \ --cap-add SYS_PTRACE \ ${TAG} \ - py-spy -- python /mnt/sse_collector.py + py-spy -- python3 /mnt/sse_collector.py + +typecheck: build_image + docker run --rm -it -p ${PORT}:${PORT} \ + --net=host \ + -e=MYPY=/usr/local/lib/python3.6/dist-packages:stubs \ + ${TAG} \ + /usr/local/bin/mypy --python-executable /usr/bin/python3 --no-implicit-optional --ignore-missing-imports sse_collector.py -redeploy: build_image +redeploy: push_image supervisorctl restart sse_collector_9072 diff -r 8cd163e0e50c -r ef7eba0551f2 service/collector/requirements.txt --- a/service/collector/requirements.txt Thu Apr 18 21:59:47 2019 -0700 +++ b/service/collector/requirements.txt Thu Apr 18 22:00:06 2019 -0700 @@ -1,15 +1,13 @@ -cyclone docopt ipdb service_identity twisted - -#rdflib==4.2.2 -git+http://github.com/drewp/rdflib.git@5fa18be1231a5e4dfc86ec28f2f754158c6f6f0b#egg=rdflib +py-spy +mypy -#rdflib-jsonld==0.4.0 -#git+http://github.com/RDFLib/rdflib-jsonld@cc5f005b222105724cd59c6069df9982fbd28c98#egg=rdflib_jsonld -git+http://github.com/drewp/rdflib-jsonld.git@0a560c9f1aa7c7bbb80fea389e1f5fa51d1287f8#egg=rdflib_jsonld +rdflib==4.2.2 +rdflib-jsonld==0.4.0 git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales https://projects.bigasterisk.com/rdfdb/rdfdb-0.8.0.tar.gz +https://github.com/drewp/cyclone/archive/python3.zip diff -r 8cd163e0e50c -r ef7eba0551f2 service/collector/sse_collector.py --- a/service/collector/sse_collector.py Thu Apr 18 21:59:47 2019 -0700 +++ b/service/collector/sse_collector.py Thu Apr 18 22:00:06 2019 -0700 @@ -1,4 +1,3 @@ -from __future__ import division """ requesting /graph/foo returns an SSE patch stream that's the result of fetching multiple other SSE patch streams. The result stream @@ -8,7 +7,7 @@ - filter out unneeded stmts from the sources - give a time resolution and concatenate any patches that come faster than that res """ -import sys, logging, collections, json, time +import logging, collections, json, time from twisted.internet import reactor, defer import cyclone.web, cyclone.sse from rdflib import URIRef, Namespace @@ -16,12 +15,19 @@ from greplin import scales from greplin.scales.cyclonehandler import StatsHandler from logsetup import log, enableTwistedLog -from logsetup import log from patchablegraph import jsonFromPatch from rdfdb.patch import Patch +from typing import Callable, Dict, NewType + +# workaround for broken import in twisted_sse_demo/eventsourcee.py +import sys; sys.path.append('twisted_sse_demo') from patchsource import ReconnectingPatchSource + from sse_collector_config import config +SourceUri = NewType('SourceUri', URIRef) + + ROOM = Namespace("http://projects.bigasterisk.com/room/") COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/') @@ -38,9 +44,9 @@ """ functions that make statements originating from sse_collector itself """ - def __init__(self, applyPatch): + def __init__(self, applyPatch: Callable[[Patch], None]): self.applyPatch = applyPatch - self._sourceState = {} # source: state URIRef + self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef @STATS.localStatementsPatch.time() def setSourceState(self, source, state): @@ -113,11 +119,11 @@ def pprintTable(self): for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())): - print "%03d. %-80s from %s to %s" % ( + print("%03d. %-80s from %s to %s" % ( i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], - handlers) + handlers)) @STATS.makeSyncPatch.time() def makeSyncPatch(self, handler, sources): @@ -127,7 +133,7 @@ sources_set = set(sources) with self._postDeleteStatements() as garbage: - for stmt, (stmtSources, handlers) in self.statements.iteritems(): + for stmt, (stmtSources, handlers) in self.statements.items(): belongsInHandler = not sources_set.isdisjoint(stmtSources) handlerHasIt = handler in handlers log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt) @@ -169,7 +175,7 @@ newStmts = set(stmts) with self._postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.statements.iteritems(): + for stmt, (sources, handlers) in self.statements.items(): if source in sources: if stmt not in stmts: sources.remove(source) @@ -184,14 +190,14 @@ def discardHandler(self, handler): with self._postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.statements.iteritems(): + for stmt, (sources, handlers) in self.statements.items(): handlers.discard(handler) if not sources and not handlers: garbage.add(stmt) def discardSource(self, source): with self._postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.statements.iteritems(): + for stmt, (sources, handlers) in self.statements.items(): sources.discard(source) if not sources and not handlers: garbage.add(stmt) @@ -224,7 +230,7 @@ matches = [s for s in config['streams'] if s['id'] == streamId] if len(matches) != 1: raise ValueError("%s matches for %r" % (len(matches), streamId)) - return map(URIRef, matches[0]['sources']) + [COLLECTOR] + return list(map(URIRef, matches[0]['sources'])) + [COLLECTOR] @STATS.onPatch.time() def _onPatch(self, source, p, fullGraph=False): @@ -256,7 +262,7 @@ # reduce loops here- prepare all patches at once for h in (self.handlers if handler is None else [handler]): period = 1 - if 'Raspbian' in h.request.headers.get('user-agent'): + if 'Raspbian' in h.request.headers.get('user-agent', ''): period = 5 if h.lastPatchSentTime > now - period: continue @@ -268,7 +274,7 @@ # nice for this service to try to break it up into multiple sends, # although there's no guarantee at all since any single stmt # could be any length. - h.sendEvent(message=jsonFromPatch(p), event='patch') + h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch') h.lastPatchSentTime = now else: log.debug('nothing to send to %s', h) @@ -325,9 +331,9 @@ _handlerSerial = 0 def __init__(self, application, request): cyclone.sse.SSEHandler.__init__(self, application, request) - self.streamId = request.uri[len('/graph/'):] + self.bound = False + self.created = time.time() self.graphClients = self.settings.graphClients - self.created = time.time() self._serial = SomeGraph._handlerSerial SomeGraph._handlerSerial += 1 @@ -345,11 +351,17 @@ 'userAgent': self.request.headers.get('user-agent'), } - def bind(self): + def bind(self, graphPath): + self.streamId = graphPath + self.graphClients.addSseHandler(self) + # If something goes wrong with addSseHandler, I don't want to + # try removeSseHandler. + self.bound = True def unbind(self): - self.graphClients.removeSseHandler(self) + if self.bound: + self.graphClients.removeSseHandler(self) class State(cyclone.web.RequestHandler): @STATS.getState.time() @@ -374,8 +386,7 @@ """) if arg['-v']: - import twisted.python.log - twisted.python.log.startLogging(sys.stdout) + enableTwistedLog() log.setLevel(logging.DEBUG) defer.setDebugging(True)