changeset 6:a3b6b06fc699

cleanup and type fixes
author drewp@bigasterisk.com
date Tue, 29 Mar 2022 22:20:34 -0700
parents 29e10f3a497f
children fd73907cef40
files collector.py
diffstat 1 files changed, 24 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/collector.py	Tue Mar 29 22:11:44 2022 -0700
+++ b/collector.py	Tue Mar 29 22:20:34 2022 -0700
@@ -11,18 +11,18 @@
 import json
 import logging
 import time
-from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union)
+from typing import (Callable, Dict, List, Optional, Sequence, Set, Tuple, Union)
 
 import cyclone.sse
 import cyclone.web
 from docopt import docopt
 from patchablegraph import jsonFromPatch
 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
-from prometheus_client import Counter, Gauge, Histogram, Summary
+from prometheus_client import Summary
 from prometheus_client.exposition import generate_latest
 from prometheus_client.registry import REGISTRY
 from rdfdb.patch import Patch
-from rdflib import Namespace,  URIRef
+from rdflib import Namespace, URIRef
 from rdflib.term import Node
 from standardservice.logsetup import enableTwistedLog, log
 from twisted.internet import defer, reactor
@@ -31,7 +31,8 @@
 
 Statement = Tuple[Node, Node, Node, Node]
 
-#SourceUri = NewType('SourceUri', URIRef) # doesn't work
+
+# SourceUri = NewType('SourceUri', URIRef) # doesn't work
 class SourceUri(URIRef):
     pass
 
@@ -59,12 +60,12 @@
     functions that make statements originating from sse_collector itself
     """
 
-    def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
+    def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]):
         self.applyPatch = applyPatch
-        self._sourceState: Dict[SourceUri, URIRef] = {}  # source: state URIRef
+        self._sourceState: Dict[SourceUri, Optional[URIRef]] = {}  # source: state URIRef
 
     @LOCAL_STATEMENTS_PATCH_CALLS.time()
-    def setSourceState(self, source: SourceUri, state: URIRef):
+    def setSourceState(self, source: SourceUri, state: Optional[URIRef]):
         """
         add a patch to the COLLECTOR graph about the state of this
         source. state=None to remove the source.
@@ -274,7 +275,7 @@
     """
 
     def __init__(self):
-        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
+        self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {}  # (COLLECTOR is not listed)
         self.handlers: Set[PatchSink] = set()
         self.statements: ActiveStatements = ActiveStatements()
 
@@ -436,17 +437,19 @@
 
     graphClients = GraphClients()
 
-    reactor.listenTCP(9072,
-                      cyclone.web.Application(handlers=[
-                          (r"/()", cyclone.web.StaticFileHandler, {
-                              "path": ".",
-                              "default_filename": "index.html"
-                          }),
-                          (r'/state', State),
-                          (r'/graph/', GraphList),
-                          (r'/graph/(.+)', PatchSink),
-                          (r'/metrics', Metrics),
-                      ],
-                                              graphClients=graphClients),
-                      interface='::')
+    reactor.listenTCP(
+        9072,
+        cyclone.web.Application(  #
+            handlers=[
+                (r"/()", cyclone.web.StaticFileHandler, {
+                    "path": ".",
+                    "default_filename": "index.html"
+                }),
+                (r'/state', State),
+                (r'/graph/', GraphList),
+                (r'/graph/(.+)', PatchSink),
+                (r'/metrics', Metrics),
+            ],
+            graphClients=graphClients),
+        interface='::')
     reactor.run()