annotate service/collector/sse_collector.py @ 1249:2ddfb2cf95ab

rollback the unicode(source) optimization. it was breaking all output to patch consumers
Ignore-this: 4fabc1d106299a8094b85ff9909c660e
darcs-hash:73268de0ab252948f87dbcabbc90cf0c6f9bd752
author drewp <drewp@bigasterisk.com>
date Thu, 18 Apr 2019 16:55:52 -0700
parents 723808d0a92c
children 9cfa7f69e41f
rev   line source
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
1156    1  from __future__ import division
1156    2  """
1156    3  requesting /graph/foo returns an SSE patch stream that's the
1156    4  result of fetching multiple other SSE patch streams. The result stream
1156    5  may include new statements injected by this service.
1156    6
1156    7  Future:
1156    8  - filter out unneeded stmts from the sources
1156    9  - give a time resolution and concatenate any patches that come faster than that res
1156   10  """
1156   11  import sys, logging, collections, json, time
1245
4c123099e5b6 collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents: 1242
diff changeset
1245   12  from twisted.internet import reactor, defer
1156   13  import cyclone.web, cyclone.sse
1156   14  from rdflib import URIRef, Namespace
1156   15  from docopt import docopt
1246
f18b95f81d43 more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents: 1245
diff changeset
1246   16  from greplin import scales
1246   17  from greplin.scales.cyclonehandler import StatsHandler
1246   18  from logsetup import log, enableTwistedLog
1156   19  from logsetup import log
1156   20  from patchablegraph import jsonFromPatch
1156   21  from rdfdb.patch import Patch
1156   22  from patchsource import ReconnectingPatchSource
1246   23  from sse_collector_config import config
1156   24
1156   25  ROOM = Namespace("http://projects.bigasterisk.com/room/")
1156   26  COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
1156   27
1246   28  STATS = scales.collection('/root',
1246   29                            scales.PmfStat('getState'),
1246   30                            scales.PmfStat('localStatementsPatch'),
1246   31                            scales.PmfStat('makeSyncPatch'),
1246   32                            scales.PmfStat('onPatch'),
1246   33                            scales.PmfStat('sendUpdatePatch'),
1246   34                            scales.PmfStat('replaceSourceStatements'),
1246   35  )
1156   36
1156   37  class LocalStatements(object):
1156   38      """
1156   39      functions that make statements originating from sse_collector itself
1156   40      """
1156   41      def __init__(self, applyPatch):
1156   42          self.applyPatch = applyPatch
1156   43          self._sourceState = {} # source: state URIRef
1156   44
1246   45      @STATS.localStatementsPatch.time()
1156   46      def setSourceState(self, source, state):
1156   47          """
1156   48          add a patch to the COLLECTOR graph about the state of this
1156   49          source. state=None to remove the source.
1156   50          """
1156   51          oldState = self._sourceState.get(source, None)
1156   52          if state == oldState:
1156   53              return
1156   54          log.info('source state %s -> %s', source, state)
1156   55          if oldState is None:
1156   56              self._sourceState[source] = state
1156   57              self.applyPatch(COLLECTOR, Patch(addQuads=[
1156   58                  (COLLECTOR, ROOM['source'], source, COLLECTOR),
1156   59                  (source, ROOM['state'], state, COLLECTOR),
1156   60              ]))
1156   61          elif state is None:
1156   62              del self._sourceState[source]
1156   63              self.applyPatch(COLLECTOR, Patch(delQuads=[
1156   64                  (COLLECTOR, ROOM['source'], source, COLLECTOR),
1156   65                  (source, ROOM['state'], oldState, COLLECTOR),
1156   66              ]))
1156   67          else:
1156   68              self._sourceState[source] = state
1156   69              self.applyPatch(COLLECTOR, Patch(
1156   70                  addQuads=[
1156   71                      (source, ROOM['state'], state, COLLECTOR),
1156   72                  ],
1156   73                  delQuads=[
1156   74                      (source, ROOM['state'], oldState, COLLECTOR),
1156   75                  ]))
1156   76
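The three branches of setSourceState (new source, removed source, changed state) can be read as one rule: retract whatever was said about the old state, assert whatever holds for the new one. The following is only a sketch of that rule, written for Python 3 and with no rdflib dependency. `Patch` here is a hypothetical namedtuple stand-in for rdfdb.patch.Patch, the string constants stand in for the URIRefs, and `state_patch` is an illustrative name that does not appear in the file.

```python
from collections import namedtuple

# Hypothetical stand-in for rdfdb.patch.Patch (assumption: only the two quad lists matter here).
Patch = namedtuple('Patch', ['addQuads', 'delQuads'])

COLLECTOR = 'sc:'            # stand-in for the COLLECTOR URIRef
SOURCE_PRED = 'room:source'  # stand-in for ROOM['source']
STATE_PRED = 'room:state'    # stand-in for ROOM['state']


def state_patch(source, old_state, new_state):
    """Return the Patch describing one source's state change, or None if nothing changed."""
    if new_state == old_state:
        return None
    adds, dels = [], []
    if old_state is None:
        # First report for this source: announce the source itself.
        adds.append((COLLECTOR, SOURCE_PRED, source, COLLECTOR))
    else:
        # The previous state statement is no longer true.
        dels.append((source, STATE_PRED, old_state, COLLECTOR))
    if new_state is None:
        # The source is going away: retract the announcement as well.
        dels.insert(0, (COLLECTOR, SOURCE_PRED, source, COLLECTOR))
    else:
        adds.append((source, STATE_PRED, new_state, COLLECTOR))
    return Patch(addQuads=adds, delQuads=dels)


if __name__ == '__main__':
    print(state_patch('dev:a', None, 'room:connected'))
    print(state_patch('dev:a', 'room:connected', 'room:lost'))
    print(state_patch('dev:a', 'room:lost', None))
```

Unlike the method in the listing, this sketch does not keep the per-source dictionary or call applyPatch; it only shows which quads each transition adds and deletes.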
1156   77  def abbrevTerm(t):
1156   78      if isinstance(t, URIRef):
1156   79          return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
1249   80                  .replace('http://projects.bigasterisk.com/device/', 'dev:')
1156   81                  .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
1156   82      return t
1156   83
1156   84  def abbrevStmt(stmt):
1156   85      return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))
1156   86
1156   87  class ActiveStatements(object):
1156   88      def __init__(self):
1156   89          # This table holds statements asserted by any of our sources
1156   90          # plus local statements that we introduce (source is
1156   91          # http://bigasterisk.com/sse_collector/).
1249   92          self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)
1156   93
1242
24c004aac998 stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents: 1158
diff changeset
1242   94      def state(self):
1156   95          return {
1242   96              'len': len(self.statements),
1156   97          }
1156   98
1156   99      def _postDeleteStatements(self):
1156  100          statements = self.statements
1156  101          class PostDeleter(object):
1156  102              def __enter__(self):
1156  103                  self._garbage = []
1156  104                  return self
1156  105              def add(self, stmt):
1156  106                  self._garbage.append(stmt)
1156  107              def __exit__(self, type, value, traceback):
1156  108                  if type is not None:
1156  109                      raise
1156  110                  for stmt in self._garbage:
1156  111                      del statements[stmt]
1156  112          return PostDeleter()
1156  113
1156  114      def pprintTable(self):
1156  115          for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
1156  116              print "%03d. %-80s from %s to %s" % (
1246  117                  i,
1246  118                  abbrevStmt(stmt),
1246  119                  [abbrevTerm(s) for s in sources],
1246  120                  handlers)
1156  121
1246  122      @STATS.makeSyncPatch.time()
1156  123      def makeSyncPatch(self, handler, sources):
1156  124          # todo: this could run all handlers at once, which is how we use it anyway
1156  125          adds = []
1156  126          dels = []
1247
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
1247  127
1247  128          sources_set = set(sources)
1156  129          with self._postDeleteStatements() as garbage:
1156  130              for stmt, (stmtSources, handlers) in self.statements.iteritems():
1247  131                  belongsInHandler = not sources_set.isdisjoint(stmtSources)
1156  132                  handlerHasIt = handler in handlers
1246  133                  log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
1156  134                  if belongsInHandler and not handlerHasIt:
1156  135                      adds.append(stmt)
1156  136                      handlers.add(handler)
1156  137                  elif not belongsInHandler and handlerHasIt:
1156  138                      dels.append(stmt)
1156  139                      handlers.remove(handler)
1156  140                      if not handlers and not stmtSources:
1156  141                          garbage.add(stmt)
1156  142
1156  143          return Patch(addQuads=adds, delQuads=dels)
1156  144
1156  145      def applySourcePatch(self, source, p):
1156  146          for stmt in p.addQuads:
1156  147              sourceUrls, handlers = self.statements[stmt]
1249  148              if source in sourceUrls:
1156  149                  raise ValueError("%s added stmt that it already had: %s" %
1156  150                                   (source, abbrevStmt(stmt)))
1156  151              sourceUrls.add(source)
1156  152
1156  153          with self._postDeleteStatements() as garbage:
1156  154              for stmt in p.delQuads:
1156  155                  sourceUrls, handlers = self.statements[stmt]
1249  156                  if source not in sourceUrls:
1156  157                      raise ValueError("%s deleting stmt that it didn't have: %s" %
1156  158                                       (source, abbrevStmt(stmt)))
1156  159                  sourceUrls.remove(source)
1156  160                  # this is rare, since some handler probably still has
1156  161                  # the stmt we're deleting, but it can happen e.g. when
1156  162                  # a handler was just deleted
1156  163                  if not sourceUrls and not handlers:
1156  164                      garbage.add(stmt)
1156  165
1246  166      @STATS.replaceSourceStatements.time()
1156  167      def replaceSourceStatements(self, source, stmts):
1156  168          log.debug('replaceSourceStatements with %s stmts', len(stmts))
1156  169          newStmts = set(stmts)
1156  170
1156  171          with self._postDeleteStatements() as garbage:
1156  172              for stmt, (sources, handlers) in self.statements.iteritems():
1156  173                  if source in sources:
1156  174                      if stmt not in stmts:
1156  175                          sources.remove(source)
1156  176                          if not sources and not handlers:
1156  177                              garbage.add(stmt)
1156  178                  else:
1156  179                      if stmt in stmts:
1156  180                          sources.add(source)
1156  181                  newStmts.discard(stmt)
1156  182
1156  183          self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
1156  184
1156  185      def discardHandler(self, handler):
1156  186          with self._postDeleteStatements() as garbage:
1156  187              for stmt, (sources, handlers) in self.statements.iteritems():
1156  188                  handlers.discard(handler)
1156  189                  if not sources and not handlers:
1156  190                      garbage.add(stmt)
1156  191
1156  192      def discardSource(self, source):
1156  193          with self._postDeleteStatements() as garbage:
1156  194              for stmt, (sources, handlers) in self.statements.iteritems():
1156  195                  sources.discard(source)
1156  196                  if not sources and not handlers:
1156  197                      garbage.add(stmt)
1156  198
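The ActiveStatements table above maps each statement to a pair of sets: the sources asserting it and the handlers that already know it. As a rough illustration of how makeSyncPatch walks that table, here is a Python 3 sketch that uses plain strings and tuples instead of URIRefs and handler objects. The names `make_table` and `sync` are hypothetical, and collecting a garbage list during the loop is an assumed simplification of the PostDeleter context manager in the listing.

```python
import collections


def make_table():
    # stmt -> (set of sources asserting it, set of handlers that know it)
    return collections.defaultdict(lambda: (set(), set()))


def sync(table, handler, sources):
    """Return (adds, dels) that bring `handler` in line with what `sources` assert."""
    wanted = set(sources)
    adds, dels, garbage = [], [], []
    for stmt, (stmt_sources, handlers) in table.items():
        belongs = not wanted.isdisjoint(stmt_sources)
        has_it = handler in handlers
        if belongs and not has_it:
            adds.append(stmt)
            handlers.add(handler)
        elif not belongs and has_it:
            dels.append(stmt)
            handlers.remove(handler)
            if not handlers and not stmt_sources:
                garbage.append(stmt)
    # Rows are dropped only after the loop; a dict must not change size while iterated.
    for stmt in garbage:
        del table[stmt]
    return adds, dels


if __name__ == '__main__':
    table = make_table()
    stmt = ('s', 'p', 'o', 'g')
    table[stmt][0].add('src1')           # src1 asserts the statement
    print(sync(table, 'h1', ['src1']))   # h1 has not seen it yet, so it is an add
    table[stmt][0].discard('src1')       # src1 retracts it
    print(sync(table, 'h1', ['src1']))   # h1 must now be told to delete it
    print(len(table))                    # the unreferenced row has been collected
```

Deferring the deletes until the loop has finished is the same reason the listing routes removals through `_postDeleteStatements` rather than deleting rows inline.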
1156  199  class GraphClients(object):
1156  200      """
1156  201      All the active PatchSources and SSEHandlers
1156  202
1156  203      To handle all the overlapping-statement cases, we store a set of
1156  204      true statements along with the sources that are currently
1156  205      asserting them and the requesters who currently know them. As
1156  206      statements come and go, we make patches to send to requesters.
1156  207      """
1156  208      def __init__(self):
1156  209          self.clients = {} # url: PatchSource (COLLECTOR is not listed)
1156  210          self.handlers = set() # handler
1156  211          self.statements = ActiveStatements()
1156  212
1156  213          self._localStatements = LocalStatements(self._onPatch)
1156  214
1242  215      def state(self):
1156  216          return {
1242  217              'clients': [ps.state() for ps in self.clients.values()],
1242  218              'sseHandlers': [h.state() for h in self.handlers],
1242  219              'statements': self.statements.state(),
1156  220          }
1156  221
1156  222      def _sourcesForHandler(self, handler):
1156  223          streamId = handler.streamId
1156  224          matches = [s for s in config['streams'] if s['id'] == streamId]
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
225 if len(matches) != 1:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
226 raise ValueError("%s matches for %r" % (len(matches), streamId))
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
227 return map(URIRef, matches[0]['sources']) + [COLLECTOR]
1246
f18b95f81d43 more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents: 1245
diff changeset
228
f18b95f81d43 more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents: 1245
diff changeset
229 @STATS.onPatch.time()
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
230 def _onPatch(self, source, p, fullGraph=False):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
231 if fullGraph:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
232 # a reconnect may need to resend the full graph even
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
233 # though we've already sent some statements
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
234 self.statements.replaceSourceStatements(source, p.addQuads)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
235 else:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
236 self.statements.applySourcePatch(source, p)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
237
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
238 self._sendUpdatePatch()
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
239
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
240 if log.isEnabledFor(logging.DEBUG):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
241 self.statements.pprintTable()
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
242
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
243 if source != COLLECTOR:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
244 self._localStatements.setSourceState(
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
245 source,
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
246 ROOM['fullGraphReceived'] if fullGraph else
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
247 ROOM['patchesReceived'])
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
248
1246
f18b95f81d43 more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents: 1245
diff changeset
249 @STATS.sendUpdatePatch.time()
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
250 def _sendUpdatePatch(self, handler=None):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
251 """
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
252 Send a patch event out to this handler (or to every handler when handler is None) to bring it up to date with
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
253 self.statements
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
254 """
1247
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
255 now = time.time()
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
256 # TODO: reduce loops here; prepare all patches at once
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
257 for h in (self.handlers if handler is None else [handler]):
1247
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
258 period = 1
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
259 if 'Raspbian' in h.request.headers.get('user-agent', ''):
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
260 period = 5
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
261 if h.lastPatchSentTime > now - period:
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
262 continue
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
263 p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
1246
f18b95f81d43 more of the stats and logging patch for collector
drewp <drewp@bigasterisk.com>
parents: 1245
diff changeset
264 log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
265 if not p.isNoop():
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
266 log.debug("send patch %s to %s", p.shortSummary(), h)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
267 # This can be a giant line, which was a problem once. Might be
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
268 # nice for this service to try to break it up into multiple sends,
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
269 # although there's no guarantee at all since any single stmt
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
270 # could be any length.
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
271 h.sendEvent(message=jsonFromPatch(p), event='patch')
1247
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
272 h.lastPatchSentTime = now
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
273 else:
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
274 log.debug('nothing to send to %s', h)
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
275
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
276 def addSseHandler(self, handler):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
277 log.info('addSseHandler %r %r', handler, handler.streamId)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
278
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
279 # fail early if id doesn't match
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
280 sources = self._sourcesForHandler(handler)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
281
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
282 self.handlers.add(handler)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
283
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
284 for source in sources:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
285 if source not in self.clients and source != COLLECTOR:
1245
4c123099e5b6 collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents: 1242
diff changeset
286 log.debug('connect to patch source %s', source)
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
287 self._localStatements.setSourceState(source, ROOM['connect'])
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
288 self.clients[source] = ReconnectingPatchSource(
1247
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
289 source,
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
290 listener=lambda p, fullGraph, source=source: self._onPatch(
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
291 source, p, fullGraph),
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
292 reconnectSecs=10)
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
293 log.debug('bring new client up to date')
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
294 handler.sendEvent(message='hello', event='greets')
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
295
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
296 self._sendUpdatePatch(handler)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
297
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
298 def removeSseHandler(self, handler):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
299 log.info('removeSseHandler %r', handler)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
300 self.statements.discardHandler(handler)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
301 for source in self._sourcesForHandler(handler):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
302 for otherHandler in self.handlers:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
303 if (otherHandler != handler and
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
304 source in self._sourcesForHandler(otherHandler)):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
305 break
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
306 else:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
307 self._stopClient(source)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
308
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
309 self.handlers.remove(handler)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
310
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
311 def _stopClient(self, url):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
312 if url == COLLECTOR:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
313 return
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
314
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
315 if url in self.clients: self.clients[url].stop()
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
316
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
317 self.statements.discardSource(url)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
318
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
319 self._localStatements.setSourceState(url, None)
1247
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
320 if url in self.clients:
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
321 del self.clients[url]
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
322
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
323
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
324 class SomeGraph(cyclone.sse.SSEHandler):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
325 _handlerSerial = 0
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
326 def __init__(self, application, request):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
327 cyclone.sse.SSEHandler.__init__(self, application, request)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
328 self.streamId = request.uri[len('/graph/'):]
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
329 self.graphClients = self.settings.graphClients
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
330 self.created = time.time()
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
331
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
332 self._serial = SomeGraph._handlerSerial
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
333 SomeGraph._handlerSerial += 1
1247
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
334 self.lastPatchSentTime = 0
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
335
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
336 def __repr__(self):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
337 return '<Handler #%s>' % self._serial
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
338
1242
24c004aac998 stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents: 1158
diff changeset
339 def state(self):
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
340 return {
1242
24c004aac998 stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents: 1158
diff changeset
341 'created': round(self.created, 2),
24c004aac998 stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents: 1158
diff changeset
342 'ageHours': round((time.time() - self.created) / 3600, 2),
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
343 'streamId': self.streamId,
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
344 'remoteIp': self.request.remote_ip,
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
345 'userAgent': self.request.headers.get('user-agent'),
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
346 }
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
347
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
348 def bind(self):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
349 self.graphClients.addSseHandler(self)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
350
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
351 def unbind(self):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
352 self.graphClients.removeSseHandler(self)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
353
1242
24c004aac998 stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents: 1158
diff changeset
354 class State(cyclone.web.RequestHandler):
24c004aac998 stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents: 1158
diff changeset
355 @STATS.getState.time()
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
356 def get(self):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
357 try:
1242
24c004aac998 stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents: 1158
diff changeset
358 state = self.settings.graphClients.state()
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
359 except:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
360 import traceback; traceback.print_exc()
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
361 raise
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
362
1242
24c004aac998 stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents: 1158
diff changeset
363 self.write(json.dumps({'graphClients': state}, indent=2))
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
364
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
365 class Root(cyclone.web.RequestHandler):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
366 def get(self):
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
367 self.write('<html><body>sse_collector</body></html>')
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
368
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
369 if __name__ == '__main__':
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
370 arg = docopt("""
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
371 Usage: sse_collector.py [options]
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
372
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
373 -v Verbose
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
374 """)
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
375
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
376 if arg['-v']:
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
377 import twisted.python.log
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
378 twisted.python.log.startLogging(sys.stdout)
1245
4c123099e5b6 collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents: 1242
diff changeset
379 log.setLevel(logging.DEBUG)
4c123099e5b6 collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents: 1242
diff changeset
380 defer.setDebugging(True)
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
381
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
382
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
383 graphClients = GraphClients()
1245
4c123099e5b6 collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents: 1242
diff changeset
384 #exporter = InfluxExporter(... to export some stats values
4c123099e5b6 collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents: 1242
diff changeset
385
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
386 reactor.listenTCP(
1247
723808d0a92c WIP collector; not sure why it never sends out any patches
drewp <drewp@bigasterisk.com>
parents: 1246
diff changeset
387 19072,
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
388 cyclone.web.Application(
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
389 handlers=[
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
390 (r'/', Root),
1242
24c004aac998 stats->state to make room for greplin stats
drewp <drewp@bigasterisk.com>
parents: 1158
diff changeset
391 (r'/state', State),
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
392 (r'/graph/(.*)', SomeGraph),
1245
4c123099e5b6 collector build improvements; stats and logging
drewp <drewp@bigasterisk.com>
parents: 1242
diff changeset
393 (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
1156
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
394 ],
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
395 graphClients=graphClients),
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
396 interface='::')
ee168d55524a reasoning & collector move into docker images
drewp <drewp@bigasterisk.com>
parents:
diff changeset
397 reactor.run()
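
The GraphClients docstring describes storing each true statement together with the sources currently asserting it and the requesters that currently know it, then deriving patches as statements come and go. The sketch below is a minimal illustration of that bookkeeping, not the real ActiveStatements class: `StatementTable`, `apply_source_patch`, and `make_sync_patch` are hypothetical names, statements are plain tuples, and sources and handlers are plain strings.

```python
class StatementTable(object):
    """Hypothetical, simplified stand-in for the statement table."""

    def __init__(self):
        # stmt -> (set of sources asserting it, set of handlers that know it)
        self.rows = {}

    def apply_source_patch(self, source, add, delete):
        """Record that `source` now asserts `add` and no longer asserts `delete`."""
        for stmt in add:
            sources, _ = self.rows.setdefault(stmt, (set(), set()))
            sources.add(source)
        for stmt in delete:
            if stmt in self.rows:
                self.rows[stmt][0].discard(source)

    def make_sync_patch(self, handler, wanted_sources):
        """Return (adds, dels) that bring `handler` up to date, and mark them as sent."""
        wanted = set(wanted_sources)
        adds, dels = [], []
        for stmt, (sources, handlers) in list(self.rows.items()):
            asserted = bool(sources & wanted)
            if asserted and handler not in handlers:
                adds.append(stmt)
                handlers.add(handler)
            elif not asserted and handler in handlers:
                dels.append(stmt)
                handlers.discard(handler)
            # Drop rows that nobody asserts and nobody still knows about.
            if not sources and not handlers:
                del self.rows[stmt]
        return sorted(adds), sorted(dels)


table = StatementTable()
stmt = ('s', 'p', 'o')
table.apply_source_patch('srcA', add=[stmt], delete=[])
table.apply_source_patch('srcB', add=[stmt], delete=[])
first = table.make_sync_patch('h1', ['srcA', 'srcB'])   # handler learns the statement
table.apply_source_patch('srcA', add=[], delete=[stmt])
second = table.make_sync_patch('h1', ['srcA', 'srcB'])  # srcB still asserts it
table.apply_source_patch('srcB', add=[], delete=[stmt])
third = table.make_sync_patch('h1', ['srcA', 'srcB'])   # last asserter gone
```

Tracking asserting sources per statement is what handles the overlapping case: a statement is retracted from a requester only after every source that handler subscribes to has stopped asserting it.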