Mercurial > code > home > repos > homeauto
annotate service/collector/sse_collector.py @ 1249:2ddfb2cf95ab
rollback the unicode(source) optimization. it was breaking all output to patch consumers
Ignore-this: 4fabc1d106299a8094b85ff9909c660e
darcs-hash:73268de0ab252948f87dbcabbc90cf0c6f9bd752
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Thu, 18 Apr 2019 16:55:52 -0700 |
parents | 723808d0a92c |
children | 9cfa7f69e41f |
rev | line source |
---|---|
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
1 from __future__ import division |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
2 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
3 requesting /graph/foo returns an SSE patch stream that's the |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
4 result of fetching multiple other SSE patch streams. The result stream |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
5 may include new statements injected by this service. |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
6 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
7 Future: |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
8 - filter out unneeded stmts from the sources |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
9 - give a time resolution and concatenate any patches that come faster than that res |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
10 """ |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
11 import sys, logging, collections, json, time |
1245
4c123099e5b6
collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents:
1242
diff
changeset
|
12 from twisted.internet import reactor, defer |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
13 import cyclone.web, cyclone.sse |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
14 from rdflib import URIRef, Namespace |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
15 from docopt import docopt |
1246
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
16 from greplin import scales |
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
17 from greplin.scales.cyclonehandler import StatsHandler |
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
18 from logsetup import log, enableTwistedLog |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
19 from logsetup import log |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
20 from patchablegraph import jsonFromPatch |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
21 from rdfdb.patch import Patch |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
22 from patchsource import ReconnectingPatchSource |
1246
f18b95f81d43
more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents:
1245
diff
changeset
|
23 from sse_collector_config import config |
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
24 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
# Namespace for the room vocabulary; indexed below as ROOM['source'] and
# ROOM['state'] to build predicates.
ROOM = Namespace("http://projects.bigasterisk.com/room/")

# Identifier used for statements that this service introduces itself.
COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')

# One PmfStat per instrumented operation, all registered under '/root'.
# A generator expression is used so that no loop variable is left behind
# in the module namespace.
STATS = scales.collection(
    '/root',
    *(scales.PmfStat(statName) for statName in (
        'getState',
        'localStatementsPatch',
        'makeSyncPatch',
        'onPatch',
        'sendUpdatePatch',
        'replaceSourceStatements',
    )))
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
36 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class LocalStatements(object):
    """
    Builds the statements that sse_collector asserts on its own behalf
    and hands them, as patches, to the applyPatch callable.
    """
    def __init__(self, applyPatch):
        # called as applyPatch(COLLECTOR, patch) whenever a source's
        # recorded state changes
        self.applyPatch = applyPatch
        self._sourceState = {} # source: state URIRef

    @STATS.localStatementsPatch.time()
    def setSourceState(self, source, state):
        """
        Record `state` for `source` and emit the corresponding patch on
        the COLLECTOR graph. state=None removes the source. Nothing is
        emitted when the state is unchanged.
        """
        previous = self._sourceState.get(source, None)
        if state == previous:
            return
        log.info('source state %s -> %s', source, state)

        if state is None:
            # A known source is going away: retract both the membership
            # statement and its last state.
            del self._sourceState[source]
            patch = Patch(delQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], previous, COLLECTOR),
            ])
        elif previous is None:
            # First time we hear about this source: assert membership
            # and its initial state.
            self._sourceState[source] = state
            patch = Patch(addQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], state, COLLECTOR),
            ])
        else:
            # Known source changing state: swap the state statement only.
            self._sourceState[source] = state
            patch = Patch(
                addQuads=[
                    (source, ROOM['state'], state, COLLECTOR),
                ],
                delQuads=[
                    (source, ROOM['state'], previous, COLLECTOR),
                ])

        self.applyPatch(COLLECTOR, patch)
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
76 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
def abbrevTerm(t):
    """
    Shorten well-known URI prefixes in a URIRef for display; any other
    kind of term is returned untouched.
    """
    if not isinstance(t, URIRef):
        return t
    shortened = t
    # applied in this fixed order, same as a chained .replace()
    for prefix, abbrev in (
            ('http://projects.bigasterisk.com/room/', 'room:'),
            ('http://projects.bigasterisk.com/device/', 'dev:'),
            ('http://bigasterisk.com/sse_collector/', 'sc:'),
    ):
        shortened = shortened.replace(prefix, abbrev)
    return shortened
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
83 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
def abbrevStmt(stmt):
    """Format a 4-term statement as '(s p o c)' with abbreviated terms."""
    shortTerms = tuple(abbrevTerm(term) for term in stmt)
    return '(%s %s %s %s)' % shortTerms
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
86 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class ActiveStatements(object):
    """
    Table of the statements currently tracked by the collector.

    Each row maps a statement (s,p,o,c) to a pair of sets: the sources
    asserting it and the handlers it has been synced to. Rows whose two
    sets are both empty are garbage and get removed.
    """
    def __init__(self):
        # This table holds statements asserted by any of our sources
        # plus local statements that we introduce (source is
        # http://bigasterisk.com/sse_collector/).
        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)

    def state(self):
        """Return a small dict describing the table (currently just its row count)."""
        return {
            'len': len(self.statements),
            }

    def _postDeleteStatements(self):
        """
        Return a context manager that collects statements via .add(stmt)
        and deletes their rows when the block exits normally. This lets
        callers mark rows as garbage while iterating self.statements
        without resizing the dict mid-iteration.

        If the block raises, nothing is deleted and the exception
        propagates.
        """
        statements = self.statements
        class PostDeleter(object):
            def __enter__(self):
                self._garbage = []
                return self
            def add(self, stmt):
                self._garbage.append(stmt)
            def __exit__(self, excType, value, traceback):
                if excType is None:
                    for stmt in self._garbage:
                        del statements[stmt]
                # A false return lets any in-flight exception continue.
                return False
        return PostDeleter()

    def pprintTable(self):
        """Print every row, sorted, with abbreviated terms (debugging aid)."""
        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
            # Single parenthesized expression: same output under the
            # Python 2 print statement, and also valid as a function call.
            print("%03d. %-80s from %s to %s" % (
                i,
                abbrevStmt(stmt),
                [abbrevTerm(s) for s in sources],
                handlers))

    @STATS.makeSyncPatch.time()
    def makeSyncPatch(self, handler, sources):
        """
        Bring `handler` up to date with the statements asserted by any
        of `sources`, and return the Patch that describes the change.

        Statements the handler should have but doesn't are added (and
        the handler is recorded on the row); statements it has but no
        longer should are deleted (and the handler is removed from the
        row).
        """
        # todo: this could run all handlers at once, which is how we use it anyway
        adds = []
        dels = []

        sources_set = set(sources)
        with self._postDeleteStatements() as garbage:
            for stmt, (stmtSources, handlers) in self.statements.iteritems():
                belongsInHandler = not sources_set.isdisjoint(stmtSources)
                handlerHasIt = handler in handlers
                log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                if belongsInHandler and not handlerHasIt:
                    adds.append(stmt)
                    handlers.add(handler)
                elif not belongsInHandler and handlerHasIt:
                    dels.append(stmt)
                    handlers.remove(handler)
                    if not handlers and not stmtSources:
                        garbage.add(stmt)

        return Patch(addQuads=adds, delQuads=dels)

    def applySourcePatch(self, source, p):
        """
        Apply patch `p` received from `source` to the table.

        Raises ValueError if the source adds a statement it already
        asserts, or deletes one it does not assert. Statements processed
        before the offending one stay applied.
        """
        for stmt in p.addQuads:
            # Indexing the defaultdict creates the row on first sight.
            sourceUrls, handlers = self.statements[stmt]
            if source in sourceUrls:
                raise ValueError("%s added stmt that it already had: %s" %
                                 (source, abbrevStmt(stmt)))
            sourceUrls.add(source)

        with self._postDeleteStatements() as garbage:
            for stmt in p.delQuads:
                # Use .get() here: indexing the defaultdict would insert
                # an empty row for a statement we never had, and that row
                # would be left behind when we raise below.
                row = self.statements.get(stmt)
                if row is None or source not in row[0]:
                    raise ValueError("%s deleting stmt that it didn't have: %s" %
                                     (source, abbrevStmt(stmt)))
                sourceUrls, handlers = row
                sourceUrls.remove(source)
                # this is rare, since some handler probably still has
                # the stmt we're deleting, but it can happen e.g. when
                # a handler was just deleted
                if not sourceUrls and not handlers:
                    garbage.add(stmt)

    @STATS.replaceSourceStatements.time()
    def replaceSourceStatements(self, source, stmts):
        """
        Make `stmts` the complete set of statements asserted by `source`:
        drop the source from rows not in `stmts`, attach it to existing
        rows that are, and add rows for statements not yet in the table.
        """
        log.debug('replaceSourceStatements with %s stmts', len(stmts))
        # Set copy for O(1) membership tests; `stmts` may be a list.
        wanted = set(stmts)
        # Whatever is left in here after the scan is not in the table yet.
        newStmts = set(wanted)

        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.iteritems():
                if source in sources:
                    if stmt not in wanted:
                        sources.remove(source)
                        if not sources and not handlers:
                            garbage.add(stmt)
                else:
                    if stmt in wanted:
                        sources.add(source)
                newStmts.discard(stmt)

        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))

    def discardHandler(self, handler):
        """Forget `handler` on every row, dropping rows left with no sources and no handlers."""
        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.iteritems():
                handlers.discard(handler)
                if not sources and not handlers:
                    garbage.add(stmt)

    def discardSource(self, source):
        """Forget `source` on every row, dropping rows left with no sources and no handlers."""
        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.iteritems():
                sources.discard(source)
                if not sources and not handlers:
                    garbage.add(stmt)
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
198 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """
    def __init__(self):
        self.clients = {} # url: PatchSource (COLLECTOR is not listed)
        self.handlers = set() # handler
        self.statements = ActiveStatements()

        # LocalStatements is handed our patch callback; presumably it
        # reports the statements this service injects as source
        # COLLECTOR -- TODO confirm against LocalStatements.
        self._localStatements = LocalStatements(self._onPatch)

    def state(self):
        """
        Return a dict of the state() reports of every patch source,
        every SSE handler, and the statement table.
        """
        return {
            'clients': [ps.state() for ps in self.clients.values()],
            'sseHandlers': [h.state() for h in self.handlers],
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler):
        """
        Return the list of source URIs configured for handler.streamId,
        with COLLECTOR appended.

        Raises ValueError unless exactly one entry of config['streams']
        has that id.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        # a list comprehension (not map) so the concatenation also works
        # where map() returns an iterator
        return [URIRef(s) for s in matches[0]['sources']] + [COLLECTOR]

    @STATS.onPatch.time()
    def _onPatch(self, source, p, fullGraph=False):
        """
        Apply patch p from the given source to self.statements, push
        the resulting changes to the handlers, and record the source's
        new state.

        With fullGraph set, p.addQuads replaces everything the source
        previously asserted instead of being applied as a delta.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(
                source,
                ROOM['fullGraphReceived'] if fullGraph else
                ROOM['patchesReceived'])

    @STATS.sendUpdatePatch.time()
    def _sendUpdatePatch(self, handler=None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements

        With handler=None, every registered handler is updated. A
        handler that was sent a patch less than `period` seconds ago
        (5 for user agents containing 'Raspbian', otherwise 1) is
        skipped on this call.
        """
        now = time.time()
        # reduce loops here- prepare all patches at once
        for h in (self.handlers if handler is None else [handler]):
            # the header may be missing, in which case get() returns
            # None and the 'in' test would raise TypeError
            userAgent = h.request.headers.get('user-agent') or ''
            period = 5 if 'Raspbian' in userAgent else 1
            # NOTE(review): nothing in this method schedules a retry for
            # a skipped handler; it only catches up on a later call.
            if h.lastPatchSentTime > now - period:
                continue
            p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
            log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem once. Might be
                # nice for this service to try to break it up into multiple sends,
                # although there's no guarantee at all since any single stmt
                # could be any length.
                h.sendEvent(message=jsonFromPatch(p), event='patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def addSseHandler(self, handler):
        """
        Register handler, start a ReconnectingPatchSource for each of
        its sources that isn't already running, then greet the handler
        and send it a first sync patch.

        Raises ValueError (before registering anything) if the
        handler's streamId doesn't match exactly one configured stream.
        """
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                self.clients[source] = ReconnectingPatchSource(
                    source,
                    # source=source binds this iteration's value; a plain
                    # closure would see the loop variable's final value
                    listener=lambda p, fullGraph, source=source: self._onPatch(
                        source, p, fullGraph),
                    reconnectSecs=10)
        log.debug('bring new client up to date')
        handler.sendEvent(message='hello', event='greets')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler):
        """
        Forget handler and stop every patch source that no remaining
        handler uses.

        Safe to call for a handler that addSseHandler rejected before
        registering it.
        """
        log.info('removeSseHandler %r', handler)
        self.statements.discardHandler(handler)
        if handler not in self.handlers:
            # addSseHandler raised before registering it (e.g. unknown
            # stream id), so there are no sources to release
            return

        # collect what the other handlers still need once, instead of
        # re-reading the config for every (source, handler) pair
        stillWanted = set()
        for otherHandler in self.handlers:
            if otherHandler != handler:
                stillWanted.update(self._sourcesForHandler(otherHandler))

        for source in self._sourcesForHandler(handler):
            if source not in stillWanted:
                self._stopClient(source)

        self.handlers.discard(handler)

    def _stopClient(self, url):
        """
        Stop and forget the patch source for url, drop the statements
        it asserted, and clear its source state. COLLECTOR is never
        stopped. A url with no running client is tolerated.
        """
        if url == COLLECTOR:
            return

        client = self.clients.pop(url, None)
        if client is not None:
            client.stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
322 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
323 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class SomeGraph(cyclone.sse.SSEHandler):
    """
    One SSE connection to /graph/<streamId>. While bound, the handler
    is registered with the application's shared GraphClients.
    """
    _handlerSerial = 0 # serial number for the next instance (see __repr__)

    def __init__(self, application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        # everything after the '/graph/' prefix of the request uri
        prefixLen = len('/graph/')
        self.streamId = request.uri[prefixLen:]
        self.graphClients = self.settings.graphClients
        self.created = time.time()

        serial = SomeGraph._handlerSerial
        SomeGraph._handlerSerial = serial + 1
        self._serial = serial
        # time.time() of the last patch event sent; read and updated by
        # GraphClients._sendUpdatePatch
        self.lastPatchSentTime = 0

    def __repr__(self):
        return '<Handler #%s>' % (self._serial,)

    def state(self):
        """Return a dict describing this connection."""
        ageSecs = time.time() - self.created
        req = self.request
        return {
            'created': round(self.created, 2),
            'ageHours': round(ageSecs / 3600, 2),
            'streamId': self.streamId,
            'remoteIp': req.remote_ip,
            'userAgent': req.headers.get('user-agent'),
        }

    def bind(self):
        """Register this handler with GraphClients."""
        clients = self.graphClients
        clients.addSseHandler(self)

    def unbind(self):
        """Unregister this handler from GraphClients."""
        clients = self.graphClients
        clients.removeSseHandler(self)
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
353 |
1242
24c004aac998
stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents:
1158
diff
changeset
|
class State(cyclone.web.RequestHandler):
    """Serves GraphClients.state() as indented JSON."""
    @STATS.getState.time()
    def get(self):
        import traceback
        try:
            state = self.settings.graphClients.state()
        except Exception:
            # print the traceback ourselves, then let the framework
            # handle the error as usual
            traceback.print_exc()
            raise

        self.write(json.dumps({'graphClients': state}, indent=2))
1156
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
364 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class Root(cyclone.web.RequestHandler):
    """Minimal index page that just names the service."""
    def get(self):
        page = '<html><body>sse_collector</body></html>'
        self.write(page)
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
368 |
ee168d55524a
reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
if __name__ == '__main__':
    # Script entry point: parse options, then serve on port 19072.
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v   Verbose
    """)

    verbose = arg['-v']
    if verbose:
        import twisted.python.log
        twisted.python.log.startLogging(sys.stdout)
        log.setLevel(logging.DEBUG)
        defer.setDebugging(True)

    graphClients = GraphClients()
    #exporter = InfluxExporter(... to export some stats values

    routes = [
        (r'/', Root),
        (r'/state', State),
        (r'/graph/(.*)', SomeGraph),
        (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
    ]
    # graphClients becomes an application setting; the request handlers
    # read it back as self.settings.graphClients
    app = cyclone.web.Application(handlers=routes, graphClients=graphClients)
    reactor.listenTCP(19072, app, interface='::')
    reactor.run()