annotate service/collector/sse_collector.py @ 443:2f7bc2ecf6b5

more of the stats and logging patch for collector
author drewp@bigasterisk.com
date Thu, 18 Apr 2019 09:17:00 -0700
parents ee74dc3b58fb
children 96d712dccf28
from __future__ import division
"""
Requesting /graph/foo returns an SSE patch stream that is the merged
result of fetching multiple other SSE patch streams. The result stream
may include new statements injected by this service.

Future:
- filter out unneeded stmts from the sources
- give a time resolution and concatenate any patches that arrive faster than that resolution
"""
from crochet import no_setup
no_setup()

import sys, logging, collections, json, time
from twisted.internet import reactor, defer
import cyclone.web, cyclone.sse
from rdflib import URIRef, Namespace
from docopt import docopt
from greplin import scales
from greplin.scales.cyclonehandler import StatsHandler
from logsetup import log, enableTwistedLog
from patchablegraph import jsonFromPatch
from rdfdb.patch import Patch
from patchsource import ReconnectingPatchSource
from sse_collector_config import config

ROOM = Namespace("http://projects.bigasterisk.com/room/")
COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')

STATS = scales.collection('/root',
                          scales.PmfStat('getState'),
                          scales.PmfStat('localStatementsPatch'),
                          scales.PmfStat('makeSyncPatch'),
                          scales.PmfStat('onPatch'),
                          scales.PmfStat('sendUpdatePatch'),
                          scales.PmfStat('replaceSourceStatements'),
)

class LocalStatements(object):
    """
    Makes the statements that originate from sse_collector itself.
    """
    def __init__(self, applyPatch):
        self.applyPatch = applyPatch
        self._sourceState = {} # source: state URIRef

    @STATS.localStatementsPatch.time()
    def setSourceState(self, source, state):
        """
        Add a patch to the COLLECTOR graph about the state of this
        source. Pass state=None to remove the source.
        """
        oldState = self._sourceState.get(source, None)
        if state == oldState:
            return
        log.info('source state %s -> %s', source, state)
        if oldState is None:
            # first time we hear about this source
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(addQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], state, COLLECTOR),
            ]))
        elif state is None:
            # source is going away
            del self._sourceState[source]
            self.applyPatch(COLLECTOR, Patch(delQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], oldState, COLLECTOR),
            ]))
        else:
            # known source changes state
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(
                addQuads=[
                    (source, ROOM['state'], state, COLLECTOR),
                ],
                delQuads=[
                    (source, ROOM['state'], oldState, COLLECTOR),
                ]))

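The three branches of setSourceState can be sketched without rdflib or rdfdb. This is only an illustration under stated assumptions: plain strings stand in for URIRef terms, the quads are plain tuples, and the helper name state_transition_quads is hypothetical and not part of sse_collector.

```python
# Hypothetical stand-ins: plain strings replace rdflib URIRef terms.
COLLECTOR = 'http://bigasterisk.com/sse_collector/'
ROOM = 'http://projects.bigasterisk.com/room/'


def state_transition_quads(source, old_state, new_state):
    """Return (adds, dels) quad lists for one source-state change."""
    if new_state == old_state:
        return [], []
    membership = (COLLECTOR, ROOM + 'source', source, COLLECTOR)
    adds, dels = [], []
    if old_state is None:
        # A new source also gets its membership statement.
        adds.append(membership)
    else:
        dels.append((source, ROOM + 'state', old_state, COLLECTOR))
    if new_state is None:
        # A removed source loses its membership statement too.
        dels.insert(0, membership)
    else:
        adds.append((source, ROOM + 'state', new_state, COLLECTOR))
    return adds, dels


adds, dels = state_transition_quads(
    'http://example.com/graph/a', None, ROOM + 'patchesReceived')
```

A source that appears for the first time yields two added quads and no deletions; a source that only changes state yields one added and one deleted quad.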
def abbrevTerm(t):
    if isinstance(t, URIRef):
        return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
                .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
    return t

def abbrevStmt(stmt):
    return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))

class ActiveStatements(object):
    def __init__(self):

        # This table holds statements asserted by any of our sources
        # plus local statements that we introduce (source is
        # http://bigasterisk.com/sse_collector/).
        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)

    def state(self):
        return {
            'len': len(self.statements),
        }

    def _postDeleteStatements(self):
        # Collects statements during a loop over self.statements and
        # deletes them only after the loop, since the dict must not
        # change size while it is being iterated.
        statements = self.statements
        class PostDeleter(object):
            def __enter__(self):
                self._garbage = []
                return self
            def add(self, stmt):
                self._garbage.append(stmt)
            def __exit__(self, type, value, traceback):
                if type is not None:
                    # propagate the in-flight exception without deleting anything
                    raise
                for stmt in self._garbage:
                    del statements[stmt]
        return PostDeleter()

    def pprintTable(self):
        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
            print "%03d. %-80s from %s to %s" % (
                i,
                abbrevStmt(stmt),
                [abbrevTerm(s) for s in sources],
                handlers)

    @STATS.makeSyncPatch.time()
    def makeSyncPatch(self, handler, sources):
        # todo: this could run all handlers at once, which is how we use it anyway
        adds = []
        dels = []

        with self._postDeleteStatements() as garbage:
            for stmt, (stmtSources, handlers) in self.statements.iteritems():
                belongsInHandler = not set(sources).isdisjoint(stmtSources)
                handlerHasIt = handler in handlers
                log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                if belongsInHandler and not handlerHasIt:
                    adds.append(stmt)
                    handlers.add(handler)
                elif not belongsInHandler and handlerHasIt:
                    dels.append(stmt)
                    handlers.remove(handler)
                    if not handlers and not stmtSources:
                        garbage.add(stmt)

        return Patch(addQuads=adds, delQuads=dels)

    def applySourcePatch(self, source, p):
        for stmt in p.addQuads:
            sourceUrls, handlers = self.statements[stmt]
            if source in sourceUrls:
                raise ValueError("%s added stmt that it already had: %s" %
                                 (source, abbrevStmt(stmt)))
            sourceUrls.add(source)

        with self._postDeleteStatements() as garbage:
            for stmt in p.delQuads:
                sourceUrls, handlers = self.statements[stmt]
                if source not in sourceUrls:
                    raise ValueError("%s deleting stmt that it didn't have: %s" %
                                     (source, abbrevStmt(stmt)))
                sourceUrls.remove(source)
                # this is rare, since some handler probably still has
                # the stmt we're deleting, but it can happen e.g. when
                # a handler was just deleted
                if not sourceUrls and not handlers:
                    garbage.add(stmt)

    @STATS.replaceSourceStatements.time()
    def replaceSourceStatements(self, source, stmts):
        log.debug('replaceSourceStatements with %s stmts', len(stmts))
        newStmts = set(stmts)

        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.iteritems():
                if source in sources:
                    if stmt not in stmts:
                        sources.remove(source)
                        if not sources and not handlers:
                            garbage.add(stmt)
                else:
                    if stmt in stmts:
                        sources.add(source)
                newStmts.discard(stmt)

        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))

    def discardHandler(self, handler):
        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.iteritems():
                handlers.discard(handler)
                if not sources and not handlers:
                    garbage.add(stmt)

    def discardSource(self, source):
        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.iteritems():
                sources.discard(source)
                if not sources and not handlers:
                    garbage.add(stmt)

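The table-and-sync idea behind ActiveStatements can be tried in isolation. The sketch below is a simplification under stated assumptions: statements, sources and handlers are plain strings and tuples, the function name make_sync_patch is hypothetical, and it returns a pair of lists where the original builds an rdfdb Patch. Deferred deletion is done with a plain list in place of the PostDeleter context manager.

```python
import collections

# (s, p, o, c) -> (sources asserting it, handlers that were already sent it)
table = collections.defaultdict(lambda: (set(), set()))


def make_sync_patch(table, handler, wanted_sources):
    """Return (adds, dels) that bring one handler up to date."""
    adds, dels, garbage = [], [], []
    for stmt, (stmt_sources, handlers) in table.items():
        belongs = not set(wanted_sources).isdisjoint(stmt_sources)
        has_it = handler in handlers
        if belongs and not has_it:
            adds.append(stmt)
            handlers.add(handler)
        elif not belongs and has_it:
            dels.append(stmt)
            handlers.remove(handler)
            if not handlers and not stmt_sources:
                garbage.append(stmt)
    # Delete only after the loop: removing keys while iterating a dict
    # raises RuntimeError.
    for stmt in garbage:
        del table[stmt]
    return adds, dels


stmt = ('s', 'p', 'o', 'c')
table[stmt][0].add('sourceA')
first = make_sync_patch(table, 'handler1', ['sourceA'])
table[stmt][0].discard('sourceA')
second = make_sync_patch(table, 'handler1', ['sourceA'])
```

The first call reports the statement as an addition for handler1; once sourceA stops asserting it, the second call reports a deletion and the now-unreferenced row is dropped from the table.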
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """
    def __init__(self):
        self.clients = {} # url: PatchSource (COLLECTOR is not listed)
        self.handlers = set() # handler
        self.statements = ActiveStatements()

        self._localStatements = LocalStatements(self._onPatch)

    def state(self):
        return {
            'clients': [ps.state() for ps in self.clients.values()],
            'sseHandlers': [h.state() for h in self.handlers],
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler):
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return map(URIRef, matches[0]['sources']) + [COLLECTOR]

    @STATS.onPatch.time()
    def _onPatch(self, source, p, fullGraph=False):
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(
                source,
                ROOM['fullGraphReceived'] if fullGraph else
                ROOM['patchesReceived'])

443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
251 @STATS.sendUpdatePatch.time()
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
252 def _sendUpdatePatch(self, handler=None):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
253 """
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
254 send a patch event to this handler (or to every handler if handler is None) to bring it up to date with
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
255 self.statements
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
256 """
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
257 # TODO: reduce the loops here by preparing all patches at once
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
258 for h in (self.handlers if handler is None else [handler]):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
259 p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
443
2f7bc2ecf6b5 more of the stats and logging patch for collector
drewp@bigasterisk.com
parents: 442
diff changeset
260 log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
261 if not p.isNoop():
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
262 log.debug("send patch %s to %s", p.shortSummary(), h)
316
02d9915b3bbb patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents: 313
diff changeset
263 # This can be a giant line, which was a problem once. Might be
02d9915b3bbb patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents: 313
diff changeset
264 # nice for this service to try to break it up into multiple sends,
02d9915b3bbb patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents: 313
diff changeset
265 # although there's no guarantee at all since any single stmt
02d9915b3bbb patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents: 313
diff changeset
266 # could be any length.
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
267 h.sendEvent(message=jsonFromPatch(p), event='patch')
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
268
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
269 def addSseHandler(self, handler):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
270 log.info('addSseHandler %r %r', handler, handler.streamId)
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
271
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
272 # fail early if id doesn't match
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
273 sources = self._sourcesForHandler(handler)
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
274
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
275 self.handlers.add(handler)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
276
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
277 for source in sources:
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
278 if source not in self.clients and source != COLLECTOR:
442
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
279 log.debug('connect to patch source %s', source)
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
280 self._localStatements.setSourceState(source, ROOM['connect'])
302
46c5fae89823 factor out patchsource
drewp@bigasterisk.com
parents: 301
diff changeset
281 self.clients[source] = ReconnectingPatchSource(
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
282 source, listener=lambda p, fullGraph, source=source: self._onPatch(
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
283 source, p, fullGraph))
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
284 self._sendUpdatePatch(handler)
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
285
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
286 def removeSseHandler(self, handler):
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
287 log.info('removeSseHandler %r', handler)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
288 self.statements.discardHandler(handler)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
289 for source in self._sourcesForHandler(handler):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
290 for otherHandler in self.handlers:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
291 if (otherHandler != handler and
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
292 source in self._sourcesForHandler(otherHandler)):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
293 break
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
294 else:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
295 self._stopClient(source)
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
296
298
8d89da1915df sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents: 296
diff changeset
297 self.handlers.remove(handler)
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
298
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
299 def _stopClient(self, url):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
300 if url == COLLECTOR:
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
301 return
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
302
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
303 self.clients[url].stop()
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
304
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
305 self.statements.discardSource(url)
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
306
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
307 self._localStatements.setSourceState(url, None)
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
308 del self.clients[url]
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
309
300
371af6e92b5e local state statements and self.statements rewrite
drewp@bigasterisk.com
parents: 299
diff changeset
310
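The `for ... else` in `removeSseHandler` above stops a source's client only when no other connected handler still needs that source. A minimal, self-contained sketch of that decision follows. `STREAMS`, `sources_for`, and `sources_to_stop` are hypothetical names, and plain strings stand in for handler objects and source URIs. Unlike the listing, which lets `_stopClient` return early for `COLLECTOR`, the sketch skips the collector inside the loop.

```python
COLLECTOR = 'collector'  # placeholder for the collector's own source

# Stand-in for config['streams']: stream id -> list of source URLs.
STREAMS = {
    'home': ['http://a/graph', 'http://b/graph'],
    'frontDoor': ['http://b/graph', 'http://c/graph'],
}


def sources_for(stream_id):
    return STREAMS[stream_id] + [COLLECTOR]


def sources_to_stop(handlers, leaving):
    """Return the sources used by `leaving` that no other handler needs.

    `handlers` maps a handler name to the stream id it subscribed to.
    """
    stop = []
    for source in sources_for(handlers[leaving]):
        if source == COLLECTOR:
            continue  # no remote client exists for the collector itself
        for other, stream_id in handlers.items():
            if other != leaving and source in sources_for(stream_id):
                break  # another handler still reads this source
        else:
            # The inner loop finished without a break: nobody else needs it.
            stop.append(source)
    return stop


handlers = {'h0': 'home', 'h1': 'frontDoor'}
to_stop = sources_to_stop(handlers, 'h0')
```

Here `h1` still reads `http://b/graph`, so only `http://a/graph` would be stopped when `h0` disconnects.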
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
311 class SomeGraph(cyclone.sse.SSEHandler):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
312 _handlerSerial = 0
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
313 def __init__(self, application, request):
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
314 cyclone.sse.SSEHandler.__init__(self, application, request)
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
315 self.streamId = request.uri[len('/graph/'):]
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
316 self.graphClients = self.settings.graphClients
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
317 self.created = time.time()
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
318
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
319 self._serial = SomeGraph._handlerSerial
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
320 SomeGraph._handlerSerial += 1
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
321
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
322 def __repr__(self):
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
323 return '<Handler #%s>' % self._serial
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
324
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
325 def state(self):
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
326 return {
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
327 'created': round(self.created, 2),
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
328 'ageHours': round((time.time() - self.created) / 3600, 2),
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
329 'streamId': self.streamId,
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
330 'remoteIp': self.request.remote_ip,
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
331 'userAgent': self.request.headers.get('user-agent'),
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
332 }
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
333
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
334 def bind(self):
301
29f593aee67b big rewrites in sse_collector
drewp@bigasterisk.com
parents: 300
diff changeset
335 self.graphClients.addSseHandler(self)
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
336
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
337 def unbind(self):
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
338 self.graphClients.removeSseHandler(self)
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
339
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
340 class State(cyclone.web.RequestHandler):
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
341 @STATS.getState.time()
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
342 def get(self):
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
343 try:
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
344 state = self.settings.graphClients.state()
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
345 except:
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
346 import traceback; traceback.print_exc()
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
347 raise
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
348
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
349 self.write(json.dumps({'graphClients': state}, indent=2))
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
350
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
351 class Root(cyclone.web.RequestHandler):
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
352 def get(self):
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
353 self.write('<html><body>sse_collector</body></html>')
306
6aad04b34231 sse_collector stats page
drewp@bigasterisk.com
parents: 303
diff changeset
354
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
355 if __name__ == '__main__':
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
356 arg = docopt("""
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
357 Usage: sse_collector.py [options]
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
358
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
359 -v Verbose
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
360 """)
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
361
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
362 if arg['-v']:
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
363 import twisted.python.log
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
364 twisted.python.log.startLogging(sys.stdout)
442
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
365 log.setLevel(logging.DEBUG)
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
366 defer.setDebugging(True)
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
367
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
368
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
369 graphClients = GraphClients()
442
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
370 #exporter = InfluxExporter(... to export some stats values
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
371
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
372 reactor.listenTCP(
299
5084a1f719c9 port 9072
drewp@bigasterisk.com
parents: 298
diff changeset
373 9072,
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
374 cyclone.web.Application(
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
375 handlers=[
313
bfc3f246e77e new / page
drewp@bigasterisk.com
parents: 306
diff changeset
376 (r'/', Root),
439
124c921ad52d stats->state to make room for greplin stats
drewp@bigasterisk.com
parents: 353
diff changeset
377 (r'/state', State),
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
378 (r'/graph/(.*)', SomeGraph),
442
ee74dc3b58fb collector build improvements; stats and logging
drewp@bigasterisk.com
parents: 439
diff changeset
379 (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
296
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
380 ],
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
381 graphClients=graphClients),
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
382 interface='::')
233b81cf2712 start sse_collector
drewp@bigasterisk.com
parents:
diff changeset
383 reactor.run()
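A client of `/graph/<streamId>` receives Server-Sent Events that `_sendUpdatePatch` emits with `event='patch'`. The following is a minimal sketch of splitting such a stream into (event, data) pairs, assuming standard SSE framing (fields separated by newlines, events terminated by a blank line). The function name `parse_sse` is hypothetical, and the payloads in `sample` are placeholders, not the real output of `jsonFromPatch`.

```python
def parse_sse(text):
    """Yield (event, data) pairs from a block of SSE-formatted text."""
    event, data = 'message', []  # 'message' is the default SSE event name
    for line in text.split('\n'):
        if line == '':
            # A blank line ends the current event; skip events with no data.
            if data:
                yield event, '\n'.join(data)
            event, data = 'message', []
        elif line.startswith(':'):
            continue  # comment line, often used as a keep-alive
        else:
            field, _, value = line.partition(':')
            if value.startswith(' '):
                value = value[1:]  # one leading space after the colon is dropped
            if field == 'event':
                event = value
            elif field == 'data':
                data.append(value)


sample = (
    'event: patch\n'
    'data: {"placeholder": 1}\n'
    '\n'
    ': keep-alive comment\n'
    'event: patch\n'
    'data: {"placeholder": 2}\n'
    '\n'
)
events = list(parse_sse(sample))
```

Each `data` string would then be decoded as JSON and applied to the client's local copy of the graph.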