annotate service/collector/sse_collector.py @ 451:17a556ddc5ac

add types to sse_collector.py. Surprisingly few bugs found. Ignore-this: df20acbf7ec27226f3060f3b5a4c710b
author drewp@bigasterisk.com
date Fri, 19 Apr 2019 01:08:01 -0700
parents ef7eba0551f2
children 91ab9f926aa1
rev   line source
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
1 """
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
2 requesting /graph/foo returns an SSE patch stream that's the
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
3 result of fetching multiple other SSE patch streams. The result stream
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
4 may include new statements injected by this service.
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
5
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
6 Future:
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
7 - filter out unneeded stmts from the sources
298
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
8 - accept a time resolution and concatenate any patches that arrive faster than that resolution
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
9 """
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
10 from docopt import docopt
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
11 from greplin import scales
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
12 from greplin.scales.cyclonehandler import StatsHandler
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
13 from rdflib import Namespace, URIRef, StatementType
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
14 from rdflib.term import Node
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
15 from twisted.internet import reactor, defer
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
16 from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
17 import cyclone.web, cyclone.sse
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
18 import logging, collections, json, time
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
19
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
20 from logsetup import log, enableTwistedLog
302
46c5fae89823 factor out patchsource
drewp@bigasterisk.com
parents: 301
diff changeset
21 from patchablegraph import jsonFromPatch
351
7716b1810d6c reasoning & collector move into docker images
drewp@bigasterisk.com
parents: 316
diff changeset
22 from rdfdb.patch import Patch
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
23
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
24 # workaround for broken import in twisted_sse_demo/eventsourcee.py
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
25 import sys; sys.path.append('twisted_sse_demo')
302
46c5fae89823 factor out patchsource
drewp@bigasterisk.com
parents: 301
diff changeset
26 from patchsource import ReconnectingPatchSource
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
27
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
28 from sse_collector_config import config
302
46c5fae89823 factor out patchsource
drewp@bigasterisk.com
parents: 301
diff changeset
29
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
30 #SourceUri = NewType('SourceUri', URIRef) # doesn't work
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
31 class SourceUri(URIRef): pass
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
32
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
33
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
34 ROOM = Namespace("http://projects.bigasterisk.com/room/")
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
35 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
36
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
37 STATS = scales.collection('/root',
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
38 scales.PmfStat('getState'),
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
39 scales.PmfStat('localStatementsPatch'),
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
40 scales.PmfStat('makeSyncPatch'),
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
41 scales.PmfStat('onPatch'),
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
42 scales.PmfStat('sendUpdatePatch'),
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
43 scales.PmfStat('replaceSourceStatements'),
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
44 )
351
7716b1810d6c reasoning & collector move into docker images
drewp@bigasterisk.com
parents: 316
diff changeset
45
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
46 class LocalStatements(object):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
47 """
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
48 functions that make statements originating from sse_collector itself
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
49 """
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
50 def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
51 self.applyPatch = applyPatch
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
52 self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
53
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
54 @STATS.localStatementsPatch.time()
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
55 def setSourceState(self, source: SourceUri, state: Optional[URIRef]):
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
56 """
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
57 add a patch to the COLLECTOR graph about the state of this
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
58 source. state=None to remove the source.
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
59 """
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
60 oldState = self._sourceState.get(source, None)
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
61 if state == oldState:
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
62 return
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
63 log.info('source state %s -> %s', source, state)
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
64 if oldState is None:
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
65 self._sourceState[source] = state
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
66 self.applyPatch(COLLECTOR, Patch(addQuads=[
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
67 (COLLECTOR, ROOM['source'], source, COLLECTOR),
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
68 (source, ROOM['state'], state, COLLECTOR),
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
69 ]))
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
70 elif state is None:
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
71 del self._sourceState[source]
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
72 self.applyPatch(COLLECTOR, Patch(delQuads=[
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
73 (COLLECTOR, ROOM['source'], source, COLLECTOR),
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
74 (source, ROOM['state'], oldState, COLLECTOR),
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
75 ]))
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
76 else:
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
77 self._sourceState[source] = state
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
78 self.applyPatch(COLLECTOR, Patch(
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
79 addQuads=[
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
80 (source, ROOM['state'], state, COLLECTOR),
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
81 ],
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
82 delQuads=[
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
83 (source, ROOM['state'], oldState, COLLECTOR),
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
84 ]))
298
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
85
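The three branches of setSourceState above can be read as one transition table from (old state, new state) to a pair of add/delete quad lists. The following is a minimal sketch of that reading, not the module's own code: plain strings stand in for the rdflib terms, and `state_transition_quads`, `ROOM_SOURCE` and `ROOM_STATE` are hypothetical names introduced only for illustration.

```python
from typing import List, Optional, Tuple

Quad = Tuple[str, str, str, str]

# Assumption: plain strings stand in for the rdflib terms used in the listing.
COLLECTOR = 'http://bigasterisk.com/sse_collector/'
ROOM_SOURCE = 'http://projects.bigasterisk.com/room/source'
ROOM_STATE = 'http://projects.bigasterisk.com/room/state'


def state_transition_quads(source: str, old: Optional[str],
                           new: Optional[str]) -> Tuple[List[Quad], List[Quad]]:
    """Return (adds, dels) for moving `source` from state `old` to `new`."""
    if new == old:
        return [], []  # nothing changed, so no patch is needed
    membership = (COLLECTOR, ROOM_SOURCE, source, COLLECTOR)
    adds: List[Quad] = []
    dels: List[Quad] = []
    if old is None:
        adds.append(membership)  # first time this source is seen
    if new is None:
        dels.append(membership)  # the source is going away
    if new is not None:
        adds.append((source, ROOM_STATE, new, COLLECTOR))
    if old is not None:
        dels.append((source, ROOM_STATE, old, COLLECTOR))
    return adds, dels


src = 'http://example.com/graph/src'
first = state_transition_quads(src, None, 'connected')
change = state_transition_quads(src, 'connected', 'stalled')
gone = state_transition_quads(src, 'stalled', None)
```

Keeping the transition as a pure function makes each of the three cases (new source, removed source, changed state) easy to check without a running patch consumer.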
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
86 def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
87 if isinstance(t, URIRef):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
88 return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
446
346b85a9adbb rollback the unicode(source) optimization. it was breaking all output to patch consumers
drewp@bigasterisk.com
parents: 444
diff changeset
89 .replace('http://projects.bigasterisk.com/device/', 'dev:')
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
90 .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
91 return t
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
92
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
93 def abbrevStmt(stmt: StatementType) -> str:
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
94 t = tuple(map(abbrevTerm, stmt))
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
95 return '(%s %s %s %s)' % (t[0], t[1], t[2], t[3])
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
96
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
97 StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set['SomeGraph']]]  # quoted: SomeGraph is not defined yet at this point
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
98
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
99
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
100 class PostDeleter(object):
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
101 def __init__(self, statements: StatementTable):
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
102 self.statements = statements
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
103
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
104 def __enter__(self):
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
105 self._garbage: List[StatementType] = []
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
106 return self
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
107
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
108 def add(self, stmt: StatementType):
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
109 self._garbage.append(stmt)
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
110
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
111 def __exit__(self, type, value, traceback):
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
112 if type is not None:
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
113 return False  # let the exception propagate and skip the deletes
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
114 for stmt in self._garbage:
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
115 del self.statements[stmt]
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
116
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
117
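PostDeleter above exists because deleting keys from a dict while iterating over it raises RuntimeError in Python 3; the deletes are queued and applied after the loop. The same pattern can be sketched with contextlib.contextmanager. This is an illustrative alternative under stated assumptions, not the module's code: `deferred_delete` is a hypothetical helper and the table holds plain strings and ints.

```python
from contextlib import contextmanager
from typing import Dict, Iterator, List


@contextmanager
def deferred_delete(table: Dict[str, int]) -> Iterator[List[str]]:
    """Collect keys during the with-body and delete them afterwards."""
    garbage: List[str] = []
    yield garbage
    # Reached only when the with-body finished without raising; an
    # exception is re-raised at the yield and skips the deletes.
    for key in garbage:
        del table[key]


table = {'a': 0, 'b': 1, 'c': 0}
with deferred_delete(table) as garbage:
    for key, value in table.items():
        if value == 0:
            garbage.append(key)  # safe: the dict is not modified here
```

As with PostDeleter, nothing is deleted if the body raises, so a failed pass leaves the table untouched.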
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
118 class ActiveStatements(object):
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
119 def __init__(self):
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
120 # This table holds statements asserted by any of our sources
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
121 # plus local statements that we introduce (source is
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
122 # http://bigasterisk.com/sse_collector/).
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
123 self.statements: StatementTable = collections.defaultdict(lambda: (set(), set()))
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
124
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
125 def state(self) -> Dict:
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
126 return {
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
127 'len': len(self.statements),
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
128 }
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
129
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
130 def _postDeleteStatements(self) -> PostDeleter:
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
131 return PostDeleter(self.statements)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
132
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
133 def pprintTable(self) -> None:
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
134 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
135 print("%03d. %-80s from %s to %s" % (
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
136 i,
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
137 abbrevStmt(stmt),
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
138 [abbrevTerm(s) for s in sources],
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
139 handlers))
298
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
140
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
141 @STATS.makeSyncPatch.time()
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
142 def makeSyncPatch(self, handler: 'SomeGraph', sources: Set[SourceUri]):
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
143 # todo: this could run all handlers at once, which is how we use it anyway
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
144 adds = []
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
145 dels = []
444
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
146
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
147 with self._postDeleteStatements() as garbage:
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
148 for stmt, (stmtSources, handlers) in self.statements.items():
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
149 belongsInHandler = not sources.isdisjoint(stmtSources)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
150 handlerHasIt = handler in handlers
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
151 log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
152 if belongsInHandler and not handlerHasIt:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
153 adds.append(stmt)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
154 handlers.add(handler)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
155 elif not belongsInHandler and handlerHasIt:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
156 dels.append(stmt)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
157 handlers.remove(handler)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
158 if not handlers and not stmtSources:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
159 garbage.add(stmt)
298
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
160
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
161 return Patch(addQuads=adds, delQuads=dels)
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
162
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
163 def applySourcePatch(self, source: SourceUri, p: Patch):
298
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
164 for stmt in p.addQuads:
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
165 sourceUrls, handlers = self.statements[stmt]
446
346b85a9adbb rollback the unicode(source) optimization. it was breaking all output to patch consumers
drewp@bigasterisk.com
parents: 444
diff changeset
166 if source in sourceUrls:
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
167 raise ValueError("%s added stmt that it already had: %s" %
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
168 (source, abbrevStmt(stmt)))
298
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
169 sourceUrls.add(source)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
170
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
171 with self._postDeleteStatements() as garbage:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
172 for stmt in p.delQuads:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
173 sourceUrls, handlers = self.statements[stmt]
446
346b85a9adbb rollback the unicode(source) optimization. it was breaking all output to patch consumers
drewp@bigasterisk.com
parents: 444
diff changeset
174 if source not in sourceUrls:
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
175 raise ValueError("%s deleting stmt that it didn't have: %s" %
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
176 (source, abbrevStmt(stmt)))
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
177 sourceUrls.remove(source)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
178 # this is rare, since some handler probably still has
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
179 # the stmt we're deleting, but it can happen e.g. when
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
180 # a handler was just deleted
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
181 if not sourceUrls and not handlers:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
182 garbage.add(stmt)
298
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
183
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
184 @STATS.replaceSourceStatements.time()
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
185 def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[StatementType]):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
186 log.debug('replaceSourceStatements with %s stmts', len(stmts))
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
187 newStmts = set(stmts)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
188
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
189 with self._postDeleteStatements() as garbage:
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
190 for stmt, (sources, handlers) in self.statements.items():
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
191 if source in sources:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
192 if stmt not in stmts:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
193 sources.remove(source)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
194 if not sources and not handlers:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
195 garbage.add(stmt)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
196 else:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
197 if stmt in stmts:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
198 sources.add(source)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
199 newStmts.discard(stmt)
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
200
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
201 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
202
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
203 def discardHandler(self, handler: 'SomeGraph'):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
204 with self._postDeleteStatements() as garbage:
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
205 for stmt, (sources, handlers) in self.statements.items():
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
206 handlers.discard(handler)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
207 if not sources and not handlers:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
208 garbage.add(stmt)
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
209
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
210 def discardSource(self, source: SourceUri):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
211 with self._postDeleteStatements() as garbage:
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
212 for stmt, (sources, handlers) in self.statements.items():
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
213 sources.discard(source)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
214 if not sources and not handlers:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
215 garbage.add(stmt)
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
216
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
217
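The table kept by ActiveStatements maps each statement to the set of sources asserting it and the set of handlers that have already been told about it. A sync pass for one handler compares those two sets and emits adds and deletes. The sketch below illustrates that idea with plain strings; `make_sync` is a hypothetical stand-in for makeSyncPatch, and placing the garbage check inside the delete branch is an assumption, since the listing's indentation is not preserved.

```python
from typing import Dict, List, Set, Tuple

# statement -> (sources asserting it, handlers that already know it)
Table = Dict[str, Tuple[Set[str], Set[str]]]


def make_sync(table: Table, handler: str,
              wanted: Set[str]) -> Tuple[List[str], List[str]]:
    """Return (adds, dels) that bring `handler` up to date with `wanted` sources."""
    adds: List[str] = []
    dels: List[str] = []
    garbage: List[str] = []
    for stmt, (sources, handlers) in table.items():
        belongs = not wanted.isdisjoint(sources)
        has_it = handler in handlers
        if belongs and not has_it:
            adds.append(stmt)
            handlers.add(handler)
        elif has_it and not belongs:
            dels.append(stmt)
            handlers.remove(handler)
            if not handlers and not sources:
                garbage.append(stmt)  # nobody asserts or knows it any more
    for stmt in garbage:  # delete only after iteration has finished
        del table[stmt]
    return adds, dels


table: Table = {
    's1': ({'srcA'}, set()),   # asserted by srcA, not yet sent to anyone
    's2': ({'srcB'}, set()),   # asserted by a source the handler ignores
    's3': (set(), {'h'}),      # no longer asserted, but h still has it
}
first_pass = make_sync(table, 'h', {'srcA'})
second_pass = make_sync(table, 'h', {'srcA'})
```

A second pass with unchanged inputs produces an empty patch, which is the property that lets the same routine serve both initial sync and incremental updates.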
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
218 class GraphClients(object):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
219 """
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
220 All the active PatchSources and SSEHandlers
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
221
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
222 To handle all the overlapping-statement cases, we store a set of
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
223 true statements along with the sources that are currently
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
224 asserting them and the requesters who currently know them. As
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
225 statements come and go, we make patches to send to requesters.
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
226 """
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
227 def __init__(self):
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
228 self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed)
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
229 self.handlers: Set[SomeGraph] = set()
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
230 self.statements: ActiveStatements = ActiveStatements()
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
231
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
232 self._localStatements = LocalStatements(self._onPatch)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
233
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
234 def state(self) -> Dict:
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
235 return {
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
236 'clients': [ps.state() for ps in self.clients.values()],
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
237 'sseHandlers': [h.state() for h in self.handlers],
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
238 'statements': self.statements.state(),
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
239 }
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
240
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
241 def _sourcesForHandler(self, handler: SomeGraph) -> List[SourceUri]:
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
242 streamId = handler.streamId
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
243 matches = [s for s in config['streams'] if s['id'] == streamId]
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
244 if len(matches) != 1:
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
245 raise ValueError("%s matches for %r" % (len(matches), streamId))
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
246 return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
247
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
248 @STATS.onPatch.time()
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
249 def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
250 if fullGraph:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
251 # a reconnect may need to resend the full graph even
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
252 # though we've already sent some statements
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
253 self.statements.replaceSourceStatements(source, p.addQuads)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
254 else:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
255 self.statements.applySourcePatch(source, p)
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
256
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
257 self._sendUpdatePatch()
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
258
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
259 if log.isEnabledFor(logging.DEBUG):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
260 self.statements.pprintTable()
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
261
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
262 if source != COLLECTOR:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
263 self._localStatements.setSourceState(
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
264 source,
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
265 ROOM['fullGraphReceived'] if fullGraph else
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
266 ROOM['patchesReceived'])
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
267
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
268 @STATS.sendUpdatePatch.time()
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
269 def _sendUpdatePatch(self, handler: Optional[SomeGraph]=None):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
270 """
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
271 Send a patch event to this handler (or to every handler when none is given) to bring it up to date with
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
272 self.statements
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
273 """
444
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
274 now = time.time()
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
275 # TODO: reduce loops here; prepare all patches at once
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
276 for h in (self.handlers if handler is None else [handler]):
444
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
277 period = 1
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
278 if 'Raspbian' in h.request.headers.get('user-agent', ''):
444
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
279 period = 5
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
280 if h.lastPatchSentTime > now - period:
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
281 continue
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
282 p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
283 log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
284 if not p.isNoop():
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
285 log.debug("send patch %s to %s", p.shortSummary(), h)
316
02d9915b3bbb patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents: 313
diff changeset
286 # This can be a giant line, which was a problem once. Might be
02d9915b3bbb patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents: 313
diff changeset
287 # nice for this service to try to break it up into multiple sends,
02d9915b3bbb patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents: 313
diff changeset
288 # although there's no guarantee at all since any single stmt
02d9915b3bbb patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents: 313
diff changeset
289 # could be any length.
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
290 h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch')
444
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
291 h.lastPatchSentTime = now
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
292 else:
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
293 log.debug('nothing to send to %s', h)
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
294
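The per-handler throttle in _sendUpdatePatch can be read as a small pure function. The sketch below restates the comparison from lines 277-281; `should_send` and its parameters are hypothetical names introduced only for illustration, and the `now` argument exists so the logic can be exercised without a real clock.

```python
import time
from typing import Optional


def should_send(last_sent: float, user_agent: str,
                now: Optional[float] = None) -> bool:
    """Return True if a handler is due for another patch.

    Mirrors the listing: clients whose user-agent mentions 'Raspbian'
    get at most one patch per 5 seconds, everyone else one per second.
    """
    if now is None:
        now = time.time()
    period = 5 if 'Raspbian' in user_agent else 1
    # The listing skips the handler when lastPatchSentTime > now - period.
    return not (last_sent > now - period)
```

As far as this section shows, a skipped handler is not rescheduled; it catches up the next time _sendUpdatePatch runs, because makeSyncPatch computes the full difference at that point.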
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
295 def addSseHandler(self, handler: SomeGraph):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
296 log.info('addSseHandler %r %r', handler, handler.streamId)
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
297
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
298 # fail early if id doesn't match
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
299 sources = self._sourcesForHandler(handler)
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
300
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
301 self.handlers.add(handler)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
302
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
303 for source in sources:
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
304 if source not in self.clients and source != COLLECTOR:
442
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
305 log.debug('connect to patch source %s', source)
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
306 self._localStatements.setSourceState(source, ROOM['connect'])
302
46c5fae89823 factor out patchsource
drewp@bigasterisk.com
parents: 301
diff changeset
307 self.clients[source] = ReconnectingPatchSource(
444
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
308 source,
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
309 listener=lambda p, fullGraph, source=source: self._onPatch(
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
310 source, p, fullGraph),
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
311 reconnectSecs=10)
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
312 log.debug('bring new client up to date')
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
313 handler.sendEvent(message=b'hello', event=b'greets')
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
314
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
315 self._sendUpdatePatch(handler)
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
316
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
317 def removeSseHandler(self, handler: SomeGraph):
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
318 log.info('removeSseHandler %r', handler)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
319 self.statements.discardHandler(handler)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
320 for source in self._sourcesForHandler(handler):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
321 for otherHandler in self.handlers:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
322 if (otherHandler != handler and
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
323 source in self._sourcesForHandler(otherHandler)):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
324 break
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
325 else:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
326 self._stopClient(source)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
327
298
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
328 self.handlers.remove(handler)
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
329
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
330 def _stopClient(self, url: SourceUri):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
331 if url == COLLECTOR:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
332 return
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
333
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
334 self.clients[url].stop()
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
335
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
336 self.statements.discardSource(url)
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
337
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
338 self._localStatements.setSourceState(url, None)
444
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
339 if url in self.clients:
96d712dccf28 WIP collector; not sure why it never sends out any patches
drewp@bigasterisk.com
parents: 443
diff changeset
340 del self.clients[url]
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
341
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
342
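The for/else loop in removeSseHandler decides which sources lose their last consumer when a handler goes away. A possible equivalent, written with any(), is sketched below. `sources_to_stop` and `sources_for` are hypothetical names; `sources_for` stands in for _sourcesForHandler, and plain strings stand in for handlers and source URIs.

```python
from typing import Callable, Iterable, List, TypeVar

H = TypeVar('H')
S = TypeVar('S')


def sources_to_stop(leaving: H, handlers: Iterable[H],
                    sources_for: Callable[[H], List[S]]) -> List[S]:
    """Sources used by `leaving` that no other handler still needs."""
    others = [h for h in handlers if h != leaving]
    return [
        s for s in sources_for(leaving)
        # Keep the source alive if any remaining handler lists it.
        if not any(s in sources_for(o) for o in others)
    ]
```

In the listing, COLLECTOR also appears in every handler's source list, but _stopClient returns early for it, so it is never stopped.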
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
343 class SomeGraph(cyclone.sse.SSEHandler):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
344 _handlerSerial = 0
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
345 def __init__(self, application: cyclone.web.Application, request):
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
346 cyclone.sse.SSEHandler.__init__(self, application, request)
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
347 self.bound = False
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
348 self.created = time.time()
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
349 self.graphClients = self.settings.graphClients
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
350
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
351 self._serial = SomeGraph._handlerSerial
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
352 SomeGraph._handlerSerial += 1
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
353 self.lastPatchSentTime: float = 0.0
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
354
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
355 def __repr__(self) -> str:
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
356 return '<Handler #%s>' % self._serial
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
357
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
358 def state(self) -> Dict:
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
359 return {
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
360 'created': round(self.created, 2),
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
361 'ageHours': round((time.time() - self.created) / 3600, 2),
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
362 'streamId': self.streamId,
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
363 'remoteIp': self.request.remote_ip,
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
364 'userAgent': self.request.headers.get('user-agent'),
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
365 }
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
366
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
367 def bind(self, *args, **kwargs):
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
368 self.streamId = args[0]
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
369
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
370 self.graphClients.addSseHandler(self)
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
371 # If something goes wrong with addSseHandler, I don't want to
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
372 # try removeSseHandler.
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
373 self.bound = True
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
374
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
375 def unbind(self) -> None:
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
376 if self.bound:
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
377 self.graphClients.removeSseHandler(self)
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
378
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
379
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
380 class State(cyclone.web.RequestHandler):
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
381 @STATS.getState.time()
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
382 def get(self) -> None:
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
383 try:
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
384 state = self.settings.graphClients.state()
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
385 except:
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
386 import traceback; traceback.print_exc()
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
387 raise
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
388
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
389 self.write(json.dumps({'graphClients': state}, indent=2))
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
390
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
391
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
392 class Root(cyclone.web.RequestHandler):
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
393 def get(self) -> None:
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
394 self.write('<html><body>sse_collector</body></html>')
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
395
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
396
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
397 if __name__ == '__main__':
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
398 arg = docopt("""
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
399 Usage: sse_collector.py [options]
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
400
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
401 -v Verbose
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
402 """)
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
403
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
404 if arg['-v']:
449
ef7eba0551f2 collector partial py3+types update. WIP
drewp@bigasterisk.com
parents: 446
diff changeset
405 enableTwistedLog()
442
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
406 log.setLevel(logging.DEBUG)
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
407 defer.setDebugging(True)
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
408
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
409
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
410 graphClients = GraphClients()
442
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
411 #exporter = InfluxExporter(... to export some stats values
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
412
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
413 reactor.listenTCP(
451
17a556ddc5ac add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents: 449
diff changeset
414 9072,
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
415 cyclone.web.Application(
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
416 handlers=[
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
417 (r'/', Root),
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
418 (r'/state', State),
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
419 (r'/graph/(.*)', SomeGraph),
442
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
420 (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
421 ],
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
422 graphClients=graphClients),
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
423 interface='::')
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
424 reactor.run()
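For readers who want to consume the /graph/(.*) stream, the sketch below splits standard Server-Sent Events text into (event, data) pairs. It assumes only the generic SSE framing ("event:" and "data:" fields terminated by a blank line); `parse_sse` is a hypothetical helper, and the structure of the JSON inside each patch event is deliberately left unparsed because this listing does not show it.

```python
from typing import List, Tuple


def parse_sse(text: str) -> List[Tuple[str, str]]:
    """Split SSE text into (event name, data) pairs."""
    events: List[Tuple[str, str]] = []
    event, data = 'message', []  # 'message' is the SSE default event name
    for line in text.splitlines():
        if line == '':
            # A blank line dispatches the event collected so far.
            if data:
                events.append((event, '\n'.join(data)))
            event, data = 'message', []
            continue
        if line.startswith(':'):
            continue  # comment line
        field, _, value = line.partition(':')
        if value.startswith(' '):
            value = value[1:]  # a single leading space is not part of the value
        if field == 'event':
            event = value
        elif field == 'data':
            data.append(value)
    return events
```

Going by the sendEvent calls in the listing, a client would expect one 'greets' event followed by 'patch' events whose data is the output of jsonFromPatch.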