Mercurial > code > home > repos > collector
comparison collector.py @ 6:a3b6b06fc699
cleanup and type fixes
author | drewp@bigasterisk.com |
---|---|
date | Tue, 29 Mar 2022 22:20:34 -0700 |
parents | 1275220a644b |
children | 36471461685f |
comparison
equal
deleted
inserted
replaced
5:29e10f3a497f | 6:a3b6b06fc699 |
---|---|
9 """ | 9 """ |
10 import collections | 10 import collections |
11 import json | 11 import json |
12 import logging | 12 import logging |
13 import time | 13 import time |
14 from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union) | 14 from typing import (Callable, Dict, List, Optional, Sequence, Set, Tuple, Union) |
15 | 15 |
16 import cyclone.sse | 16 import cyclone.sse |
17 import cyclone.web | 17 import cyclone.web |
18 from docopt import docopt | 18 from docopt import docopt |
19 from patchablegraph import jsonFromPatch | 19 from patchablegraph import jsonFromPatch |
20 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource | 20 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource |
21 from prometheus_client import Counter, Gauge, Histogram, Summary | 21 from prometheus_client import Summary |
22 from prometheus_client.exposition import generate_latest | 22 from prometheus_client.exposition import generate_latest |
23 from prometheus_client.registry import REGISTRY | 23 from prometheus_client.registry import REGISTRY |
24 from rdfdb.patch import Patch | 24 from rdfdb.patch import Patch |
25 from rdflib import Namespace, URIRef | 25 from rdflib import Namespace, URIRef |
26 from rdflib.term import Node | 26 from rdflib.term import Node |
27 from standardservice.logsetup import enableTwistedLog, log | 27 from standardservice.logsetup import enableTwistedLog, log |
28 from twisted.internet import defer, reactor | 28 from twisted.internet import defer, reactor |
29 | 29 |
30 from collector_config import config | 30 from collector_config import config |
31 | 31 |
32 Statement = Tuple[Node, Node, Node, Node] | 32 Statement = Tuple[Node, Node, Node, Node] |
33 | 33 |
34 #SourceUri = NewType('SourceUri', URIRef) # doesn't work | 34 |
35 # SourceUri = NewType('SourceUri', URIRef) # doesn't work | |
35 class SourceUri(URIRef): | 36 class SourceUri(URIRef): |
36 pass | 37 pass |
37 | 38 |
38 | 39 |
39 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 40 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
57 class LocalStatements(object): | 58 class LocalStatements(object): |
58 """ | 59 """ |
59 functions that make statements originating from sse_collector itself | 60 functions that make statements originating from sse_collector itself |
60 """ | 61 """ |
61 | 62 |
62 def __init__(self, applyPatch: Callable[[URIRef, Patch], None]): | 63 def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]): |
63 self.applyPatch = applyPatch | 64 self.applyPatch = applyPatch |
64 self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef | 65 self._sourceState: Dict[SourceUri, Optional[URIRef]] = {} # source: state URIRef |
65 | 66 |
66 @LOCAL_STATEMENTS_PATCH_CALLS.time() | 67 @LOCAL_STATEMENTS_PATCH_CALLS.time() |
67 def setSourceState(self, source: SourceUri, state: URIRef): | 68 def setSourceState(self, source: SourceUri, state: Optional[URIRef]): |
68 """ | 69 """ |
69 add a patch to the COLLECTOR graph about the state of this | 70 add a patch to the COLLECTOR graph about the state of this |
70 source. state=None to remove the source. | 71 source. state=None to remove the source. |
71 """ | 72 """ |
72 oldState = self._sourceState.get(source, None) | 73 oldState = self._sourceState.get(source, None) |
272 asserting them and the requesters who currently know them. As | 273 asserting them and the requesters who currently know them. As |
273 statements come and go, we make patches to send to requesters. | 274 statements come and go, we make patches to send to requesters. |
274 """ | 275 """ |
275 | 276 |
276 def __init__(self): | 277 def __init__(self): |
277 self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed) | 278 self.clients: Dict[SourceUri, Union[PatchSource, ReconnectingPatchSource]] = {} # (COLLECTOR is not listed) |
278 self.handlers: Set[PatchSink] = set() | 279 self.handlers: Set[PatchSink] = set() |
279 self.statements: ActiveStatements = ActiveStatements() | 280 self.statements: ActiveStatements = ActiveStatements() |
280 | 281 |
281 self._localStatements = LocalStatements(self._onPatch) | 282 self._localStatements = LocalStatements(self._onPatch) |
282 | 283 |
434 log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO) | 435 log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO) |
435 defer.setDebugging(True) | 436 defer.setDebugging(True) |
436 | 437 |
437 graphClients = GraphClients() | 438 graphClients = GraphClients() |
438 | 439 |
439 reactor.listenTCP(9072, | 440 reactor.listenTCP( |
440 cyclone.web.Application(handlers=[ | 441 9072, |
441 (r"/()", cyclone.web.StaticFileHandler, { | 442 cyclone.web.Application( # |
442 "path": ".", | 443 handlers=[ |
443 "default_filename": "index.html" | 444 (r"/()", cyclone.web.StaticFileHandler, { |
444 }), | 445 "path": ".", |
445 (r'/state', State), | 446 "default_filename": "index.html" |
446 (r'/graph/', GraphList), | 447 }), |
447 (r'/graph/(.+)', PatchSink), | 448 (r'/state', State), |
448 (r'/metrics', Metrics), | 449 (r'/graph/', GraphList), |
449 ], | 450 (r'/graph/(.+)', PatchSink), |
450 graphClients=graphClients), | 451 (r'/metrics', Metrics), |
451 interface='::') | 452 ], |
453 graphClients=graphClients), | |
454 interface='::') | |
452 reactor.run() | 455 reactor.run() |