changeset 451:17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
author | drewp@bigasterisk.com
date | Fri, 19 Apr 2019 01:08:01 -0700
parents | 5595c447c630 |
children | a8073bcddd8b |
files | service/collector/Dockerfile service/collector/makefile service/collector/mypy.ini service/collector/requirements.txt service/collector/sse_collector.py |
diffstat | 5 files changed, 95 insertions(+), 65 deletions(-)
--- a/service/collector/Dockerfile Fri Apr 19 01:07:29 2019 -0700
+++ b/service/collector/Dockerfile Fri Apr 19 01:08:01 2019 -0700
@@ -8,10 +8,11 @@
 RUN pip3 install -Ur requirements.txt
 
 # not sure why this doesn't work from inside requirements.txt
 RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip'
+RUN touch /usr/local/lib/python3.6/dist-packages/greplin/__init__.py
 
 COPY stubs ./stubs
 COPY twisted_sse_demo ./twisted_sse_demo
-COPY *.py req* ./
+COPY *.py req* *.ini ./
 
 EXPOSE 9072
--- a/service/collector/makefile Fri Apr 19 01:07:29 2019 -0700
+++ b/service/collector/makefile Fri Apr 19 01:08:01 2019 -0700
@@ -6,14 +6,19 @@
 build_image:
 	rm -rf tmp_ctx
 	mkdir -p tmp_ctx
-	cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* stubs tmp_ctx
+	cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* *.ini stubs tmp_ctx
 	docker build --network=host -t ${TAG} tmp_ctx
 
 push_image: build_image
 	docker push ${TAG}
 
 shell: build_image
-	docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG} /bin/bash
+	docker run --rm -it --cap-add SYS_PTRACE \
+		--name $(JOB)_shell \
+		--net=host \
+		-v `pwd`/.mypy_cache:/opt/.mypy_cache \
+		-v `pwd`/sse_collector.py:/opt/sse_collector.py \
+		${TAG} /bin/bash
 
 local_run: build_image
 	docker run --rm -it -p ${PORT}:${PORT} \
@@ -40,9 +45,9 @@
 typecheck: build_image
 	docker run --rm -it -p ${PORT}:${PORT} \
 		--net=host \
-		-e=MYPY=/usr/local/lib/python3.6/dist-packages:stubs \
+		-v `pwd`/.mypy_cache:/opt/.mypy_cache \
 		${TAG} \
-		/usr/local/bin/mypy --python-executable /usr/bin/python3 --no-implicit-optional --ignore-missing-imports sse_collector.py
+		/usr/local/bin/mypy -m sse_collector -m export_to_influxdb -m logsetup -m patchablegraph -m patchsource -m rdfdb.patch
 
 redeploy: push_image
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/mypy.ini Fri Apr 19 01:08:01 2019 -0700
@@ -0,0 +1,7 @@
+[mypy]
+no_implicit_optional = True
+#ignore_missing_imports = True
+python_executable = /usr/bin/python3
+warn_unused_configs = True
+warn_return_any = True
+mypy_path = /opt/stubs:/usr/local/lib/python3.6/dist-packages/
\ No newline at end of file
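Note: this mypy.ini carries the options that were previously passed as command-line flags, which is why the typecheck target above now only names modules with -m. As a rough sketch (not how the repo runs it; the makefile invokes the mypy CLI inside the container), the same check can be driven from Python, assuming mypy.ini sits in the current directory so mypy picks it up by default:

    # illustrative only: run mypy programmatically against one module
    from mypy import api  # mypy's documented programmatic entry point

    stdout, stderr, exit_status = api.run(['-m', 'sse_collector'])
    print(stdout)
    if exit_status != 0:
        raise SystemExit(stderr or 'type errors found')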
--- a/service/collector/requirements.txt Fri Apr 19 01:07:29 2019 -0700
+++ b/service/collector/requirements.txt Fri Apr 19 01:08:01 2019 -0700
@@ -1,7 +1,7 @@
 docopt
 ipdb
-service_identity
-twisted
+service_identity==18.1.0
+twisted==19.2.0
 py-spy
 
 mypy
@@ -11,3 +11,4 @@
 git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
 https://projects.bigasterisk.com/rdfdb/rdfdb-0.8.0.tar.gz
 https://github.com/drewp/cyclone/archive/python3.zip
+influxdb==5.2.2
--- a/service/collector/sse_collector.py Fri Apr 19 01:07:29 2019 -0700
+++ b/service/collector/sse_collector.py Fri Apr 19 01:08:01 2019 -0700
@@ -7,17 +7,19 @@
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
-import logging, collections, json, time
-from twisted.internet import reactor, defer
-import cyclone.web, cyclone.sse
-from rdflib import URIRef, Namespace
 from docopt import docopt
 from greplin import scales
 from greplin.scales.cyclonehandler import StatsHandler
+from rdflib import Namespace, URIRef, StatementType
+from rdflib.term import Node
+from twisted.internet import reactor, defer
+from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional
+import cyclone.web, cyclone.sse
+import logging, collections, json, time
+
 from logsetup import log, enableTwistedLog
 from patchablegraph import jsonFromPatch
 from rdfdb.patch import Patch
-from typing import Callable, Dict, NewType
 
 # workaround for broken import in twisted_sse_demo/eventsourcee.py
 import sys; sys.path.append('twisted_sse_demo')
@@ -25,11 +27,12 @@
 from sse_collector_config import config
 
-SourceUri = NewType('SourceUri', URIRef)
+#SourceUri = NewType('SourceUri', URIRef) # doesn't work
+class SourceUri(URIRef): pass
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 
-COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
+COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
 
 STATS = scales.collection('/root',
                           scales.PmfStat('getState'),
@@ -44,12 +47,12 @@
     """
     functions that make statements originating from sse_collector itself
     """
-    def __init__(self, applyPatch: Callable[[Patch], None]):
+    def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
         self.applyPatch = applyPatch
         self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef
 
     @STATS.localStatementsPatch.time()
-    def setSourceState(self, source, state):
+    def setSourceState(self, source: SourceUri, state: URIRef):
         """
         add a patch to the COLLECTOR graph about the state of this
         source. state=None to remove the source.
@@ -80,44 +83,54 @@
                 (source, ROOM['state'], oldState, COLLECTOR),
             ]))
 
-def abbrevTerm(t):
+def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
     if isinstance(t, URIRef):
         return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
                 .replace('http://projects.bigasterisk.com/device/', 'dev:')
                 .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
     return t
 
-def abbrevStmt(stmt):
-    return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))
-
+def abbrevStmt(stmt: StatementType) -> str:
+    t = tuple(map(abbrevTerm, stmt))
+    return '(%s %s %s %s)' % (t[0], t[1], t[2], t[3])
+
+StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set[SomeGraph]]]
+
+
+class PostDeleter(object):
+    def __init__(self, statements: StatementTable):
+        self.statements = statements
+
+    def __enter__(self):
+        self._garbage: List[StatementType] = []
+        return self
+
+    def add(self, stmt: StatementType):
+        self._garbage.append(stmt)
+
+    def __exit__(self, type, value, traceback):
+        if type is not None:
+            raise
+        for stmt in self._garbage:
+            del self.statements[stmt]
+
+
 class ActiveStatements(object):
     def __init__(self):
         # This table holds statements asserted by any of our sources
         # plus local statements that we introduce (source is
         # http://bigasterisk.com/sse_collector/).
-        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)
+        self.statements: StatementTable = collections.defaultdict(lambda: (set(), set()))
 
-    def state(self):
+    def state(self) -> Dict:
         return {
             'len': len(self.statements),
             }
 
-    def _postDeleteStatements(self):
-        statements = self.statements
-        class PostDeleter(object):
-            def __enter__(self):
-                self._garbage = []
-                return self
-            def add(self, stmt):
-                self._garbage.append(stmt)
-            def __exit__(self, type, value, traceback):
-                if type is not None:
-                    raise
-                for stmt in self._garbage:
-                    del statements[stmt]
-        return PostDeleter()
+    def _postDeleteStatements(self) -> PostDeleter:
+        return PostDeleter(self.statements)
 
-    def pprintTable(self):
+    def pprintTable(self) -> None:
         for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
             print("%03d. %-80s from %s to %s" % (
                 i,
@@ -126,15 +139,14 @@
                 handlers))
 
     @STATS.makeSyncPatch.time()
-    def makeSyncPatch(self, handler, sources):
+    def makeSyncPatch(self, handler: SomeGraph, sources: Set[SourceUri]):
         # todo: this could run all handlers at once, which is how we use it anyway
         adds = []
         dels = []
-        sources_set = set(sources)
 
         with self._postDeleteStatements() as garbage:
             for stmt, (stmtSources, handlers) in self.statements.items():
-                belongsInHandler = not sources_set.isdisjoint(stmtSources)
+                belongsInHandler = not sources.isdisjoint(stmtSources)
                 handlerHasIt = handler in handlers
                 log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                 if belongsInHandler and not handlerHasIt:
@@ -148,7 +160,7 @@
 
         return Patch(addQuads=adds, delQuads=dels)
 
-    def applySourcePatch(self, source, p):
+    def applySourcePatch(self, source: SourceUri, p: Patch):
         for stmt in p.addQuads:
             sourceUrls, handlers = self.statements[stmt]
             if source in sourceUrls:
@@ -170,7 +182,7 @@
                     garbage.add(stmt)
 
     @STATS.replaceSourceStatements.time()
-    def replaceSourceStatements(self, source, stmts):
+    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[StatementType]):
         log.debug('replaceSourceStatements with %s stmts', len(stmts))
 
         newStmts = set(stmts)
@@ -188,19 +200,20 @@
 
         self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
 
-    def discardHandler(self, handler):
+    def discardHandler(self, handler: SomeGraph):
         with self._postDeleteStatements() as garbage:
             for stmt, (sources, handlers) in self.statements.items():
                 handlers.discard(handler)
                 if not sources and not handlers:
                     garbage.add(stmt)
 
-    def discardSource(self, source):
+    def discardSource(self, source: SourceUri):
         with self._postDeleteStatements() as garbage:
             for stmt, (sources, handlers) in self.statements.items():
                 sources.discard(source)
                 if not sources and not handlers:
                     garbage.add(stmt)
+
 
 class GraphClients(object):
     """
@@ -212,28 +225,28 @@
     statements come and go, we make patches to send to requesters.
     """
     def __init__(self):
-        self.clients = {} # url: PatchSource (COLLECTOR is not listed)
-        self.handlers = set() # handler
-        self.statements = ActiveStatements()
+        self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed)
+        self.handlers: Set[SomeGraph] = set()
+        self.statements: ActiveStatements = ActiveStatements()
 
         self._localStatements = LocalStatements(self._onPatch)
 
-    def state(self):
+    def state(self) -> Dict:
        return {
             'clients': [ps.state() for ps in self.clients.values()],
             'sseHandlers': [h.state() for h in self.handlers],
             'statements': self.statements.state(),
             }
 
-    def _sourcesForHandler(self, handler):
+    def _sourcesForHandler(self, handler: SomeGraph) -> List[SourceUri]:
         streamId = handler.streamId
         matches = [s for s in config['streams'] if s['id'] == streamId]
         if len(matches) != 1:
             raise ValueError("%s matches for %r" % (len(matches), streamId))
-        return list(map(URIRef, matches[0]['sources'])) + [COLLECTOR]
+        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]
 
     @STATS.onPatch.time()
-    def _onPatch(self, source, p, fullGraph=False):
+    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False):
         if fullGraph:
             # a reconnect may need to resend the full graph even
             # though we've already sent some statements
@@ -253,7 +266,7 @@
                 ROOM['patchesReceived'])
 
     @STATS.sendUpdatePatch.time()
-    def _sendUpdatePatch(self, handler=None):
+    def _sendUpdatePatch(self, handler: Optional[SomeGraph]=None):
         """
         send a patch event out this handler to bring it up to date with
         self.statements
@@ -279,7 +292,7 @@
             else:
                 log.debug('nothing to send to %s', h)
 
-    def addSseHandler(self, handler):
+    def addSseHandler(self, handler: SomeGraph):
         log.info('addSseHandler %r %r', handler, handler.streamId)
 
         # fail early if id doesn't match
@@ -301,7 +314,7 @@
 
         self._sendUpdatePatch(handler)
 
-    def removeSseHandler(self, handler):
+    def removeSseHandler(self, handler: SomeGraph):
         log.info('removeSseHandler %r', handler)
         self.statements.discardHandler(handler)
         for source in self._sourcesForHandler(handler):
@@ -314,7 +327,7 @@
 
         self.handlers.remove(handler)
 
-    def _stopClient(self, url):
+    def _stopClient(self, url: SourceUri):
         if url == COLLECTOR:
             return
 
@@ -329,7 +342,7 @@
 class SomeGraph(cyclone.sse.SSEHandler):
     _handlerSerial = 0
 
-    def __init__(self, application, request):
+    def __init__(self, application: cyclone.web.Application, request):
         cyclone.sse.SSEHandler.__init__(self, application, request)
         self.bound = False
         self.created = time.time()
@@ -337,12 +350,12 @@
         self._serial = SomeGraph._handlerSerial
         SomeGraph._handlerSerial += 1
 
-        self.lastPatchSentTime = 0
+        self.lastPatchSentTime: float = 0.0
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return '<Handler #%s>' % self._serial
 
-    def state(self):
+    def state(self) -> Dict:
         return {
             'created': round(self.created, 2),
             'ageHours': round((time.time() - self.created) / 3600, 2),
@@ -351,21 +364,22 @@
             'userAgent': self.request.headers.get('user-agent'),
             }
 
-    def bind(self, graphPath):
-        self.streamId = graphPath
+    def bind(self, *args, **kwargs):
+        self.streamId = args[0]
         self.graphClients.addSseHandler(self)
         # If something goes wrong with addSseHandler, I don't want to
         # try removeSseHandler.
         self.bound = True
 
-    def unbind(self):
+    def unbind(self) -> None:
         if self.bound:
             self.graphClients.removeSseHandler(self)
 
+
 class State(cyclone.web.RequestHandler):
     @STATS.getState.time()
-    def get(self):
+    def get(self) -> None:
         try:
             state = self.settings.graphClients.state()
         except:
@@ -374,9 +388,11 @@
 
         self.write(json.dumps({'graphClients': state}, indent=2))
 
+
 class Root(cyclone.web.RequestHandler):
-    def get(self):
+    def get(self) -> None:
         self.write('<html><body>sse_collector</body></html>')
 
+
 if __name__ == '__main__':
     arg = docopt("""
@@ -395,7 +411,7 @@
     #exporter = InfluxExporter(... to export some stats values
 
     reactor.listenTCP(
-        19072,
+        9072,
         cyclone.web.Application(
             handlers=[
                 (r'/', Root),
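Note: the commented-out NewType line in the diff above ("doesn't work") is why SourceUri ends up as a subclass of URIRef. A minimal sketch of the difference, with SourceUriNT as a hypothetical name used only for contrast:

    from typing import NewType
    from rdflib import URIRef

    # NewType is erased at runtime: SourceUriNT(u) returns the same plain URIRef,
    # and the name cannot be used with isinstance() or as a base class.
    SourceUriNT = NewType('SourceUriNT', URIRef)

    # Subclassing gives mypy a distinct static type plus a real runtime class,
    # which is the workaround this changeset settles on.
    class SourceUri(URIRef):
        pass

    src = SourceUri('http://bigasterisk.com/sse_collector/')
    assert isinstance(src, URIRef) and isinstance(src, SourceUri)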