comparison merge.py @ 13:bfd95926be6e default tip

initial port to starlette. missing some disconnect & cleanup functionality
author drewp@bigasterisk.com
date Sat, 26 Nov 2022 14:13:51 -0800
parents 032e59be8fe9
children
comparison
equal deleted inserted replaced
12:032e59be8fe9 13:bfd95926be6e
1 import logging
1 import collections 2 import collections
2 from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, NewType 3 from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, NewType
3 from prometheus_client import Summary 4 from prometheus_client import Summary
4 5
5 from rdfdb.patch import Patch 6 from rdfdb.patch import Patch
6 from rdflib import Namespace, URIRef 7 from rdflib import Namespace, URIRef
7 from rdflib.term import Node 8 from rdflib.term import Node
8 from standardservice.logsetup import enableTwistedLog, log 9 log = logging.getLogger()
9 from patchsink import PatchSink
10 LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls') 10 LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls')
11 MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls') 11 MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls')
12 REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls') 12 REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls')
13 13
14 # we deal with PatchSourceResponse in here, but only as objs to __contains__ and compare.
15 from patchsink import PatchSinkResponse
16 OutputHandler = Union[PatchSinkResponse, type(None)] # A patchsink.PatchSinkResponse, but make type error if we try to look in it
17
14 SourceUri = NewType('SourceUri', URIRef) 18 SourceUri = NewType('SourceUri', URIRef)
15 19
16 Statement = Tuple[Node, Node, Node, Node] 20 Statement = Tuple[Node, Node, Node, Node]
17 StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]] 21 StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[OutputHandler]]]
18 22
19 ROOM = Namespace("http://projects.bigasterisk.com/room/") 23 ROOM = Namespace("http://projects.bigasterisk.com/room/")
20 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) 24 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
21 25
22 26
109 def pprintTable(self) -> None: 113 def pprintTable(self) -> None:
110 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())): 114 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
111 print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)) 115 print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))
112 116
113 @MAKE_SYNC_PATCH_CALLS.time() 117 @MAKE_SYNC_PATCH_CALLS.time()
114 def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]): 118 def makeSyncPatch(self, handler: OutputHandler, sources: Set[SourceUri]):
115 # todo: this could run all handlers at once, which is how we 119 # todo: this could run all handlers at once, which is how we
116 # use it anyway 120 # use it anyway
117 adds = [] 121 adds = []
118 dels = [] 122 dels = []
119 123
170 sources.add(source) 174 sources.add(source)
171 newStmts.discard(stmt) 175 newStmts.discard(stmt)
172 176
173 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) 177 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
174 178
175 def discardHandler(self, handler: PatchSink): 179 def discardHandler(self, handler: OutputHandler):
176 with self.postDeleteStatements() as garbage: 180 with self.postDeleteStatements() as garbage:
177 for stmt, (sources, handlers) in self.table.items(): 181 for stmt, (sources, handlers) in self.table.items():
178 handlers.discard(handler) 182 handlers.discard(handler)
179 if not sources and not handlers: 183 if not sources and not handlers:
180 garbage.add(stmt) 184 garbage.add(stmt)