comparison merge.py @ 12:032e59be8fe9

refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
author drewp@bigasterisk.com
date Fri, 25 Nov 2022 20:58:08 -0800
parents
children bfd95926be6e
comparison
equal deleted inserted replaced
11:baf886e01ed1 12:032e59be8fe9
1 import collections
2 from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, NewType
3 from prometheus_client import Summary
4
5 from rdfdb.patch import Patch
6 from rdflib import Namespace, URIRef
7 from rdflib.term import Node
8 from standardservice.logsetup import enableTwistedLog, log
9 from patchsink import PatchSink
# Prometheus Summary metrics; each one times a decorated method further down
# (setSourceState, makeSyncPatch, replaceSourceStatements respectively).
LOCAL_STATEMENTS_PATCH_CALLS = Summary(name="local_statements_patch_calls", documentation='calls')
MAKE_SYNC_PATCH_CALLS = Summary(name="make_sync_patch_calls", documentation='calls')
REPLACE_SOURCE_STATEMENTS_CALLS = Summary(name="replace_source_statements_calls", documentation='calls')

# URI naming one origin of statements (an upstream source, or COLLECTOR).
SourceUri = NewType('SourceUri', URIRef)

# A quad; the fourth term is the graph (see how COLLECTOR is used below).
Statement = Tuple[Node, Node, Node, Node]
# statement -> (sources asserting it, handlers that have been sent it)
StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]]

ROOM = Namespace("http://projects.bigasterisk.com/room/")
# Used both as the graph for sse_collector's own statements and as the
# SourceUri those statements are attributed to.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
21
22
# (long form, short form) pairs, applied in this order by abbrevTerm.
_TERM_PREFIXES = (
    ('http://projects.bigasterisk.com/room/', 'room:'),
    ('http://projects.bigasterisk.com/device/', 'dev:'),
    ('http://bigasterisk.com/sse_collector/', 'sc:'),
)


def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
    """
    Shorten the well-known URI prefixes in a URIRef for display.

    Every occurrence of each long form is replaced (str.replace semantics),
    in the order listed in _TERM_PREFIXES. Anything that is not a URIRef is
    returned unchanged.
    """
    if not isinstance(t, URIRef):
        return t
    shortened = t
    for longForm, shortForm in _TERM_PREFIXES:
        shortened = shortened.replace(longForm, shortForm)
    return shortened
28
29
def abbrevStmt(stmt: Statement) -> str:
    """Render the first four terms of `stmt` as '(a b c d)', each passed through abbrevTerm."""
    terms = (stmt[0], stmt[1], stmt[2], stmt[3])
    return '(%s)' % ' '.join(str(abbrevTerm(term)) for term in terms)
32
33
class LocalStatements(object):
    """
    Statements that sse_collector makes about itself: which sources exist
    and what state each one is in. Every change is sent as a Patch on the
    COLLECTOR graph through the applyPatch callback.
    """

    def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]):
        # called as applyPatch(COLLECTOR, patch) whenever a source changes
        self.applyPatch = applyPatch
        # source -> its current state URIRef; a source with no entry has no
        # statements in the COLLECTOR graph
        self._sourceState: Dict[SourceUri, Optional[URIRef]] = {}

    @LOCAL_STATEMENTS_PATCH_CALLS.time()
    def setSourceState(self, source: SourceUri, state: Optional[URIRef]) -> None:
        """
        Record that `source` is now in `state` and emit the matching patch on
        the COLLECTOR graph. state=None removes the source. Reporting the
        state the source is already in emits nothing.
        """
        previous = self._sourceState.get(source)
        if previous == state:
            return
        log.info('source state %s -> %s', source, state)

        if previous is None:
            # first report for this source: list it and give it a state
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(addQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], state, COLLECTOR),
            ]))
            return

        if state is None:
            # source is going away: retract its listing and its last state
            del self._sourceState[source]
            self.applyPatch(COLLECTOR, Patch(delQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], previous, COLLECTOR),
            ]))
            return

        # ordinary transition: replace the old state statement with the new one
        self._sourceState[source] = state
        newStateQuad = (source, ROOM['state'], state, COLLECTOR)
        oldStateQuad = (source, ROOM['state'], previous, COLLECTOR)
        self.applyPatch(COLLECTOR, Patch(addQuads=[newStateQuad], delQuads=[oldStateQuad]))
72
73
class PostDeleter(object):
    """
    Context manager that defers deleting keys from a StatementTable until the
    with-block ends. Callers can therefore mark rows for removal while they
    are still iterating the table (a dict must not change size mid-iteration).
    """

    def __init__(self, statements: 'StatementTable'):
        self.statements = statements
        # initialised here too, so add() works even outside a with-block
        self._garbage: List[Statement] = []

    def __enter__(self):
        self._garbage = []
        return self

    def add(self, stmt: 'Statement'):
        """Mark `stmt` for deletion when the with-block exits without error."""
        self._garbage.append(stmt)

    def __exit__(self, excType, excValue, traceback):
        if excType is not None:
            # Don't touch the table, and return False so the with-body's own
            # exception (e.g. a ValueError) reaches the caller unchanged
            # instead of being replaced by a different exception here.
            self._garbage = []
            return False
        for stmt in self._garbage:
            del self.statements[stmt]
        self._garbage = []
        return None
91
92
class ActiveStatements(object):
    """
    The merged table of every statement the collector currently holds.

    self.table maps each statement to (sources, handlers): the sources that
    assert it, and the PatchSink handlers that makeSyncPatch has sent it to.
    Rows are removed once both sets are empty.
    """

    def __init__(self):
        # This table holds statements asserted by any of our sources
        # plus local statements that we introduce (source is
        # http://bigasterisk.com/sse_collector/). Indexing a missing key
        # creates an empty (sources, handlers) row.
        self.table: StatementTable = collections.defaultdict(lambda: (set(), set()))

    def state(self) -> Dict:
        """Return a small summary of the table (its row count)."""
        return {
            'len': len(self.table),
        }

    def postDeleteStatements(self) -> PostDeleter:
        """
        Return a context manager for removing rows from self.table.

        Rows can't be deleted while self.table is being iterated, so the loops
        below pass them to garbage.add() and they are deleted when the
        with-block exits cleanly.
        """
        return PostDeleter(self.table)

    def pprintTable(self) -> None:
        """Print every row, sorted by statement, for debugging."""
        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
            print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))

    @MAKE_SYNC_PATCH_CALLS.time()
    def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]) -> Patch:
        """
        Compute the patch that brings `handler` in line with `sources`.

        A statement belongs to the handler when at least one of `sources`
        asserts it. Statements that belong but haven't been sent go in
        addQuads, and the handler is recorded as having them. Statements the
        handler has but that no longer belong go in delQuads, and the handler
        is removed from their row.
        """
        # todo: this could run all handlers at once, which is how we
        # use it anyway
        adds: List[Statement] = []
        dels: List[Statement] = []

        with self.postDeleteStatements() as garbage:
            for stmt, (stmtSources, handlers) in self.table.items():
                belongsInHandler = not sources.isdisjoint(stmtSources)
                handlerHasIt = handler in handlers
                if belongsInHandler and not handlerHasIt:
                    adds.append(stmt)
                    handlers.add(handler)
                elif not belongsInHandler and handlerHasIt:
                    dels.append(stmt)
                    handlers.remove(handler)
                    if not handlers and not stmtSources:
                        garbage.add(stmt)

        return Patch(addQuads=adds, delQuads=dels)

    def applySourcePatch(self, source: SourceUri, p: Patch):
        """
        Apply a change that `source` made to the statements it asserts.

        Raises ValueError if the patch adds a statement `source` already
        asserts, or deletes one it doesn't assert. Rows left with no sources
        and no handlers are removed.
        """
        for stmt in p.addQuads:
            sourceUrls, handlers = self.table[stmt]
            if source in sourceUrls:
                raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt)))
            sourceUrls.add(source)

        with self.postDeleteStatements() as garbage:
            for stmt in p.delQuads:
                # .get() rather than [] so that deleting an unknown statement
                # doesn't auto-create (and leak) an empty defaultdict row.
                row = self.table.get(stmt)
                if row is None or source not in row[0]:
                    raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt)))
                sourceUrls, handlers = row
                sourceUrls.remove(source)
                # this is rare, since some handler probably still has
                # the stmt we're deleting, but it can happen e.g. when
                # a handler was just deleted
                if not sourceUrls and not handlers:
                    garbage.add(stmt)

    @REPLACE_SOURCE_STATEMENTS_CALLS.time()
    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]):
        """
        Make `source` assert exactly `stmts`.

        `source` is removed from rows not in `stmts` and added to existing
        rows that are. Rows that did not exist yet are created through
        applySourcePatch. Statements `source` already asserts and still
        wants are left as they are, so repeating a call is harmless.
        """
        log.debug('replaceSourceStatements with %s stmts', len(stmts))
        wanted = set(stmts)  # O(1) membership; stmts may be a long list
        # statements with no row in the table yet; rows found below are removed
        newStmts = set(wanted)

        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                if stmt in wanted:
                    # Existing row: attach source (no-op if already there). It
                    # must leave newStmts either way, or applySourcePatch would
                    # reject it as "added stmt that it already had".
                    sources.add(source)
                    newStmts.discard(stmt)
                elif source in sources:
                    sources.remove(source)
                    if not sources and not handlers:
                        garbage.add(stmt)

        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))

    def discardHandler(self, handler: PatchSink):
        """Forget `handler` on every row and drop rows left empty."""
        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                handlers.discard(handler)
                if not sources and not handlers:
                    garbage.add(stmt)

    def discardSource(self, source: SourceUri):
        """Forget `source` on every row and drop rows left empty."""
        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                sources.discard(source)
                if not sources and not handlers:
                    garbage.add(stmt)