Mercurial > code > home > repos > collector
comparison merge.py @ 12:032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
author | drewp@bigasterisk.com |
---|---|
date | Fri, 25 Nov 2022 20:58:08 -0800 |
parents | |
children | bfd95926be6e |
comparison
equal
deleted
inserted
replaced
11:baf886e01ed1 | 12:032e59be8fe9 |
---|---|
import collections
from typing import (Callable, Dict, List, Optional, Sequence, Set, Tuple,
                    Union, NewType)
from prometheus_client import Summary

from rdfdb.patch import Patch
from rdflib import Namespace, URIRef
from rdflib.term import Node
from standardservice.logsetup import enableTwistedLog, log

from patchsink import PatchSink

# Prometheus timers; applied as `@X.time()` decorators on the hot paths.
LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls')
MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls')
REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls')

# URI identifying one upstream graph source.
SourceUri = NewType('SourceUri', URIRef)

# A quad; the fourth term is the graph (COLLECTOR is used there for local stmts).
Statement = Tuple[Node, Node, Node, Node]
# Per statement: (sources currently asserting it, handlers already sent it).
StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]]

# Namespace for the room:source / room:state predicates.
ROOM = Namespace("http://projects.bigasterisk.com/room/")
# Source and graph URI for statements the collector asserts about itself.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
21 | |
22 | |
# (long namespace, short prefix) pairs used only for human-readable output.
_ABBREVIATIONS = (
    ('http://projects.bigasterisk.com/room/', 'room:'),
    ('http://projects.bigasterisk.com/device/', 'dev:'),
    ('http://bigasterisk.com/sse_collector/', 'sc:'),
)


def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
    """
    Shorten a term for logging/debug display.

    A URIRef that starts with one of the known namespaces has that leading
    namespace replaced by its short prefix (e.g. 'room:foo'); any other
    URIRef is returned as a plain str. Non-URIRef terms (literals, bnodes,
    ...) are returned unchanged.

    Only the leading namespace is rewritten: a namespace string that merely
    appears later in the URI (e.g. inside a query string) is left alone.
    """
    if not isinstance(t, URIRef):
        return t
    for longForm, shortForm in _ABBREVIATIONS:
        if t.startswith(longForm):
            return shortForm + t[len(longForm):]
    # plain str, matching the type the abbreviated branch returns
    return str(t)
28 | |
29 | |
def abbrevStmt(stmt: Statement) -> str:
    """Format a quad as '(s p o g)', each term shortened by abbrevTerm."""
    shortened = [str(abbrevTerm(stmt[idx])) for idx in range(4)]
    return '(' + ' '.join(shortened) + ')'
32 | |
33 | |
class LocalStatements(object):
    """
    Makes the statements that originate from sse_collector itself: for each
    known source, a room:source link from COLLECTOR and a room:state value,
    all in the COLLECTOR graph. Changes are delivered through `applyPatch`.
    """

    def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]):
        self.applyPatch = applyPatch
        # last state sent for each source; a missing key means no source
        self._sourceState: Dict[SourceUri, Optional[URIRef]] = {}

    @LOCAL_STATEMENTS_PATCH_CALLS.time()
    def setSourceState(self, source: SourceUri, state: Optional[URIRef]):
        """
        Patch the COLLECTOR graph to reflect this source's new state.
        Pass state=None to remove the source. No-op if nothing changed.
        """
        previous = self._sourceState.get(source, None)
        if state == previous:
            return
        log.info('source state %s -> %s', source, state)

        def stateQuad(value):
            return (source, ROOM['state'], value, COLLECTOR)

        if previous is None:
            # no recorded state yet: announce the source and its state
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(addQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                stateQuad(state),
            ]))
            return

        if state is None:
            # source removed: retract the link and its last state
            del self._sourceState[source]
            self.applyPatch(COLLECTOR, Patch(delQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                stateQuad(previous),
            ]))
            return

        # known source moved to a different state: swap the state quad
        self._sourceState[source] = state
        newQuads = [stateQuad(state)]
        oldQuads = [stateQuad(previous)]
        self.applyPatch(COLLECTOR, Patch(addQuads=newQuads, delQuads=oldQuads))
72 | |
73 | |
class PostDeleter(object):
    """
    Context manager that defers removing rows from a StatementTable until
    the with-block ends, so callers can queue rows for deletion while they
    are still iterating over the same table.
    """

    def __init__(self, statements: StatementTable):
        self.statements = statements
        self._garbage: List[Statement] = []

    def __enter__(self):
        self._garbage = []
        return self

    def add(self, stmt: Statement):
        """Queue stmt's row for removal when the with-block exits."""
        self._garbage.append(stmt)

    def __exit__(self, exc_type, exc_value, traceback):
        # Flush even when the body raised. The callers in this module only
        # queue rows whose source and handler sets are both empty, so removing
        # them keeps the table consistent either way.
        for stmt in self._garbage:
            # pop() rather than del: a row queued twice must not KeyError
            self.statements.pop(stmt, None)
        self._garbage = []
        # Never suppress: the body's own exception (e.g. the ValueErrors
        # raised by ActiveStatements) must reach the caller instead of being
        # replaced by an exception from here.
        return False
91 | |
92 | |
class ActiveStatements(object):
    """
    Merged statement table: for every statement, which sources currently
    assert it and which handlers (PatchSinks) have already been sent it.
    """

    def __init__(self):
        # This table holds statements asserted by any of our sources
        # plus local statements that we introduce (source is
        # http://bigasterisk.com/sse_collector/).
        self.table: StatementTable = collections.defaultdict(lambda: (set(), set()))

    def state(self) -> Dict:
        """Small status summary (currently just the table size)."""
        return {
            'len': len(self.table),
        }

    def postDeleteStatements(self) -> PostDeleter:
        """Context manager that removes queued rows from the table on exit."""
        return PostDeleter(self.table)

    def pprintTable(self) -> None:
        """Print every row, sorted, with abbreviated terms (debugging aid)."""
        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
            print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))

    @MAKE_SYNC_PATCH_CALLS.time()
    def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
        """
        Build the patch that brings `handler` in line with the statements
        asserted by any of `sources`, and record in the table that the
        handler now has (or no longer has) each affected statement.

        Returns a Patch of statements to add to and delete from the handler.
        """
        # todo: this could run all handlers at once, which is how we
        # use it anyway
        adds = []
        dels = []

        with self.postDeleteStatements() as garbage:
            for stmt, (stmtSources, handlers) in self.table.items():
                belongsInHandler = not sources.isdisjoint(stmtSources)
                handlerHasIt = handler in handlers
                if belongsInHandler and not handlerHasIt:
                    adds.append(stmt)
                    handlers.add(handler)
                elif not belongsInHandler and handlerHasIt:
                    dels.append(stmt)
                    handlers.remove(handler)
                    if not handlers and not stmtSources:
                        garbage.add(stmt)

        return Patch(addQuads=adds, delQuads=dels)

    def applySourcePatch(self, source: SourceUri, p: Patch):
        """
        Record that `source` now asserts p.addQuads and no longer asserts
        p.delQuads. Rows left with no sources and no handlers are dropped.

        Raises ValueError if the source adds a statement it already asserts
        or deletes one it doesn't assert. Quads processed before the
        offending one stay applied (there is no rollback).
        """
        for stmt in p.addQuads:
            sourceUrls, handlers = self.table[stmt]
            if source in sourceUrls:
                raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt)))
            sourceUrls.add(source)

        with self.postDeleteStatements() as garbage:
            for stmt in p.delQuads:
                # .get instead of [] so an unknown stmt doesn't leave an
                # empty phantom row in the defaultdict before we raise.
                row = self.table.get(stmt)
                if row is None or source not in row[0]:
                    raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt)))
                sourceUrls, handlers = row
                sourceUrls.remove(source)
                # this is rare, since some handler probably still has
                # the stmt we're deleting, but it can happen e.g. when
                # a handler was just deleted
                if not sourceUrls and not handlers:
                    garbage.add(stmt)

    @REPLACE_SOURCE_STATEMENTS_CALLS.time()
    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]):
        """
        Make `stmts` the complete set of statements asserted by `source`:
        drop the source from rows not in `stmts`, add it to existing rows
        that are, and create rows for statements the table hasn't seen.
        """
        log.debug('replaceSourceStatements with %s stmts', len(stmts))
        # set for O(1) membership; `stmts` may be a long list and is probed
        # once per table row
        wanted = set(stmts)
        newStmts = set(wanted)

        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                if source in sources:
                    if stmt not in wanted:
                        sources.remove(source)
                        if not sources and not handlers:
                            garbage.add(stmt)
                elif stmt in wanted:
                    sources.add(source)
                # Every row already in the table was handled above. Only
                # never-seen statements go through applySourcePatch, which
                # would reject one this source already asserts.
                newStmts.discard(stmt)

        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))

    def discardHandler(self, handler: PatchSink):
        """Forget `handler` everywhere; drop rows left with no sources/handlers."""
        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                handlers.discard(handler)
                if not sources and not handlers:
                    garbage.add(stmt)

    def discardSource(self, source: SourceUri):
        """Forget `source` everywhere; drop rows left with no sources/handlers."""
        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                sources.discard(source)
                if not sources and not handlers:
                    garbage.add(stmt)