Mercurial > code > home > repos > collector
changeset 6:a3b6b06fc699
cleanup and type fixes
author | drewp@bigasterisk.com |
---|---|
date | Tue, 29 Mar 2022 22:20:34 -0700 |
parents | 29e10f3a497f |
children | fd73907cef40 |
files | collector.py |
diffstat | 1 files changed, 24 insertions(+), 21 deletions(-) [+] |
line wrap: on
line diff
--- a/collector.py Tue Mar 29 22:11:44 2022 -0700 +++ b/collector.py Tue Mar 29 22:20:34 2022 -0700 @@ -11,18 +11,18 @@ import json import logging import time -from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union) +from typing import (Callable, Dict, List, Optional, Sequence, Set, Tuple, Union) import cyclone.sse import cyclone.web from docopt import docopt from patchablegraph import jsonFromPatch from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource -from prometheus_client import Counter, Gauge, Histogram, Summary +from prometheus_client import Summary from prometheus_client.exposition import generate_latest from prometheus_client.registry import REGISTRY from rdfdb.patch import Patch -from rdflib import Namespace, URIRef +from rdflib import Namespace, URIRef from rdflib.term import Node from standardservice.logsetup import enableTwistedLog, log from twisted.internet import defer, reactor @@ -31,7 +31,8 @@ Statement = Tuple[Node, Node, Node, Node] -#SourceUri = NewType('SourceUri', URIRef) # doesn't work + +# SourceUri = NewType('SourceUri', URIRef) # doesn't work class SourceUri(URIRef): pass @@ -59,12 +60,12 @@ functions that make statements originating from sse_collector itself """ - def __init__(self, applyPatch: Callable[[URIRef, Patch], None]): + def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]): self.applyPatch = applyPatch - self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef + self._sourceState: Dict[SourceUri, Optional[URIRef]] = {} # source: state URIRef @LOCAL_STATEMENTS_PATCH_CALLS.time() - def setSourceState(self, source: SourceUri, state: URIRef): + def setSourceState(self, source: SourceUri, state: Optional[URIRef]): """ add a patch to the COLLECTOR graph about the state of this source. state=None to remove the source. @@ -274,7 +275,7 @@ """ def __init__(self): - self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed) + self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {} # (COLLECTOR is not listed) self.handlers: Set[PatchSink] = set() self.statements: ActiveStatements = ActiveStatements() @@ -436,17 +437,19 @@ graphClients = GraphClients() - reactor.listenTCP(9072, - cyclone.web.Application(handlers=[ - (r"/()", cyclone.web.StaticFileHandler, { - "path": ".", - "default_filename": "index.html" - }), - (r'/state', State), - (r'/graph/', GraphList), - (r'/graph/(.+)', PatchSink), - (r'/metrics', Metrics), - ], - graphClients=graphClients), - interface='::') + reactor.listenTCP( + 9072, + cyclone.web.Application( # + handlers=[ + (r"/()", cyclone.web.StaticFileHandler, { + "path": ".", + "default_filename": "index.html" + }), + (r'/state', State), + (r'/graph/', GraphList), + (r'/graph/(.+)', PatchSink), + (r'/metrics', Metrics), + ], + graphClients=graphClients), + interface='::') reactor.run()