Mercurial > code > home > repos > homeauto
annotate service/reasoning/sse_collector.py @ 1108:8caf62030955
reasoning uses sse_collector
Ignore-this: 5b3787bd354b9bc82968c76ba262b725
darcs-hash:5ab617abe4bdfcdc8ad94e4272beb2cf548bb896
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Mon, 29 Aug 2016 00:27:46 -0700 |
parents | e68f6e5712c6 |
children | 6aad04b34231 |
rev | line source |
---|---|
1101 | 1 """ |
2 requesting /graph/foo returns an SSE patch stream that's the | |
3 result of fetching multiple other SSE patch streams. The result stream | |
4 may include new statements injected by this service. | |
5 | |
6 Future: | |
7 - filter out unneeded stmts from the sources | |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
8 - give a time resolution and concatenate any patches that come faster than that res |
1101 | 9 """ |
10 | |
11 config = { | |
12 'streams': [ | |
13 {'id': 'home', | |
14 'sources': [ | |
1108
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
15 # should be from :reasoning :source ?s |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
16 'http://garage:9059/graph/events', # "garage pi" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
17 'http://kitchen:9059/graph/events', # "kitchen pi" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
18 'http://living:9059/graph/events', # "living room pi" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
19 'http://slash:9059/graph/events', # "slash arduino" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
20 'http://bed:9059/graph/events', # "bed pi" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
21 'http://brace6:9059/graph/events', # "brace arduino" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
22 'http://changing:9059/graph/events', # "changing pi" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
23 'http://bang:9075/graph/events', # "env" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
24 'http://bang:9070/graph/events', # "wifi usage" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
25 'http://bang:9099/graph/events', # "trails" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
26 'http://dash:9095/graph/events', # "dash monitor" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
27 'http://dash:9107/graph/events', # "dash x idle" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
28 'http://brace6:9095/graph/events', # "brace monitor" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
29 'http://brace6:9107/graph/events', # "brace x idle" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
30 'http://slash:9095/graph/events', # "slash monitor" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
31 'http://slash:9107/graph/events', # "slash x idle" |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
32 |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
33 |
8caf62030955
reasoning uses sse_collector
drewp <drewp@bigasterisk.com>
parents:
1107
diff
changeset
|
34 |
1101 | 35 ] |
36 }, | |
37 ] | |
38 } | |
39 | |
40 from crochet import no_setup | |
41 no_setup() | |
42 | |
1107 | 43 import sys, logging, collections |
44 from twisted.internet import reactor | |
1101 | 45 import cyclone.web, cyclone.sse |
1107 | 46 from rdflib import URIRef, Namespace |
1101 | 47 from docopt import docopt |
48 | |
49 | |
50 sys.path.append("../../lib") | |
51 from logsetup import log | |
1107 | 52 from patchablegraph import jsonFromPatch |
1101 | 53 |
54 sys.path.append("/my/proj/light9") | |
55 from light9.rdfdb.patch import Patch | |
56 | |
1107 | 57 from patchsource import ReconnectingPatchSource |
58 | |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
59 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
60 COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/') |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
61 |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
62 class LocalStatements(object): |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
63 """ |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
64 functions that make statements originating from sse_collector itself |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
65 """ |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
66 def __init__(self, applyPatch): |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
67 self.applyPatch = applyPatch |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
68 self._sourceState = {} # source: state URIRef |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
69 |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
70 def setSourceState(self, source, state): |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
71 """ |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
72 add a patch to the COLLECTOR graph about the state of this |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
73 source. state=None to remove the source. |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
74 """ |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
75 oldState = self._sourceState.get(source, None) |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
76 if state == oldState: |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
77 return |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
78 log.info('source state %s -> %s', source, state) |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
79 if oldState is None: |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
80 self._sourceState[source] = state |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
81 self.applyPatch(COLLECTOR, Patch(addQuads=[ |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
82 (COLLECTOR, ROOM['source'], source, COLLECTOR), |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
83 (source, ROOM['state'], state, COLLECTOR), |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
84 ])) |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
85 elif state is None: |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
86 del self._sourceState[source] |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
87 self.applyPatch(COLLECTOR, Patch(delQuads=[ |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
88 (COLLECTOR, ROOM['source'], source, COLLECTOR), |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
89 (source, ROOM['state'], oldState, COLLECTOR), |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
90 ])) |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
91 else: |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
92 self._sourceState[source] = state |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
93 self.applyPatch(COLLECTOR, Patch( |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
94 addQuads=[ |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
95 (source, ROOM['state'], state, COLLECTOR), |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
96 ], |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
97 delQuads=[ |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
98 (source, ROOM['state'], oldState, COLLECTOR), |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
99 ])) |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
100 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
101 def abbrevTerm(t): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
102 if isinstance(t, URIRef): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
103 return (t.replace('http://projects.bigasterisk.com/room/', 'room:') |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
104 .replace('http://bigasterisk.com/sse_collector/', 'sc:')) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
105 return t |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
106 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
107 def abbrevStmt(stmt): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
108 return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt)) |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
109 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
110 class ActiveStatements(object): |
1101 | 111 def __init__(self): |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
112 |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
113 # This table holds statements asserted by any of our sources |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
114 # plus local statements that we introduce (source is |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
115 # http://bigasterisk.com/sse_collector/). |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
116 self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
117 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
118 def _postDeleteStatements(self): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
119 statements = self.statements |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
120 class PostDeleter(object): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
121 def __enter__(self): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
122 self._garbage = [] |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
123 return self |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
124 def add(self, stmt): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
125 self._garbage.append(stmt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
126 def __exit__(self, type, value, traceback): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
127 if type is not None: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
128 raise |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
129 for stmt in self._garbage: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
130 del statements[stmt] |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
131 return PostDeleter() |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
132 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
133 def pprintTable(self): |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
134 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())): |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
135 print "%03d. %-80s from %s to %s" % ( |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
136 i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers) |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
137 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
138 def makeSyncPatch(self, handler, sources): |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
139 # todo: this could run all handlers at once, which is how we use it anyway |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
140 adds = [] |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
141 dels = [] |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
142 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
143 with self._postDeleteStatements() as garbage: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
144 for stmt, (stmtSources, handlers) in self.statements.iteritems(): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
145 belongsInHandler = not set(sources).isdisjoint(stmtSources) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
146 handlerHasIt = handler in handlers |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
147 #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
148 if belongsInHandler and not handlerHasIt: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
149 adds.append(stmt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
150 handlers.add(handler) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
151 elif not belongsInHandler and handlerHasIt: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
152 dels.append(stmt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
153 handlers.remove(handler) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
154 if not handlers and not stmtSources: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
155 garbage.add(stmt) |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
156 |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
157 return Patch(addQuads=adds, delQuads=dels) |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
158 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
159 def applySourcePatch(self, source, p): |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
160 for stmt in p.addQuads: |
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
161 sourceUrls, handlers = self.statements[stmt] |
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
162 if source in sourceUrls: |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
163 raise ValueError("%s added stmt that it already had: %s" % |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
164 (source, abbrevStmt(stmt))) |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
165 sourceUrls.add(source) |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
166 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
167 with self._postDeleteStatements() as garbage: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
168 for stmt in p.delQuads: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
169 sourceUrls, handlers = self.statements[stmt] |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
170 if source not in sourceUrls: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
171 raise ValueError("%s deleting stmt that it didn't have: %s" % |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
172 (source, abbrevStmt(stmt))) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
173 sourceUrls.remove(source) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
174 # this is rare, since some handler probably still has |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
175 # the stmt we're deleting, but it can happen e.g. when |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
176 # a handler was just deleted |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
177 if not sourceUrls and not handlers: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
178 garbage.add(stmt) |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
179 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
180 def replaceSourceStatements(self, source, stmts): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
181 log.debug('replaceSourceStatements with %s stmts', len(stmts)) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
182 newStmts = set(stmts) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
183 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
184 with self._postDeleteStatements() as garbage: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
185 for stmt, (sources, handlers) in self.statements.iteritems(): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
186 if source in sources: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
187 if stmt not in stmts: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
188 sources.remove(source) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
189 if not sources and not handlers: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
190 garbage.add(stmt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
191 else: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
192 if stmt in stmts: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
193 sources.add(source) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
194 newStmts.discard(stmt) |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
195 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
196 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
197 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
198 def discardHandler(self, handler): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
199 with self._postDeleteStatements() as garbage: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
200 for stmt, (sources, handlers) in self.statements.iteritems(): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
201 handlers.discard(handler) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
202 if not sources and not handlers: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
203 garbage.add(stmt) |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
204 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
205 def discardSource(self, source): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
206 with self._postDeleteStatements() as garbage: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
207 for stmt, (sources, handlers) in self.statements.iteritems(): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
208 sources.discard(source) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
209 if not sources and not handlers: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
210 garbage.add(stmt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
211 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
212 class GraphClients(object): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
213 """ |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
214 All the active PatchSources and SSEHandlers |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
215 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
216 To handle all the overlapping-statement cases, we store a set of |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
217 true statements along with the sources that are currently |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
218 asserting them and the requesters who currently know them. As |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
219 statements come and go, we make patches to send to requesters. |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
220 """ |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
221 def __init__(self): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
222 self.clients = {} # url: PatchSource (COLLECTOR is not listed) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
223 self.handlers = set() # handler |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
224 self.statements = ActiveStatements() |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
225 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
226 self._localStatements = LocalStatements(self._onPatch) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
227 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
228 def _sourcesForHandler(self, handler): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
229 streamId = handler.streamId |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
230 matches = [s for s in config['streams'] if s['id'] == streamId] |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
231 if len(matches) != 1: |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
232 raise ValueError("%s matches for %r" % (len(matches), streamId)) |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
233 return map(URIRef, matches[0]['sources']) + [COLLECTOR] |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
234 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
235 def _onPatch(self, source, p, fullGraph=False): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
236 if fullGraph: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
237 # a reconnect may need to resend the full graph even |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
238 # though we've already sent some statements |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
239 self.statements.replaceSourceStatements(source, p.addQuads) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
240 else: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
241 self.statements.applySourcePatch(source, p) |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
242 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
243 self._sendUpdatePatch() |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
244 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
245 if log.isEnabledFor(logging.DEBUG): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
246 self.statements.pprintTable() |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
247 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
248 if source != COLLECTOR: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
249 self._localStatements.setSourceState( |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
250 source, |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
251 ROOM['fullGraphReceived'] if fullGraph else |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
252 ROOM['patchesReceived']) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
253 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
254 def _sendUpdatePatch(self, handler=None): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
255 """ |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
256 send a patch event out this handler to bring it up to date with |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
257 self.statements |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
258 """ |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
259 # reduce loops here- prepare all patches at once |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
260 for h in (self.handlers if handler is None else [handler]): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
261 p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h)) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
262 if not p.isNoop(): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
263 log.debug("send patch %s to %s", p.shortSummary(), h) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
264 h.sendEvent(message=jsonFromPatch(p), event='patch') |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
265 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
266 def addSseHandler(self, handler): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
267 log.info('addSseHandler %r %r', handler, handler.streamId) |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
268 self.handlers.add(handler) |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
269 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
270 for source in self._sourcesForHandler(handler): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
271 if source not in self.clients and source != COLLECTOR: |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
272 self._localStatements.setSourceState(source, ROOM['connect']) |
1107 | 273 self.clients[source] = ReconnectingPatchSource( |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
274 source, listener=lambda p, fullGraph, source=source: self._onPatch( |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
275 source, p, fullGraph)) |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
276 self._sendUpdatePatch(handler) |
1101 | 277 |
278 def removeSseHandler(self, handler): | |
279 log.info('removeSseHandler %r', handler) | |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
280 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
281 self.statements.discardHandler(handler) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
282 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
283 for source in self._sourcesForHandler(handler): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
284 for otherHandler in self.handlers: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
285 if (otherHandler != handler and |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
286 source in self._sourcesForHandler(otherHandler)): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
287 break |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
288 else: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
289 self._stopClient(source) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
290 |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
291 self.handlers.remove(handler) |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
292 |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
293 def _stopClient(self, url): |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
294 if url == COLLECTOR: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
295 return |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
296 |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
297 self.clients[url].stop() |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
298 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
299 self.statements.discardSource(url) |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
300 |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
301 self._localStatements.setSourceState(url, None) |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
302 del self.clients[url] |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
303 |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
304 |
1101 | 305 class SomeGraph(cyclone.sse.SSEHandler): |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
306 _handlerSerial = 0 |
1101 | 307 def __init__(self, application, request): |
308 cyclone.sse.SSEHandler.__init__(self, application, request) | |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
309 self.streamId = request.uri[len('/graph/'):] |
1101 | 310 self.graphClients = self.settings.graphClients |
311 | |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
312 self._serial = SomeGraph._handlerSerial |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
313 SomeGraph._handlerSerial += 1 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
314 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
315 def __repr__(self): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
316 return '<Handler #%s>' % self._serial |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
317 |
1101 | 318 def bind(self): |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
319 self.graphClients.addSseHandler(self) |
1101 | 320 |
321 def unbind(self): | |
322 self.graphClients.removeSseHandler(self) | |
323 | |
324 if __name__ == '__main__': | |
325 | |
326 arg = docopt(""" | |
327 Usage: sse_collector.py [options] | |
328 | |
329 -v Verbose | |
330 """) | |
331 | |
332 if arg['-v']: | |
333 import twisted.python.log | |
334 twisted.python.log.startLogging(sys.stdout) | |
335 log.setLevel(logging.DEBUG) | |
336 | |
337 | |
338 graphClients = GraphClients() | |
339 | |
340 reactor.listenTCP( | |
1104 | 341 9072, |
1101 | 342 cyclone.web.Application( |
343 handlers=[ | |
344 (r'/graph/(.*)', SomeGraph), | |
345 ], | |
346 graphClients=graphClients), | |
347 interface='::') | |
348 reactor.run() |