Mercurial > code > home > repos > collector
comparison merge.py @ 13:bfd95926be6e default tip
initial port to starlette. missing some disconnect & cleanup functionality
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Nov 2022 14:13:51 -0800 |
parents | 032e59be8fe9 |
children |
comparison
equal
deleted
inserted
replaced
12:032e59be8fe9 | 13:bfd95926be6e |
---|---|
1 import logging | |
1 import collections | 2 import collections |
2 from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, NewType | 3 from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, NewType |
3 from prometheus_client import Summary | 4 from prometheus_client import Summary |
4 | 5 |
5 from rdfdb.patch import Patch | 6 from rdfdb.patch import Patch |
6 from rdflib import Namespace, URIRef | 7 from rdflib import Namespace, URIRef |
7 from rdflib.term import Node | 8 from rdflib.term import Node |
8 from standardservice.logsetup import enableTwistedLog, log | 9 log = logging.getLogger() |
9 from patchsink import PatchSink | |
10 LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls') | 10 LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls') |
11 MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls') | 11 MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls') |
12 REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls') | 12 REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls') |
13 | 13 |
14 # we deal with PatchSourceResponse in here, but only as objs to __contains__ and compare. | |
15 from patchsink import PatchSinkResponse | |
16 OutputHandler = Union[PatchSinkResponse, type(None)] # A patchsink.PatchSinkResponse, but make type error if we try to look in it | |
17 | |
14 SourceUri = NewType('SourceUri', URIRef) | 18 SourceUri = NewType('SourceUri', URIRef) |
15 | 19 |
16 Statement = Tuple[Node, Node, Node, Node] | 20 Statement = Tuple[Node, Node, Node, Node] |
17 StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]] | 21 StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[OutputHandler]]] |
18 | 22 |
19 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 23 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
20 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) | 24 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) |
21 | 25 |
22 | 26 |
109 def pprintTable(self) -> None: | 113 def pprintTable(self) -> None: |
110 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())): | 114 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())): |
111 print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)) | 115 print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)) |
112 | 116 |
113 @MAKE_SYNC_PATCH_CALLS.time() | 117 @MAKE_SYNC_PATCH_CALLS.time() |
114 def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]): | 118 def makeSyncPatch(self, handler: OutputHandler, sources: Set[SourceUri]): |
115 # todo: this could run all handlers at once, which is how we | 119 # todo: this could run all handlers at once, which is how we |
116 # use it anyway | 120 # use it anyway |
117 adds = [] | 121 adds = [] |
118 dels = [] | 122 dels = [] |
119 | 123 |
170 sources.add(source) | 174 sources.add(source) |
171 newStmts.discard(stmt) | 175 newStmts.discard(stmt) |
172 | 176 |
173 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) | 177 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) |
174 | 178 |
175 def discardHandler(self, handler: PatchSink): | 179 def discardHandler(self, handler: OutputHandler): |
176 with self.postDeleteStatements() as garbage: | 180 with self.postDeleteStatements() as garbage: |
177 for stmt, (sources, handlers) in self.table.items(): | 181 for stmt, (sources, handlers) in self.table.items(): |
178 handlers.discard(handler) | 182 handlers.discard(handler) |
179 if not sources and not handlers: | 183 if not sources and not handlers: |
180 garbage.add(stmt) | 184 garbage.add(stmt) |