Mercurial > code > home > repos > homeauto
annotate service/collector/sse_collector.py @ 1398:53d7168bbe4c
standardize build. fix /state report
Ignore-this: 5a9bc82de9f0d7398c9290fc2c7ecbf9
darcs-hash:76d1c973c00084e248d881159ee90a3ae4dee655
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sat, 06 Jul 2019 13:56:07 -0700 |
parents | 6f27fe20f6eb |
children | 71684fc9c692 |
rev | line source |
---|---|
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
1 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
2 requesting /graph/foo returns an SSE patch stream that's the |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
3 result of fetching multiple other SSE patch streams. The result stream |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
4 may include new statements injected by this service. |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
5 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
6 Future: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
7 - filter out unneeded stmts from the sources |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
8 - give a time resolution and concatenate any patches that come faster than that res |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
9 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
10 from docopt import docopt |
1246
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
11 from greplin import scales |
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
12 from greplin.scales.cyclonehandler import StatsHandler |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
13 from rdflib import Namespace, URIRef |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
14 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
15 from typing import TYPE_CHECKING |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
16 if TYPE_CHECKING: |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
17 from rdflib import StatementType |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
18 else: |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
19 class StatementType: pass # type: ignore |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
20 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
21 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
22 from rdflib.term import Node |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
23 from twisted.internet import reactor, defer |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
24 from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
25 import cyclone.web, cyclone.sse |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
26 import logging, collections, json, time |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
27 |
1398
53d7168bbe4c
standardize build. fix /state report
drewp <drewp@bigasterisk.com>
parents:
1273
diff
changeset
|
28 from standardservice.logsetup import log, enableTwistedLog |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
29 from patchablegraph import jsonFromPatch |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
30 from rdfdb.patch import Patch |
1252
9cfa7f69e41f
collector partial py3+types update. WIP
drewp <drewp@bigasterisk.com>
parents:
1249
diff
changeset
|
31 |
1398
53d7168bbe4c
standardize build. fix /state report
drewp <drewp@bigasterisk.com>
parents:
1273
diff
changeset
|
32 from patchablegraph.patchsource import ReconnectingPatchSource |
1252
9cfa7f69e41f
collector partial py3+types update. WIP
drewp <drewp@bigasterisk.com>
parents:
1249
diff
changeset
|
33 |
1246
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
34 from sse_collector_config import config |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
35 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
36 #SourceUri = NewType('SourceUri', URIRef) # doesn't work |
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
class SourceUri(URIRef):
    """URIRef subclass used in annotations to mark the URI of a patch source."""
1252
9cfa7f69e41f
collector partial py3+types update. WIP
drewp <drewp@bigasterisk.com>
parents:
1249
diff
changeset
|
38 |
9cfa7f69e41f
collector partial py3+types update. WIP
drewp <drewp@bigasterisk.com>
parents:
1249
diff
changeset
|
39 |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
# Namespace for the room: terms used in the statements this service emits.
ROOM = Namespace("http://projects.bigasterisk.com/room/")
# URI used both as subject and as graph for statements made by the collector.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

# One PmfStat timer per instrumented operation, all registered under /root.
STATS = scales.collection(
    '/root',
    *(scales.PmfStat(statName) for statName in (
        'getState',
        'localStatementsPatch',
        'makeSyncPatch',
        'onPatch',
        'sendUpdatePatch',
        'replaceSourceStatements',
    )))
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
51 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class LocalStatements(object):
    """
    functions that make statements originating from sse_collector itself
    """
    def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
        """
        applyPatch is invoked as applyPatch(COLLECTOR, patch) each time
        the statements about a source change.
        """
        self.applyPatch = applyPatch
        # source -> the state URIRef most recently asserted for it
        self._sourceState: Dict[SourceUri, URIRef] = {}

    @STATS.localStatementsPatch.time()
    def setSourceState(self, source: SourceUri,
                       state: Optional[URIRef]) -> None:
        """
        add a patch to the COLLECTOR graph about the state of this
        source. state=None to remove the source.

        Nothing is sent when the state is unchanged (including removing
        a source that was never recorded).
        """
        oldState = self._sourceState.get(source, None)
        if state == oldState:
            return
        log.info('source state %s -> %s', source, state)

        sourceQuad = (COLLECTOR, ROOM['source'], source, COLLECTOR)

        def stateQuad(value):
            return (source, ROOM['state'], value, COLLECTOR)

        # The local record is updated before the patch is applied, so it
        # reflects the new state even if applyPatch raises.
        if oldState is None:
            # first time we hear about this source
            self._sourceState[source] = state
            patch = Patch(addQuads=[sourceQuad, stateQuad(state)])
        elif state is None:
            # source is going away: retract everything we said about it
            del self._sourceState[source]
            patch = Patch(delQuads=[sourceQuad, stateQuad(oldState)])
        else:
            # known source changed state: swap only the state statement
            self._sourceState[source] = state
            patch = Patch(addQuads=[stateQuad(state)],
                          delQuads=[stateQuad(oldState)])
        self.applyPatch(COLLECTOR, patch)
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
91 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
    """
    Shorten a URIRef for display by swapping known namespace URIs for
    short prefixes. Any other kind of term is returned as-is.
    """
    if not isinstance(t, URIRef):
        return t
    shortened = t
    # applied in this order, one replace per namespace
    for longForm, prefix in (
            ('http://projects.bigasterisk.com/room/', 'room:'),
            ('http://projects.bigasterisk.com/device/', 'dev:'),
            ('http://bigasterisk.com/sse_collector/', 'sc:'),
    ):
        shortened = shortened.replace(longForm, prefix)
    return shortened
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
98 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
def abbrevStmt(stmt: StatementType) -> str:
    """Format the first four terms of stmt, abbreviated, as '(s p o g)'."""
    terms = tuple(abbrevTerm(stmt[i]) for i in range(4))
    return '(%s %s %s %s)' % terms
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
102 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
class PatchSink(cyclone.sse.SSEHandler):
    """
    SSE handler for one client connection. It registers itself with
    settings.graphClients when bound and unregisters when unbound.
    """
    # class-wide counter; each new handler takes the current value
    _handlerSerial = 0

    def __init__(self, application: cyclone.web.Application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.bound = False
        self.created = time.time()
        self.graphClients = self.settings.graphClients

        serial = PatchSink._handlerSerial
        PatchSink._handlerSerial = serial + 1
        self._serial = serial
        self.lastPatchSentTime: float = 0.0

    def __repr__(self) -> str:
        return '<Handler #%s>' % (self._serial,)

    def state(self) -> Dict:
        """Describe this connection (age, stream, client) as a plain dict."""
        created = self.created
        ageSec = time.time() - created
        request = self.request
        return {
            'created': round(created, 2),
            'ageHours': round(ageSec / 3600, 2),
            'streamId': self.streamId,
            'remoteIp': request.remote_ip,
            'userAgent': request.headers.get('user-agent'),
        }

    def bind(self, *args, **kwargs):
        """Record the stream id (first positional arg) and register."""
        self.streamId = args[0]

        self.graphClients.addSseHandler(self)
        # Set only after addSseHandler returned: if it failed, unbind
        # must not attempt removeSseHandler.
        self.bound = True

    def unbind(self) -> None:
        """Unregister from graphClients, but only if bind completed."""
        if not self.bound:
            return
        self.graphClients.removeSseHandler(self)
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
138 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
139 |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
# statement -> (sources that assert it, PatchSink handlers recorded as having it)
StatementTable = Dict[
    StatementType,
    Tuple[Set[SourceUri], Set[PatchSink]],
]
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
141 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
142 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
class PostDeleter(object):
    """
    Context manager that queues statements for removal from a statement
    table and deletes them only when the `with` block ends without an
    exception. This lets the caller iterate the table inside the block.
    """
    def __init__(self, statements: StatementTable):
        self.statements = statements
        # Statements queued by add(); reset on each __enter__.
        self._garbage: List[StatementType] = []

    def __enter__(self):
        self._garbage = []
        return self

    def add(self, stmt: StatementType):
        """Queue stmt to be deleted from the table on a clean exit."""
        self._garbage.append(stmt)

    def __exit__(self, type, value, traceback):
        if type is not None:
            # The block failed: leave the table untouched and let the
            # interpreter propagate the exception. __exit__ should not
            # re-raise it itself.
            return False
        for stmt in self._garbage:
            del self.statements[stmt]
        return False
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
159 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
160 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
161 class ActiveStatements(object): |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def __init__(self):
    def emptyEntry():
        # (sources asserting the statement, handlers recorded for it)
        return (set(), set())

    # Statements asserted by any of our sources, plus the local ones
    # this service introduces itself (their source is
    # http://bigasterisk.com/sse_collector/). Looking up an unknown
    # statement creates an empty entry.
    self.table: StatementTable = collections.defaultdict(emptyEntry)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
168 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def state(self) -> Dict:
    """Report the number of statements currently in the table."""
    return dict(len=len(self.table))
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
173 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def postDeleteStatements(self) -> PostDeleter:
    """Return a PostDeleter bound to this object's statement table."""
    deleter = PostDeleter(self.table)
    return deleter
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
176 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def pprintTable(self) -> None:
    """Print one numbered line per table entry, in sorted order."""
    rows = sorted(self.table.items())
    for index, row in enumerate(rows):
        stmt, (stmtSources, sinks) = row
        shownStmt = abbrevStmt(stmt)
        shownSources = [abbrevTerm(src) for src in stmtSources]
        print("%03d. %-80s from %s to %s" %
              (index, shownStmt, shownSources, sinks))
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
185 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
@STATS.makeSyncPatch.time()
def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
    """
    Build the Patch that brings handler in line with the given sources:
    it adds statements from those sources that the handler is not yet
    recorded as having, and deletes statements it has that none of the
    sources assert. The table's per-statement handler sets are updated
    to match.
    """
    # todo: this could run all handlers at once, which is how we
    # use it anyway
    toAdd = []
    toDel = []

    with self.postDeleteStatements() as garbage:
        for stmt, (stmtSources, sinks) in self.table.items():
            wanted = not sources.isdisjoint(stmtSources)
            present = handler in sinks
            if wanted == present:
                # handler is already in the right state for this stmt
                continue
            if wanted:
                toAdd.append(stmt)
                sinks.add(handler)
                continue
            toDel.append(stmt)
            sinks.remove(handler)
            # nobody asserts it and nobody holds it: drop the row once
            # iteration over the table is finished
            if not (sinks or stmtSources):
                garbage.add(stmt)

    return Patch(addQuads=toAdd, delQuads=toDel)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
209 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def applySourcePatch(self, source: SourceUri, p: Patch):
    """
    Record the adds and deletes that `source` reported in patch `p`.

    Every stmt in p.addQuads gets `source` added to its set of
    asserting sources; every stmt in p.delQuads gets it removed.

    Raises ValueError if `source` adds a stmt it is already recorded
    as asserting, or deletes one it is not recorded as asserting.
    Statements processed before the error stay applied.
    """
    for added in p.addQuads:
        # each table row is a (sources, handlers) pair
        assertingSources, _ = self.table[added]
        if source in assertingSources:
            message = "%s added stmt that it already had: %s" % (
                source, abbrevStmt(added))
            raise ValueError(message)
        assertingSources.add(source)

    with self.postDeleteStatements() as garbage:
        for removed in p.delQuads:
            assertingSources, knowingHandlers = self.table[removed]
            if source not in assertingSources:
                message = "%s deleting stmt that it didn't have: %s" % (
                    source, abbrevStmt(removed))
                raise ValueError(message)
            assertingSources.remove(source)
            if assertingSources or knowingHandlers:
                continue
            # Reaching here is rare, since some handler probably
            # still has the stmt we're deleting, but it can happen
            # e.g. when a handler was just deleted.
            garbage.add(removed)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
230 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
@STATS.replaceSourceStatements.time()
def replaceSourceStatements(self, source: SourceUri,
                            stmts: Sequence[StatementType]):
    """
    Make the set of statements asserted by `source` equal to `stmts`.

    Rows already in the table are updated in place: `source` is
    removed from rows whose stmt is no longer asserted and added to
    rows whose stmt is. Statements not in the table yet are then
    added through applySourcePatch.
    """
    log.debug('replaceSourceStatements with %s stmts', len(stmts))
    # `stmts` may be a list, where each membership test is a linear
    # scan. Snapshot it into a set once so the per-row checks below
    # don't make this loop O(rows * stmts).
    incoming = frozenset(stmts)
    # statements that still need a table row once the loop is done
    newStmts = set(incoming)

    with self.postDeleteStatements() as garbage:
        for stmt, (sources, handlers) in self.table.items():
            if stmt in incoming:
                # no-op when source is already asserting this stmt
                sources.add(source)
                # The row exists, so the stmt must not be re-added
                # below (applySourcePatch raises on a repeated add).
                newStmts.discard(stmt)
            elif source in sources:
                sources.remove(source)
                if not sources and not handlers:
                    garbage.add(stmt)

    self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
250 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def discardHandler(self, handler: PatchSink):
    """
    Forget `handler` in every table row, and mark as garbage any
    row left with neither sources nor handlers.
    """
    with self.postDeleteStatements() as garbage:
        for stmt, row in self.table.items():
            rowSources, rowHandlers = row
            rowHandlers.discard(handler)
            if rowSources or rowHandlers:
                continue
            garbage.add(stmt)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
257 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
def discardSource(self, source: SourceUri):
    """
    Forget `source` in every table row, and mark as garbage any
    row left with neither sources nor handlers.
    """
    with self.postDeleteStatements() as garbage:
        for stmt, row in self.table.items():
            rowSources, rowHandlers = row
            rowSources.discard(source)
            if rowSources or rowHandlers:
                continue
            garbage.add(stmt)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
264 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
265 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
266 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """
    def __init__(self):
        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
        self.handlers: Set[PatchSink] = set()
        self.statements: ActiveStatements = ActiveStatements()

        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        """
        Report the state() of every client, handler, and the statement
        table, as one dict.
        """
        return {
            'clients': [ps.state() for ps in self.clients.values()],
            'sseHandlers': [h.state() for h in self.handlers],
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
        """
        Source URIs configured for the handler's streamId, followed by
        COLLECTOR.

        Raises ValueError unless exactly one entry in config['streams']
        has that id.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [
            COLLECTOR]

    @STATS.onPatch.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False):
        """
        Apply patch `p` from `source` to self.statements and push the
        resulting changes out to the handlers.

        With fullGraph set, p.addQuads replaces everything `source`
        was asserting; otherwise p is applied incrementally.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(
                source,
                ROOM['fullGraphReceived'] if fullGraph else
                ROOM['patchesReceived'])

    @STATS.sendUpdatePatch.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSink]=None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements

        With handler=None, every handler in self.handlers is
        considered. A handler whose lastPatchSentTime is within its
        period (5 seconds for a 'Raspbian' user-agent, otherwise .9)
        is skipped on this call.
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}
        # reduce loops here- prepare all patches at once
        for h in selected:
            period = .9
            if 'Raspbian' in h.request.headers.get('user-agent', ''):
                period = 5
            if h.lastPatchSentTime > now - period:
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p).encode('utf8'),
                            event=b'patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def addSseHandler(self, handler: PatchSink):
        """
        Register `handler`, connect to any of its sources that have no
        client yet, and send it a first sync patch.

        Raises ValueError (from _sourcesForHandler) before registering
        anything if the handler's streamId is not configured.
        """
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                self.clients[source] = ReconnectingPatchSource(
                    source,
                    # source=source binds this iteration's value; a
                    # plain closure would see the loop's last source
                    listener=lambda p, fullGraph, source=source: self._onPatch(
                        source, p, fullGraph),
                    reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSink):
        """
        Unregister `handler` and stop every source that no remaining
        handler uses.

        Raises ValueError if the handler's streamId is not configured,
        and KeyError if the handler was not registered.
        """
        log.info('removeSseHandler %r', handler)
        self.statements.discardHandler(handler)
        ownSources = self._sourcesForHandler(handler)

        # Unregister before stopping sources, so _sendUpdatePatch and
        # cleanup() no longer count this handler while sources are
        # torn down.
        # NOTE(review): this presumes _stopClient's setSourceState call
        # can re-enter _onPatch -- confirm against LocalStatements.
        self.handlers.remove(handler)

        # Compute once which sources the remaining handlers need,
        # instead of re-deriving it for every (source, handler) pair.
        stillUsed: Set[SourceUri] = set()
        for otherHandler in self.handlers:
            stillUsed.update(self._sourcesForHandler(otherHandler))

        # dict.fromkeys drops duplicate sources but keeps their order,
        # so a source listed twice is only stopped once.
        for source in dict.fromkeys(ownSources):
            if source not in stillUsed:
                self._stopClient(source)

    def _stopClient(self, url: SourceUri):
        """
        Stop and forget the client for `url` and discard the
        statements it was asserting. COLLECTOR is never stopped.

        Safe to call for a url that has no client.
        """
        if url == COLLECTOR:
            return

        # The client may already be gone (e.g. a repeated stop for the
        # same source); don't let that abort the rest of the teardown.
        client = self.clients.get(url)
        if client is not None:
            client.stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
        self.clients.pop(url, None)

        self.cleanup()

    def cleanup(self):
        """
        despite the attempts above, we still get useless rows in the table
        sometimes

        A row is useless when it has no sources and none of its
        handlers is still in self.handlers.
        """
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
411 |
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
412 |
1242
24c004aac998
stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents:
1158
diff
changeset
|
class State(cyclone.web.RequestHandler):
    """
    Serves the GraphClients state report as indented JSON.
    """
    @STATS.getState.time()
    def get(self) -> None:
        try:
            report = {'graphClients': self.settings.graphClients.state()}
            # values json can't encode are replaced by a marker string
            # instead of failing the whole report
            body = json.dumps(report, indent=2,
                              default=lambda obj: '<unserializable>')
            self.write(body)
        except Exception:
            # print the traceback, then let the error propagate
            import traceback
            traceback.print_exc()
            raise
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
423 |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
424 |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class Root(cyclone.web.RequestHandler):
    """
    Serves a minimal static page naming this service.
    """
    def get(self) -> None:
        page = '<html><body>sse_collector</body></html>'
        self.write(page)
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
428 |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
429 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
430 if __name__ == '__main__': |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
431 arg = docopt(""" |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
432 Usage: sse_collector.py [options] |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
433 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
434 -v Verbose |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
435 """) |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
436 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
437 if arg['-v']: |
1252
9cfa7f69e41f
collector partial py3+types update. WIP
drewp <drewp@bigasterisk.com>
parents:
1249
diff
changeset
|
438 enableTwistedLog() |
1245
4c123099e5b6
collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents:
1242
diff
changeset
|
439 log.setLevel(logging.DEBUG) |
4c123099e5b6
collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents:
1242
diff
changeset
|
440 defer.setDebugging(True) |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
441 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
442 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
443 graphClients = GraphClients() |
1245
4c123099e5b6
collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents:
1242
diff
changeset
|
444 #exporter = InfluxExporter(... to export some stats values |
4c123099e5b6
collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents:
1242
diff
changeset
|
445 |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
446 reactor.listenTCP( |
1254
666f9a2198a7
add types to sse_collector.py. Surprisingly few bugs found.
drewp <drewp@bigasterisk.com>
parents:
1252
diff
changeset
|
447 9072, |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
448 cyclone.web.Application( |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
449 handlers=[ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
450 (r'/', Root), |
1242
24c004aac998
stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents:
1158
diff
changeset
|
451 (r'/state', State), |
1273
6f27fe20f6eb
py3 updates. some other refactors.
drewp <drewp@bigasterisk.com>
parents:
1254
diff
changeset
|
452 (r'/graph/(.*)', PatchSink), |
1245
4c123099e5b6
collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents:
1242
diff
changeset
|
453 (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}), |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
454 ], |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
455 graphClients=graphClients), |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
456 interface='::') |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
457 reactor.run() |