Mercurial > code > home > repos > homeauto
annotate service/reasoning/sse_collector.py @ 1106:fe53ca09febc
big rewrites in sse_collector
Ignore-this: 3b6278a0cfc57aa686ed39d411fdc35f
darcs-hash:d25124b5e0d3c4729ea55530cd3b3064f2af68a7
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sun, 28 Aug 2016 18:11:34 -0700 |
parents | c8233f4b59cb |
children | 46c5fae89823 |
rev | line source |
---|---|
"""
Requesting /graph/foo returns an SSE patch stream that's the
result of fetching multiple other SSE patch streams. The result stream
may include new statements injected by this service.

Future:
- filter out unneeded stmts from the sources
- give a time resolution and concatenate any patches that come faster
  than that resolution
"""
10 | |
# Static description of the output streams: each entry has an 'id' and
# the list of upstream SSE 'sources' it is built from.
config = {
    'streams': [
        {
            'id': 'home',
            'sources': [
                #'http://bang:9059/graph/events',
                'http://plus:9075/graph/events',
            ],
        },
    ],
}
21 | |
22 from crochet import no_setup | |
23 no_setup() | |
24 | |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
25 import sys, logging, traceback, json, collections |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
26 from twisted.internet import reactor, defer |
1101 | 27 import cyclone.web, cyclone.sse |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
28 from rdflib import ConjunctiveGraph, URIRef, Namespace |
1101 | 29 from rdflib.parser import StringInputSource |
30 from docopt import docopt | |
31 | |
32 from twisted_sse_demo.eventsource import EventSource | |
33 | |
34 sys.path.append("../../lib") | |
35 from logsetup import log | |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
36 from patchablegraph import jsonFromPatch, PatchableGraph, patchFromJson |
1101 | 37 |
38 sys.path.append("/my/proj/light9") | |
39 from light9.rdfdb.patch import Patch | |
40 | |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
# Vocabulary namespace used for the predicates this module emits
# (ROOM['source'], ROOM['state']).
ROOM = Namespace("http://projects.bigasterisk.com/room/")

# URI that identifies this collector; used both as a subject and as the
# graph/context of the statements the collector itself asserts.
COLLECTOR = URIRef("http://bigasterisk.com/sse_collector/")
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
43 |
class PatchSource(object):
    """wrap EventSource so it emits Patch objects and has an explicit stop method."""

    def __init__(self, url):
        """
        Start reading the SSE stream at `url`.

        url must provide .toPython() (e.g. an rdflib URIRef); the
        result is utf8-encoded and handed to EventSource.
        """
        self.url = url

        # add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost.
        # Each one fires at most once per PatchSource.
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        self._fullGraphReceived = False
        self._eventSource = EventSource(url.toPython().encode('utf8'))
        # NOTE(review): presumably the upstream servers end lines with a
        # bare LF rather than the protocol's default -- confirm.
        self._eventSource.protocol.delimiter = '\n'

        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
        self._eventSource.addEventListener('patch', self._onPatch)
        self._eventSource.onerror(self._onError)

        # Wrap the protocol's setFinishedDeferred so that whatever
        # deferred is installed there also calls our _onDisconnect.
        origSet = self._eventSource.protocol.setFinishedDeferred
        def sfd(d):
            origSet(d)
            d.addCallback(self._onDisconnect)
        self._eventSource.protocol.setFinishedDeferred = sfd

    def addPatchListener(self, func):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        """
        Stop reading and drop the EventSource. Safe to call repeatedly:
        once _eventSource is None the AttributeError is swallowed.
        """
        log.info('stop read from %s', self.url)
        try:
            self._eventSource.protocol.stopProducing() # needed?
        except AttributeError:
            pass
        self._eventSource = None

    @staticmethod
    def _fireOnce(d, result):
        """
        Fire deferred `d` with `result` unless it has already fired.

        An error report followed by the disconnect notification would
        otherwise call the same Deferred twice, which raises
        AlreadyCalledError.
        """
        if not d.called:
            d.callback(result)

    def _onDisconnect(self, a):
        """Callback on the protocol's finished-deferred; reports connectionLost."""
        log.debug('PatchSource._onDisconnect from %s', self.url)
        # skip this if we're doing a stop?
        self._fireOnce(self.connectionLost, None)

    def _onError(self, msg):
        """
        EventSource error handler. Before the first fullGraph this is
        reported as connectionFailed, afterwards as connectionLost.
        """
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        if not self._fullGraphReceived:
            self._fireOnce(self.connectionFailed, msg)
        else:
            self._fireOnce(self.connectionLost, msg)

    def _onFullGraph(self, message):
        """
        Parse a 'fullGraph' event (json-ld) and send it to the listeners
        as an all-adds Patch with fullGraph=True.
        """
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except:
            # log here as well as re-raising so the traceback is
            # recorded regardless of what the caller does with it
            log.error(traceback.format_exc())
            raise
        # only set once the listeners accepted the graph without error
        self._fullGraphReceived = True

    def _onPatch(self, message):
        """Parse a 'patch' event and send it to the listeners with fullGraph=False."""
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except:
            log.error(traceback.format_exc())
            raise

    def _sendPatch(self, p, fullGraph):
        """Call every registered listener with the patch."""
        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph)
        # iterate a snapshot so a listener may register another listener
        # without invalidating the iteration
        for lis in list(self._listeners):
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        # An exception raised from __del__ is ignored by the interpreter
        # (it is only printed to stderr), so report the missing stop()
        # through the log instead. getattr covers the case where
        # __init__ failed before _eventSource was assigned.
        if getattr(self, '_eventSource', None):
            log.error('PatchSource for %s was garbage collected without stop()',
                      getattr(self, 'url', None))
123 | |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener):
        """
        url: passed to PatchSource for every (re)connection.
        listener: called as listener(patch, fullGraph=bool) for every
          patch from the current connection.
        """
        self.url = url
        self._stopped = False
        self._listener = listener
        self._ps = None
        # IDelayedCall for a scheduled _reconnect, or None
        self._pendingReconnect = None
        self._reconnect()

    def _reconnect(self):
        """Replace the current PatchSource with a fresh one, unless stopped."""
        self._pendingReconnect = None
        if self._stopped:
            return

        # Detach the previous source before stopping it so any
        # notification it produces while shutting down is recognized as
        # stale, and so it is not discarded while still running.
        old, self._ps = self._ps, None
        if old is not None:
            old.stop()

        ps = PatchSource(self.url)
        self._ps = ps
        ps.addPatchListener(self._onPatch)
        # pass the source along so late notifications from a replaced
        # source can be told apart from ones about the current source
        ps.connectionFailed.addCallback(self._onConnectionFailed, ps)
        ps.connectionLost.addCallback(self._onConnectionLost, ps)

    def _onPatch(self, p, fullGraph):
        """Forward a patch from the underlying PatchSource to our listener."""
        self._listener(p, fullGraph=fullGraph)

    def stop(self):
        """Stop the current connection and prevent any further reconnects."""
        self._stopped = True
        pending, self._pendingReconnect = self._pendingReconnect, None
        if pending is not None and pending.active():
            pending.cancel()
        if self._ps is not None:
            self._ps.stop()

    def _scheduleReconnect(self, source):
        """
        Arrange for one _reconnect call a second from now.

        Does nothing if we are stopped, if `source` is a PatchSource we
        already replaced, or if a reconnect is already scheduled. A
        single broken connection can report both 'failed' and 'lost',
        and that must not produce two new connections.
        """
        if self._stopped:
            return
        if source is not None and source is not self._ps:
            return
        pending = self._pendingReconnect
        if pending is not None and pending.active():
            return
        self._pendingReconnect = reactor.callLater(1, self._reconnect)

    def _onConnectionFailed(self, arg, source=None):
        """Callback for PatchSource.connectionFailed."""
        self._scheduleReconnect(source)

    def _onConnectionLost(self, arg, source=None):
        """Callback for PatchSource.connectionLost."""
        self._scheduleReconnect(source)
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
158 |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
class LocalStatements(object):
    """
    functions that make statements originating from sse_collector itself
    """
    def __init__(self, applyPatch):
        """applyPatch(source, patch) is called with each generated patch."""
        self.applyPatch = applyPatch
        self._sourceState = {} # source: state URIRef

    def setSourceState(self, source, state):
        """
        add a patch to the COLLECTOR graph about the state of this
        source. state=None to remove the source.
        """
        previous = self._sourceState.get(source)
        if state == previous:
            # nothing changed, so no patch is emitted
            return
        log.info('source state %s -> %s', source, state)

        membership = (COLLECTOR, ROOM['source'], source, COLLECTOR)

        def stateQuad(value):
            return (source, ROOM['state'], value, COLLECTOR)

        # state and previous differ here, so at most one of them is None
        if previous is None:
            # first report about this source
            self._sourceState[source] = state
            patch = Patch(addQuads=[membership, stateQuad(state)])
        elif state is None:
            # source is going away
            del self._sourceState[source]
            patch = Patch(delQuads=[membership, stateQuad(previous)])
        else:
            # known source switching from one state to another
            self._sourceState[source] = state
            patch = Patch(addQuads=[stateQuad(state)],
                          delQuads=[stateQuad(previous)])
        self.applyPatch(COLLECTOR, patch)
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
197 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
def abbrevTerm(t):
    """
    Shorten a URIRef for display by replacing the room and
    sse_collector namespace prefixes with 'room:' and 'sc:'. Any other
    kind of term is returned unchanged.
    """
    if not isinstance(t, URIRef):
        return t
    shortened = t.replace('http://projects.bigasterisk.com/room/', 'room:')
    return shortened.replace('http://bigasterisk.com/sse_collector/', 'sc:')
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
203 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
def abbrevStmt(stmt):
    """Format a 4-term statement as '(s p o c)' using abbrevTerm on each term."""
    terms = tuple(abbrevTerm(term) for term in stmt)
    return '(%s %s %s %s)' % terms
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
206 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
207 class ActiveStatements(object): |
1101 | 208 def __init__(self): |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
209 |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
210 # This table holds statements asserted by any of our sources |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
211 # plus local statements that we introduce (source is |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
212 # http://bigasterisk.com/sse_collector/). |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
213 self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
214 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
215 def _postDeleteStatements(self): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
216 statements = self.statements |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
217 class PostDeleter(object): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
218 def __enter__(self): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
219 self._garbage = [] |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
220 return self |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
221 def add(self, stmt): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
222 self._garbage.append(stmt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
223 def __exit__(self, type, value, traceback): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
224 if type is not None: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
225 raise |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
226 for stmt in self._garbage: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
227 del statements[stmt] |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
228 return PostDeleter() |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
229 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
230 def pprintTable(self): |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
231 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())): |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
232 print "%03d. %-80s from %s to %s" % ( |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
233 i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers) |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
234 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
235 def makeSyncPatch(self, handler, sources): |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
236 # todo: this could run all handlers at once, which is how we use it anyway |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
237 adds = [] |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
238 dels = [] |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
239 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
240 with self._postDeleteStatements() as garbage: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
241 for stmt, (stmtSources, handlers) in self.statements.iteritems(): |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
242 belongsInHandler = not set(sources).isdisjoint(stmtSources) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
243 handlerHasIt = handler in handlers |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
244 #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
245 if belongsInHandler and not handlerHasIt: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
246 adds.append(stmt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
247 handlers.add(handler) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
248 elif not belongsInHandler and handlerHasIt: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
249 dels.append(stmt) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
250 handlers.remove(handler) |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
251 if not handlers and not stmtSources: |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
252 garbage.add(stmt) |
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
253 |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
254 return Patch(addQuads=adds, delQuads=dels) |
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
255 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
def applySourcePatch(self, source, p):
    """
    Update the per-statement source sets for one patch from `source`.

    Every quad in p.addQuads gets `source` added to its set of
    asserting sources; every quad in p.delQuads gets it removed.

    Raises ValueError if `source` adds a statement it is already
    recorded as asserting, or deletes one it is not recorded as
    asserting. Entries touched before the error stay modified.
    """
    for added in p.addQuads:
        asserters, _ = self.statements[added]
        if source in asserters:
            msg = ("%s added stmt that it already had: %s" %
                   (source, abbrevStmt(added)))
            raise ValueError(msg)
        asserters.add(source)

    with self._postDeleteStatements() as garbage:
        for removed in p.delQuads:
            asserters, knowers = self.statements[removed]
            if source not in asserters:
                msg = ("%s deleting stmt that it didn't have: %s" %
                       (source, abbrevStmt(removed)))
                raise ValueError(msg)
            asserters.remove(source)
            # Usually some handler still knows the statement being
            # deleted, so this is rare; it can happen e.g. right
            # after a handler was removed.
            if not (asserters or knowers):
                garbage.add(removed)
1103
b84e956771fc
sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents:
1101
diff
changeset
|
276 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
def replaceSourceStatements(self, source, stmts):
    """
    Make `source` assert exactly the statements in `stmts`.

    Statements that `source` was recorded as asserting but that are
    absent from `stmts` lose `source`; if that leaves them with no
    sources and no handlers they are added to the garbage set from
    self._postDeleteStatements(). Statements already in the table
    that appear in `stmts` gain `source` in place. Statements not in
    the table at all are added through applySourcePatch.
    """
    log.debug('replaceSourceStatements with %s stmts', len(stmts))
    # One set serves both purposes: constant-time membership tests
    # (the caller may pass a list) and tracking which incoming
    # statements are not yet in the table.
    incoming = set(stmts)
    newStmts = set(incoming)

    with self._postDeleteStatements() as garbage:
        for stmt, (sources, handlers) in self.statements.iteritems():
            if source in sources:
                if stmt not in incoming:
                    sources.remove(source)
                    if not sources and not handlers:
                        garbage.add(stmt)
            elif stmt in incoming:
                sources.add(source)
            # Anything already in the table was handled above and
            # must not be re-added below, or applySourcePatch would
            # reject it as a duplicate add.
            newStmts.discard(stmt)

    self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
294 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
def discardHandler(self, handler):
    """
    Remove `handler` from the handler set of every statement.

    Any statement that then has neither sources nor handlers is
    added to the garbage set from self._postDeleteStatements().
    """
    with self._postDeleteStatements() as garbage:
        for stmt, entry in self.statements.iteritems():
            asserters, knowers = entry
            knowers.discard(handler)
            if asserters or knowers:
                continue
            garbage.add(stmt)
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
301 |
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
def discardSource(self, source):
    """
    Remove `source` from the source set of every statement.

    Any statement that then has neither sources nor handlers is
    added to the garbage set from self._postDeleteStatements().
    """
    with self._postDeleteStatements() as garbage:
        for stmt, entry in self.statements.iteritems():
            asserters, knowers = entry
            asserters.discard(source)
            if asserters or knowers:
                continue
            garbage.add(stmt)
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
308 |
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """
    def __init__(self):
        self.clients = {}  # url: PatchSource (COLLECTOR is not listed)
        self.handlers = set()  # handler
        self.statements = ActiveStatements()

        # LocalStatements is given _onPatch as its callback.
        self._localStatements = LocalStatements(self._onPatch)

    def _sourcesForHandler(self, handler):
        """
        Return the list of source URIs feeding handler.streamId: the
        configured sources of the matching stream, plus COLLECTOR.

        Raises ValueError unless exactly one entry in
        config['streams'] has that id.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        # A list comprehension (not map) so the concatenation also
        # works where map() returns an iterator.
        return [URIRef(s) for s in matches[0]['sources']] + [COLLECTOR]

    def _onPatch(self, source, p, fullGraph=False):
        """
        Apply patch `p` from `source` to self.statements, then bring
        every handler up to date.

        With fullGraph set, p.addQuads replaces everything `source`
        was asserting instead of being applied incrementally.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(
                source,
                ROOM['fullGraphReceived'] if fullGraph else
                ROOM['patchesReceived'])

    def _sendUpdatePatch(self, handler=None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements. With handler=None, do so for every registered
        handler.
        """
        # reduce loops here- prepare all patches at once
        for h in (self.handlers if handler is None else [handler]):
            p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                h.sendEvent(message=jsonFromPatch(p), event='patch')

    def addSseHandler(self, handler):
        """
        Register `handler`, start a patch source for each of its
        stream's sources that isn't running yet, and send the handler
        its first sync patch.

        Raises ValueError (from _sourcesForHandler) for an unknown or
        ambiguous streamId; in that case nothing is registered.
        """
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # Resolve the stream before registering. Otherwise a handler
        # with a bad streamId would stay in self.handlers and make
        # every later _sendUpdatePatch() raise.
        sources = self._sourcesForHandler(handler)
        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                self._localStatements.setSourceState(source, ROOM['connect'])
                # source is bound as a default argument so each
                # listener keeps its own url, not the loop's last one
                self.clients[source] = ReconnectingPatchSource(
                    source,
                    listener=lambda p, fullGraph, source=source: self._onPatch(
                        source, p, fullGraph))
        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler):
        """
        Unregister `handler`, forget what it knew, and stop every
        patch source that no remaining handler uses.

        A handler that was never registered (e.g. addSseHandler
        rejected its streamId) is ignored.
        """
        log.info('removeSseHandler %r', handler)

        if handler not in self.handlers:
            return

        # Unregister first so that any update sent while clients are
        # being stopped below cannot target the departing handler.
        # NOTE(review): this assumes setSourceState can call back into
        # _onPatch -- confirm against LocalStatements.
        self.handlers.remove(handler)
        self.statements.discardHandler(handler)

        # Sources still wanted by the remaining handlers, computed once.
        stillNeeded = set()
        for otherHandler in self.handlers:
            stillNeeded.update(self._sourcesForHandler(otherHandler))

        for source in self._sourcesForHandler(handler):
            if source not in stillNeeded:
                self._stopClient(source)

    def _stopClient(self, url):
        """
        Stop the patch source for `url` and drop its statements and
        local state. COLLECTOR and urls with no running client are
        ignored.
        """
        if url == COLLECTOR or url not in self.clients:
            return

        self.clients[url].stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
        del self.clients[url]
1106
fe53ca09febc
big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents:
1105
diff
changeset
|
400 |
1105
c8233f4b59cb
local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents:
1104
diff
changeset
|
401 |
class SomeGraph(cyclone.sse.SSEHandler):
    """
    SSE request handler for /graph/<streamId>.

    Registers itself with the application's graphClients on bind and
    unregisters on unbind.
    """
    # class-wide counter; each instance takes the next value as its serial
    _handlerSerial = 0

    def __init__(self, application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        # everything after the '/graph/' prefix of the request uri
        prefix = '/graph/'
        self.streamId = request.uri[len(prefix):]
        self.graphClients = self.settings.graphClients

        serial = SomeGraph._handlerSerial
        SomeGraph._handlerSerial = serial + 1
        self._serial = serial

    def __repr__(self):
        return '<Handler #%s>' % (self._serial,)

    def bind(self):
        self.graphClients.addSseHandler(self)

    def unbind(self):
        self.graphClients.removeSseHandler(self)
420 | |
if __name__ == '__main__':
    # Command-line entry point: parse options, optionally turn on
    # verbose logging, then serve /graph/<streamId> on port 9072.
    # NOTE(review): spacing inside the usage text is reconstructed --
    # confirm it matches the original file.
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v   Verbose
    """)

    if arg['-v']:
        import twisted.python.log
        twisted.python.log.startLogging(sys.stdout)
        log.setLevel(logging.DEBUG)

    graphClients = GraphClients()

    routes = [
        (r'/graph/(.*)', SomeGraph),
    ]
    app = cyclone.web.Application(handlers=routes,
                                  graphClients=graphClients)
    reactor.listenTCP(9072, app, interface='::')
    reactor.run()