Mercurial > code > home > repos > homeauto
annotate service/collector/sse_collector.py @ 1516:4322e8b1481f
consolidate debug page into ./index.html for now
Ignore-this: 56775a500165c5c707238d99fbdb6739
darcs-hash:ffca0b24595096ba593c64f1609a56acd50e1522
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 04 Feb 2020 22:41:51 -0800 |
parents | 732e30c509d6 |
children |
rev | line source |
---|---|
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
1 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
2 requesting /graph/foo returns an SSE patch stream that's the |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
3 result of fetching multiple other SSE patch streams. The result stream |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
4 may include new statements injected by this service. |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
5 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
6 Future: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
7 - filter out unneeded stmts from the sources |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
8 - give a time resolution and concatenate any patches that come faster than that res |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
9 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
10 from docopt import docopt |
1246
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
11 from greplin import scales |
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
12 from greplin.scales.cyclonehandler import StatsHandler |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
13 from rdflib import Namespace, URIRef |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
14 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
15 from typing import TYPE_CHECKING |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
16 if TYPE_CHECKING: |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
17 from rdflib import StatementType |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
18 else: |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
19 class StatementType: pass # type: ignore |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
20 |
1493 | 21 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
22 from rdflib.term import Node |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
23 from twisted.internet import reactor, defer |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
24 from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
25 import cyclone.web, cyclone.sse |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
26 import logging, collections, json, time |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
27 |
1398
53d7168bbe4c
standardize build. fix /state report
drewp <drewp@bigasterisk.com>
parents:
1273
diff
changeset
|
28 from standardservice.logsetup import log, enableTwistedLog |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
29 from patchablegraph import jsonFromPatch |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
30 from rdfdb.patch import Patch |
1252
9cfa7f69e41f
collector partial py3+types update. WIP
drewp <drewp@bigasterisk.com>
parents:
1249
diff
changeset
|
31 |
1398
53d7168bbe4c
standardize build. fix /state report
drewp <drewp@bigasterisk.com>
parents:
1273
diff
changeset
|
32 from patchablegraph.patchsource import ReconnectingPatchSource |
1252
9cfa7f69e41f
collector partial py3+types update. WIP
drewp <drewp@bigasterisk.com>
parents:
1249
diff
changeset
|
33 |
1516
4322e8b1481f
consolidate debug page into ./index.html for now
drewp <drewp@bigasterisk.com>
parents:
1494
diff
changeset
|
34 from collector_config import config |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
35 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
# A typing.NewType over URIRef was tried first and did not work:
#SourceUri = NewType('SourceUri', URIRef) # doesn't work
# so a trivial subclass is used to give source URIs their own type.
class SourceUri(URIRef): pass
1252
9cfa7f69e41f
collector partial py3+types update. WIP
drewp <drewp@bigasterisk.com>
parents:
1249
diff
changeset
|
38 |
9cfa7f69e41f
collector partial py3+types update. WIP
drewp <drewp@bigasterisk.com>
parents:
1249
diff
changeset
|
39 |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
# Namespace for the room vocabulary; this file uses ROOM['source'] and
# ROOM['state'] as predicates.
ROOM = Namespace("http://projects.bigasterisk.com/room/")
# URI that stands for this service. LocalStatements uses it as the subject,
# the graph context and the source of the statements it generates itself.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
42 |
1246
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
# greplin.scales collection registered under '/root'. Each PmfStat is a
# named timer; some of them are applied in this file as
# @STATS.<name>.time() decorators.
STATS = scales.collection('/root',
                          scales.PmfStat('getState'),
                          scales.PmfStat('localStatementsPatch'),
                          scales.PmfStat('makeSyncPatch'),
                          scales.PmfStat('onPatch'),
                          scales.PmfStat('sendUpdatePatch'),
                          scales.PmfStat('replaceSourceStatements'),
)
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
51 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class LocalStatements(object):
    """
    functions that make statements originating from sse_collector itself
    """
    def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
        """
        applyPatch is called as applyPatch(COLLECTOR, patch) every time
        the set of local statements changes.
        """
        self.applyPatch = applyPatch
        self._sourceState: Dict[SourceUri, URIRef] = {}  # source: state URIRef

    @STATS.localStatementsPatch.time()
    def setSourceState(self, source: SourceUri,
                       state: Optional[URIRef]) -> None:
        """
        add a patch to the COLLECTOR graph about the state of this
        source. state=None to remove the source.

        Nothing is sent when the state is unchanged (which includes
        removing a source that was never recorded).
        """
        oldState = self._sourceState.get(source)
        if state == oldState:
            return
        log.info('source state %s -> %s', source, state)

        # Statement that links the collector to this source; it only
        # appears in the patch when the source is first seen or removed.
        sourceQuad = (COLLECTOR, ROOM['source'], source, COLLECTOR)

        if oldState is None:
            # First report for this source.
            self._sourceState[source] = state
            patch = Patch(addQuads=[
                sourceQuad,
                (source, ROOM['state'], state, COLLECTOR),
            ])
        elif state is None:
            # Source is going away: retract everything we said about it.
            del self._sourceState[source]
            patch = Patch(delQuads=[
                sourceQuad,
                (source, ROOM['state'], oldState, COLLECTOR),
            ])
        else:
            # Known source changing state: swap the state statement only.
            self._sourceState[source] = state
            patch = Patch(
                addQuads=[
                    (source, ROOM['state'], state, COLLECTOR),
                ],
                delQuads=[
                    (source, ROOM['state'], oldState, COLLECTOR),
                ])
        self.applyPatch(COLLECTOR, patch)
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
91 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
    """
    Shorten a URIRef for display by swapping well-known namespace
    prefixes for short labels. Any other kind of term is returned
    untouched.
    """
    if not isinstance(t, URIRef):
        return t

    shortened = t
    # Applied in this order, one after another.
    for longPrefix, label in (
            ('http://projects.bigasterisk.com/room/', 'room:'),
            ('http://projects.bigasterisk.com/device/', 'dev:'),
            ('http://bigasterisk.com/sse_collector/', 'sc:'),
    ):
        shortened = shortened.replace(longPrefix, label)
    return shortened
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
98 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
def abbrevStmt(stmt: StatementType) -> str:
    """
    Format the first four terms of a statement as one parenthesized,
    space-separated string, each term passed through abbrevTerm.
    """
    subj, pred, obj, ctx = (abbrevTerm(stmt[pos]) for pos in range(4))
    return '(%s %s %s %s)' % (subj, pred, obj, ctx)
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
102 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
class PatchSink(cyclone.sse.SSEHandler):
    """
    SSE request handler for one client connection. It registers itself
    with settings.graphClients when bound and unregisters on unbind.
    """
    # Class-wide counter used to give every handler a distinct number.
    _handlerSerial = 0

    def __init__(self, application: cyclone.web.Application, request):
        super().__init__(application, request)
        self.created = time.time()
        self.lastPatchSentTime: float = 0.0
        self.graphClients = self.settings.graphClients

        # Becomes True only once addSseHandler has succeeded (see bind).
        self.bound = False

        cls = PatchSink
        self._serial = cls._handlerSerial
        cls._handlerSerial = self._serial + 1

    def __repr__(self) -> str:
        return '<Handler #%s>' % self._serial

    def state(self) -> Dict:
        """
        Summary of this connection for status reports. Reads
        self.streamId, which is assigned in bind().
        """
        headers = self.request.headers
        ageSeconds = time.time() - self.created
        report = {}
        report['created'] = round(self.created, 2)
        report['ageHours'] = round(ageSeconds / 3600, 2)
        report['streamId'] = self.streamId
        # wrong, need some forwarded-for thing
        report['remoteIp'] = self.request.remote_ip
        report['foafAgent'] = headers.get('X-Foaf-Agent')
        report['userAgent'] = headers.get('user-agent')
        return report

    def bind(self, *args, **kwargs):
        """
        Record the stream id (the first positional arg) and register
        with graphClients.
        """
        streamId = args[0]
        self.streamId = streamId

        self.graphClients.addSseHandler(self)
        # Only reached when addSseHandler did not raise, so a failed
        # registration never leads to a removeSseHandler call in unbind.
        self.bound = True

    def unbind(self) -> None:
        """Unregister from graphClients, if bind() completed."""
        if not self.bound:
            return
        self.graphClients.removeSseHandler(self)
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
139 |
1493 | 140 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
# For each statement: (the sources currently asserting it,
#                      the PatchSink handlers recorded as holding it).
StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set[PatchSink]]]
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
142 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
143 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
class PostDeleter(object):
    """
    Context manager that queues statements during the `with` body and
    deletes them from the table afterwards, so the table is not
    resized while the body is looping over it.

    If the body raises, nothing is deleted and the exception
    propagates to the caller.
    """
    def __init__(self, statements: StatementTable):
        self.statements = statements
        # Created here as well as in __enter__ so add() is safe to call
        # at any time.
        self._garbage: List[StatementType] = []

    def __enter__(self):
        # Start each `with` block with an empty queue.
        self._garbage = []
        return self

    def add(self, stmt: StatementType):
        """Queue stmt for deletion when the `with` block exits cleanly."""
        self._garbage.append(stmt)

    def __exit__(self, type, value, traceback):
        if type is not None:
            # A false return value tells Python to keep propagating the
            # exception from the body; no explicit re-raise is needed.
            return False
        for stmt in self._garbage:
            del self.statements[stmt]
        return False
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
160 |
1493 | 161 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
162 class ActiveStatements(object): |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
163 def __init__(self): |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
164 # This table holds statements asserted by any of our sources |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
165 # plus local statements that we introduce (source is |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
166 # http://bigasterisk.com/sse_collector/). |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
167 self.table: StatementTable = collections.defaultdict( |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
168 lambda: (set(), set())) |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
169 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def state(self) -> Dict:
    """Status summary: how many statements the table currently holds."""
    rowCount = len(self.table)
    return dict(len=rowCount)
1493 | 174 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def postDeleteStatements(self) -> PostDeleter:
    """
    Make a PostDeleter bound to this table, for use as
    `with self.postDeleteStatements() as garbage:`.
    """
    deleter = PostDeleter(self.table)
    return deleter
1493 | 177 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def pprintTable(self) -> None:
    """Print one numbered line per table row, in sorted order."""
    rows = sorted(self.table.items())
    for rowNum, row in enumerate(rows):
        stmt, (sources, handlers) = row
        shortSources = [abbrevTerm(s) for s in sources]
        line = "%03d. %-80s from %s to %s" % (
            rowNum, abbrevStmt(stmt), shortSources, handlers)
        print(line)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
186 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
@STATS.makeSyncPatch.time()
def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
    """
    Work out what `handler` needs to receive so that it holds exactly
    the statements asserted by at least one of `sources`, and update
    the table's record of which statements the handler holds.

    Returns a Patch whose addQuads are statements the handler was
    missing and whose delQuads are statements it should drop.
    """
    # todo: this could run all handlers at once, which is how we
    # use it anyway
    adds = []
    dels = []

    # Rows are queued on `garbage` and deleted after the loop, because
    # the loop below is iterating over self.table.
    with self.postDeleteStatements() as garbage:
        for stmt, (stmtSources, handlers) in self.table.items():
            # True when any requested source asserts this statement.
            belongsInHandler = not sources.isdisjoint(stmtSources)
            handlerHasIt = handler in handlers
            #log.debug("%s belong=%s has=%s",
            #          abbrevStmt(stmt), belongsInHandler, handlerHasIt)
            if belongsInHandler and not handlerHasIt:
                adds.append(stmt)
                handlers.add(handler)
            elif not belongsInHandler and handlerHasIt:
                dels.append(stmt)
                handlers.remove(handler)
                # NOTE(review): this check is assumed to sit inside the
                # elif branch; confirm against the repository copy.
                # A row with no handlers and no sources can be dropped.
                if not handlers and not stmtSources:
                    garbage.add(stmt)

    return Patch(addQuads=adds, delQuads=dels)
1493 | 210 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def applySourcePatch(self, source: SourceUri, p: Patch):
    """
    Record that `source` now asserts p.addQuads and no longer asserts
    p.delQuads.

    Raises ValueError if the source adds a statement it already had,
    or deletes one it did not have. The patch is applied statement by
    statement, so changes made before the error are not rolled back.
    """
    for stmt in p.addQuads:
        # Indexing creates the row on first sight of a statement.
        sourceUrls, _ = self.table[stmt]
        if source in sourceUrls:
            raise ValueError("%s added stmt that it already had: %s" %
                             (source, abbrevStmt(stmt)))
        sourceUrls.add(source)

    with self.postDeleteStatements() as garbage:
        for stmt in p.delQuads:
            # Use .get() here: self.table is a defaultdict, so indexing
            # an unknown statement would insert an empty row that is
            # never cleaned up once the ValueError below is raised.
            row = self.table.get(stmt)
            if row is None or source not in row[0]:
                raise ValueError("%s deleting stmt that it didn't have: %s" %
                                 (source, abbrevStmt(stmt)))
            sourceUrls, handlers = row
            sourceUrls.remove(source)
            # this is rare, since some handler probably still has
            # the stmt we're deleting, but it can happen e.g. when
            # a handler was just deleted
            if not sourceUrls and not handlers:
                garbage.add(stmt)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
231 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
@STATS.replaceSourceStatements.time()
def replaceSourceStatements(self, source: SourceUri,
                            stmts: Sequence[StatementType]):
    """
    Make `stmts` the complete set of statements asserted by `source`.

    Rows already in the table are adjusted in place: the source is
    removed from rows whose statement is not in `stmts` and added to
    rows whose statement is. Statements that have no row yet are
    passed to applySourcePatch as adds. A row left with neither
    sources nor handlers is put in the garbage set provided by
    postDeleteStatements().
    """
    log.debug('replaceSourceStatements with %s stmts', len(stmts))
    # `stmts` is a Sequence, so `stmt in stmts` would be a linear scan
    # for every table row. Test membership against a set built once.
    wanted = frozenset(stmts)
    newStmts = set(wanted)

    with self.postDeleteStatements() as garbage:
        for stmt, (sources, handlers) in self.table.items():
            if source in sources:
                if stmt not in wanted:
                    sources.remove(source)
                    if not sources and not handlers:
                        garbage.add(stmt)
            else:
                if stmt in wanted:
                    sources.add(source)
            # This stmt already has a row (handled above), so it must
            # not be reported to applySourcePatch as a fresh add.
            newStmts.discard(stmt)

    self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
251 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def discardHandler(self, handler: PatchSink):
    """
    Forget `handler` in every row of the table. Rows that are left
    with neither sources nor handlers are put in the garbage set
    provided by postDeleteStatements().
    """
    with self.postDeleteStatements() as garbage:
        for stmt, row in self.table.items():
            rowSources, rowHandlers = row
            rowHandlers.discard(handler)
            if rowSources or rowHandlers:
                continue
            garbage.add(stmt)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
258 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def discardSource(self, source: SourceUri):
    """
    Forget `source` in every row of the table. Rows that are left
    with neither sources nor handlers are put in the garbage set
    provided by postDeleteStatements().
    """
    with self.postDeleteStatements() as garbage:
        for stmt, row in self.table.items():
            rowSources, rowHandlers = row
            rowSources.discard(source)
            if rowSources or rowHandlers:
                continue
            garbage.add(stmt)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
265 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
266 |
1493 | 267 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """
    def __init__(self):
        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
        self.handlers: Set[PatchSink] = set()
        self.statements: ActiveStatements = ActiveStatements()

        # LocalStatements gets _onPatch as its callback; its patches
        # are handled under the COLLECTOR source (see _onPatch).
        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        """
        Return a report of the current clients, SSE handlers and
        statements, with the client and handler lists sorted so the
        output order is stable.
        """
        return {
            'clients': sorted([ps.state() for ps in self.clients.values()],
                              key=lambda r: r['reconnectedPatchSource']['url']),
            'sseHandlers': sorted([h.state() for h in self.handlers],
                                  key=lambda r: (r['streamId'], r['created'])),
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
        """
        Return the source URIs configured for the handler's streamId,
        plus COLLECTOR.

        Raises ValueError unless exactly one entry of config['streams']
        has that id.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [
            COLLECTOR]

    @STATS.onPatch.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False):
        """
        Take a patch that arrived from `source`, fold it into
        self.statements, and bring all handlers up to date.

        With fullGraph set, p.addQuads replaces everything the source
        was asserting; otherwise p is applied incrementally.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            # record what kind of update we last got from this source
            self._localStatements.setSourceState(
                source,
                ROOM['fullGraphReceived'] if fullGraph else
                ROOM['patchesReceived'])

    @STATS.sendUpdatePatch.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSink]=None):
        """
        Send a patch event to bring handlers up to date with
        self.statements.

        With no argument, every handler in self.handlers is
        considered; with `handler` given, only that one (an error is
        logged and nothing is sent if it is no longer registered).

        A handler that was sent a patch less than `period` seconds
        ago is skipped on this call. Nothing in this method schedules
        a retry for it.
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}
        # reduce loops here- prepare all patches at once
        for h in selected:
            # minimum seconds between patches to one handler; longer
            # for clients whose user-agent mentions Raspbian
            period = .9
            if 'Raspbian' in h.request.headers.get('user-agent', ''):
                period = 5
            if h.lastPatchSentTime > now - period:
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p).encode('utf8'),
                            event=b'patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def addSseHandler(self, handler: PatchSink):
        """
        Register a new handler, connect to any of its sources that
        have no client yet, and send it an initial patch.

        Raises ValueError (from _sourcesForHandler) before anything
        is registered if the handler's streamId is not configured.
        """
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                # source=source binds the current loop value into the
                # lambda instead of the loop variable itself
                self.clients[source] = ReconnectingPatchSource(
                    source,
                    listener=lambda p, fullGraph, source=source: self._onPatch(
                        source, p, fullGraph),
                    reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSink):
        """
        Unregister a handler and stop every source client that no
        other registered handler uses.
        """
        log.info('removeSseHandler %r', handler)
        self.statements.discardHandler(handler)
        # NOTE(review): `handler` stays in self.handlers until the end
        # of this method. _stopClient calls
        # self._localStatements.setSourceState, and LocalStatements
        # was constructed with self._onPatch; if that calls back into
        # _onPatch, _sendUpdatePatch would still consider this
        # handler. Possibly related to the stale rows that cleanup()
        # removes -- TODO confirm before reordering.
        for source in self._sourcesForHandler(handler):
            stillInUse = any(
                otherHandler != handler and
                source in self._sourcesForHandler(otherHandler)
                for otherHandler in self.handlers)
            if not stillInUse:
                self._stopClient(source)

        self.handlers.remove(handler)

    def _stopClient(self, url: SourceUri):
        """
        Stop and forget the client for `url` and drop the statements
        it was asserting. COLLECTOR is never stopped.

        A url that has no entry in self.clients is tolerated: there
        is nothing to stop, but its statements and source state are
        still cleared.
        """
        if url == COLLECTOR:
            return

        # Look the client up without indexing: the removal below was
        # already guarded against a missing url, so the stop() call
        # must not raise KeyError for the same case.
        client = self.clients.get(url)
        if client is not None:
            client.stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
        self.clients.pop(url, None)

        self.cleanup()

    def cleanup(self):
        """
        despite the attempts above, we still get useless rows in the table
        sometimes

        Drops every row that has no sources and whose handlers are
        all absent from self.handlers.
        """
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)
1493 | 414 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
415 |
1242
24c004aac998
stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents:
1158
diff
changeset
|
class State(cyclone.web.RequestHandler):
    """Request handler that writes the GraphClients state as indented JSON."""
    @STATS.getState.time()
    def get(self) -> None:
        try:
            report = {'graphClients': self.settings.graphClients.state()}
            # values json can't encode are replaced by a placeholder
            # string instead of failing the whole report
            body = json.dumps(report, indent=2,
                              default=lambda obj: '<unserializable>')
            self.write(body)
        except Exception:
            # print the traceback here, then let the error propagate
            import traceback
            traceback.print_exc()
            raise
1494
732e30c509d6
collector: add /graphlist, plus logging updates
drewp <drewp@bigasterisk.com>
parents:
1493
diff
changeset
|
426 |
732e30c509d6
collector: add /graphlist, plus logging updates
drewp <drewp@bigasterisk.com>
parents:
1493
diff
changeset
|
class GraphList(cyclone.web.RequestHandler):
    """Request handler that writes config['streams'] as JSON."""
    def get(self) -> None:
        streams = config['streams']
        self.write(json.dumps(streams))
732e30c509d6
collector: add /graphlist, plus logging updates
drewp <drewp@bigasterisk.com>
parents:
1493
diff
changeset
|
430 |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
if __name__ == '__main__':
    # Script entry point: parse options, set up logging, then serve
    # the collector's HTTP routes until the reactor stops.
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v   Verbose
    -i   Info level only
    """)

    # -v gives DEBUG logging, -i alone gives INFO; with neither flag
    # the logging setup is left untouched.
    if arg['-v'] or arg['-i']:
        enableTwistedLog()
        log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
        defer.setDebugging(True)


    # shared by the request handlers through the application settings
    graphClients = GraphClients()
    #exporter = InfluxExporter(... to export some stats values

    reactor.listenTCP(
        9072,
        cyclone.web.Application(
            handlers=[
                # "/" serves ./index.html
                (r"/()", cyclone.web.StaticFileHandler, {
                    "path": ".", "default_filename": "index.html"}),
                (r'/state', State),
                # bare /graph/ returns the stream list; /graph/<id>
                # is handled by PatchSink
                (r'/graph/', GraphList),
                (r'/graph/(.+)', PatchSink),
                (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
            ],
            graphClients=graphClients),
        interface='::')
    reactor.run()