comparison collector.py @ 6:a3b6b06fc699

cleanup and type fixes
author drewp@bigasterisk.com
date Tue, 29 Mar 2022 22:20:34 -0700
parents 1275220a644b
children 36471461685f
comparison
legend: equal | deleted | inserted | replaced
left (old): 5:29e10f3a497f | right (new): 6:a3b6b06fc699
9 """ 9 """
10 import collections 10 import collections
11 import json 11 import json
12 import logging 12 import logging
13 import time 13 import time
14 from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union) 14 from typing import (Callable, Dict, List, Optional, Sequence, Set, Tuple, Union)
15 15
16 import cyclone.sse 16 import cyclone.sse
17 import cyclone.web 17 import cyclone.web
18 from docopt import docopt 18 from docopt import docopt
19 from patchablegraph import jsonFromPatch 19 from patchablegraph import jsonFromPatch
20 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource 20 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
21 from prometheus_client import Counter, Gauge, Histogram, Summary 21 from prometheus_client import Summary
22 from prometheus_client.exposition import generate_latest 22 from prometheus_client.exposition import generate_latest
23 from prometheus_client.registry import REGISTRY 23 from prometheus_client.registry import REGISTRY
24 from rdfdb.patch import Patch 24 from rdfdb.patch import Patch
25 from rdflib import Namespace, URIRef 25 from rdflib import Namespace, URIRef
26 from rdflib.term import Node 26 from rdflib.term import Node
27 from standardservice.logsetup import enableTwistedLog, log 27 from standardservice.logsetup import enableTwistedLog, log
28 from twisted.internet import defer, reactor 28 from twisted.internet import defer, reactor
29 29
30 from collector_config import config 30 from collector_config import config
31 31
32 Statement = Tuple[Node, Node, Node, Node] 32 Statement = Tuple[Node, Node, Node, Node]
33 33
34 #SourceUri = NewType('SourceUri', URIRef) # doesn't work 34
35 # SourceUri = NewType('SourceUri', URIRef) # doesn't work
35 class SourceUri(URIRef): 36 class SourceUri(URIRef):
36 pass 37 pass
37 38
38 39
39 ROOM = Namespace("http://projects.bigasterisk.com/room/") 40 ROOM = Namespace("http://projects.bigasterisk.com/room/")
57 class LocalStatements(object): 58 class LocalStatements(object):
58 """ 59 """
59 functions that make statements originating from sse_collector itself 60 functions that make statements originating from sse_collector itself
60 """ 61 """
61 62
62 def __init__(self, applyPatch: Callable[[URIRef, Patch], None]): 63 def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]):
63 self.applyPatch = applyPatch 64 self.applyPatch = applyPatch
64 self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef 65 self._sourceState: Dict[SourceUri, Optional[URIRef]] = {} # source: state URIRef
65 66
66 @LOCAL_STATEMENTS_PATCH_CALLS.time() 67 @LOCAL_STATEMENTS_PATCH_CALLS.time()
67 def setSourceState(self, source: SourceUri, state: URIRef): 68 def setSourceState(self, source: SourceUri, state: Optional[URIRef]):
68 """ 69 """
69 add a patch to the COLLECTOR graph about the state of this 70 add a patch to the COLLECTOR graph about the state of this
70 source. state=None to remove the source. 71 source. state=None to remove the source.
71 """ 72 """
72 oldState = self._sourceState.get(source, None) 73 oldState = self._sourceState.get(source, None)
272 asserting them and the requesters who currently know them. As 273 asserting them and the requesters who currently know them. As
273 statements come and go, we make patches to send to requesters. 274 statements come and go, we make patches to send to requesters.
274 """ 275 """
275 276
276 def __init__(self): 277 def __init__(self):
277 self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed) 278 self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {} # (COLLECTOR is not listed)
278 self.handlers: Set[PatchSink] = set() 279 self.handlers: Set[PatchSink] = set()
279 self.statements: ActiveStatements = ActiveStatements() 280 self.statements: ActiveStatements = ActiveStatements()
280 281
281 self._localStatements = LocalStatements(self._onPatch) 282 self._localStatements = LocalStatements(self._onPatch)
282 283
434 log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO) 435 log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
435 defer.setDebugging(True) 436 defer.setDebugging(True)
436 437
437 graphClients = GraphClients() 438 graphClients = GraphClients()
438 439
439 reactor.listenTCP(9072, 440 reactor.listenTCP(
440 cyclone.web.Application(handlers=[ 441 9072,
441 (r"/()", cyclone.web.StaticFileHandler, { 442 cyclone.web.Application( #
442 "path": ".", 443 handlers=[
443 "default_filename": "index.html" 444 (r"/()", cyclone.web.StaticFileHandler, {
444 }), 445 "path": ".",
445 (r'/state', State), 446 "default_filename": "index.html"
446 (r'/graph/', GraphList), 447 }),
447 (r'/graph/(.+)', PatchSink), 448 (r'/state', State),
448 (r'/metrics', Metrics), 449 (r'/graph/', GraphList),
449 ], 450 (r'/graph/(.+)', PatchSink),
450 graphClients=graphClients), 451 (r'/metrics', Metrics),
451 interface='::') 452 ],
453 graphClients=graphClients),
454 interface='::')
452 reactor.run() 455 reactor.run()