annotate service/reasoning/sse_collector.py @ 1106:fe53ca09febc

big rewrites in sse_collector Ignore-this: 3b6278a0cfc57aa686ed39d411fdc35f darcs-hash:d25124b5e0d3c4729ea55530cd3b3064f2af68a7
author drewp <drewp@bigasterisk.com>
date Sun, 28 Aug 2016 18:11:34 -0700
parents c8233f4b59cb
children 46c5fae89823
"""
requesting /graph/foo returns an SSE patch stream that's the
result of fetching multiple other SSE patch streams. The result stream
may include new statements injected by this service.

Future:
- filter out unneeded stmts from the sources
- give a time resolution and concatenate any patches that come faster than that res
"""

config = {
    'streams': [
        {'id': 'home',
         'sources': [
             #'http://bang:9059/graph/events',
             'http://plus:9075/graph/events',
         ]
        },
    ]
}

from crochet import no_setup
no_setup()

import sys, logging, traceback, json, collections
from twisted.internet import reactor, defer
import cyclone.web, cyclone.sse
from rdflib import ConjunctiveGraph, URIRef, Namespace
from rdflib.parser import StringInputSource
from docopt import docopt

from twisted_sse_demo.eventsource import EventSource

sys.path.append("../../lib")
from logsetup import log
from patchablegraph import jsonFromPatch, PatchableGraph, patchFromJson

sys.path.append("/my/proj/light9")
from light9.rdfdb.patch import Patch

ROOM = Namespace("http://projects.bigasterisk.com/room/")
COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')

class PatchSource(object):
    """wrap EventSource so it emits Patch objects and has an explicit stop method."""
    def __init__(self, url):
        self.url = url

        # add callbacks to these to learn if we failed to connect
        # (approximately) or if the connection was unexpectedly lost
        self.connectionFailed = defer.Deferred()
        self.connectionLost = defer.Deferred()

        self._listeners = set()
        log.info('start read from %s', url)
        self._fullGraphReceived = False
        self._eventSource = EventSource(url.toPython().encode('utf8'))
        self._eventSource.protocol.delimiter = '\n'

        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
        self._eventSource.addEventListener('patch', self._onPatch)
        self._eventSource.onerror(self._onError)

        # wrap the protocol's setFinishedDeferred so we also hear about disconnects
        origSet = self._eventSource.protocol.setFinishedDeferred
        def sfd(d):
            origSet(d)
            d.addCallback(self._onDisconnect)
        self._eventSource.protocol.setFinishedDeferred = sfd

    def addPatchListener(self, func):
        """
        func(patch, fullGraph=[true if the patch is the initial fullgraph])
        """
        self._listeners.add(func)

    def stop(self):
        log.info('stop read from %s', self.url)
        try:
            self._eventSource.protocol.stopProducing() # needed?
        except AttributeError:
            pass
        self._eventSource = None

    def _onDisconnect(self, a):
        log.debug('PatchSource._onDisconnect from %s', self.url)
        # skip this if we're doing a stop?
        self.connectionLost.callback(None)

    def _onError(self, msg):
        log.debug('PatchSource._onError from %s %r', self.url, msg)
        if not self._fullGraphReceived:
            self.connectionFailed.callback(msg)
        else:
            self.connectionLost.callback(msg)

    def _onFullGraph(self, message):
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p, fullGraph=True)
        except:
            log.error(traceback.format_exc())
            raise
        self._fullGraphReceived = True

    def _onPatch(self, message):
        try:
            p = patchFromJson(message)
            self._sendPatch(p, fullGraph=False)
        except:
            log.error(traceback.format_exc())
            raise

    def _sendPatch(self, p, fullGraph):
        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph)
        for lis in self._listeners:
            lis(p, fullGraph=fullGraph)

    def __del__(self):
        if self._eventSource:
            raise ValueError

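Illustrative aside, not part of sse_collector.py: the listener contract above, func(patch, fullGraph=...), can be sketched without Twisted or rdflib. FakePatch and MirrorListener are hypothetical names, and the addQuads/delQuads attributes on the patch are an assumption made for this sketch. The point is only that a fullGraph=True patch replaces whatever the listener held, while later patches are applied incrementally.

```python
class FakePatch(object):
    """Hypothetical stand-in for a patch: just two lists of quads."""
    def __init__(self, addQuads=(), delQuads=()):
        self.addQuads = list(addQuads)
        self.delQuads = list(delQuads)


class MirrorListener(object):
    """Hypothetical listener that keeps a local copy of one source's quads."""
    def __init__(self):
        self.quads = set()

    def __call__(self, patch, fullGraph):
        if fullGraph:
            # initial (or post-reconnect) snapshot replaces everything we had
            self.quads = set(patch.addQuads)
        else:
            # incremental patch: apply deletes, then adds
            self.quads.difference_update(patch.delQuads)
            self.quads.update(patch.addQuads)
```

A listener like this could be registered with addPatchListener, assuming real patches expose their quads the same way.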
class ReconnectingPatchSource(object):
    """
    PatchSource api, but auto-reconnects internally and takes listener
    at init time to not miss any patches. You'll get another
    fullGraph=True patch if we have to reconnect.

    todo: generate connection stmts in here
    """
    def __init__(self, url, listener):
        self.url = url
        self._stopped = False
        self._listener = listener
        self._reconnect()

    def _reconnect(self):
        if self._stopped:
            return
        self._ps = PatchSource(self.url)
        self._ps.addPatchListener(self._onPatch)
        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
        self._ps.connectionLost.addCallback(self._onConnectionLost)

    def _onPatch(self, p, fullGraph):
        self._listener(p, fullGraph=fullGraph)

    def stop(self):
        self._stopped = True
        self._ps.stop()

    def _onConnectionFailed(self, arg):
        reactor.callLater(1, self._reconnect)

    def _onConnectionLost(self, arg):
        reactor.callLater(1, self._reconnect)

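Illustrative aside, not part of sse_collector.py: the reconnect-after-a-delay behaviour of ReconnectingPatchSource can be tried without a running reactor. This is a minimal sketch under assumptions: FakeClock is a hypothetical stand-in for reactor.callLater that queues calls instead of waiting, and Reconnector and its connect factory are invented names for the sketch.

```python
class FakeClock(object):
    """Hypothetical stand-in for reactor.callLater: queues calls instead of waiting."""
    def __init__(self):
        self.pending = []

    def callLater(self, delay, func):
        self.pending.append((delay, func))

    def runPending(self):
        calls, self.pending = self.pending, []
        for _delay, func in calls:
            func()


class Reconnector(object):
    """Re-create a connection after a fixed delay until stop() is called."""
    def __init__(self, connect, clock, delay=1):
        self._connect = connect  # hypothetical factory, called once per attempt
        self._clock = clock
        self._delay = delay
        self._stopped = False
        self.attempts = 0
        self._reconnect()

    def _reconnect(self):
        if self._stopped:
            return
        self.attempts += 1
        self._connect(onLost=self._onLost)

    def _onLost(self):
        self._clock.callLater(self._delay, self._reconnect)

    def stop(self):
        self._stopped = True


def demo():
    clock = FakeClock()
    lostCallbacks = []
    r = Reconnector(lambda onLost: lostCallbacks.append(onLost), clock)
    lostCallbacks[-1]()   # simulate a dropped connection
    clock.runPending()    # the delayed reconnect fires: second attempt
    lostCallbacks[-1]()   # drop again...
    r.stop()              # ...but stop before the retry fires
    clock.runPending()    # the scheduled retry is now a no-op
    return r.attempts
```

The _stopped check sits at the top of _reconnect rather than in stop(), so a retry that was already scheduled when stop() was called does nothing when it eventually runs.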
class LocalStatements(object):
    """
    functions that make statements originating from sse_collector itself
    """
    def __init__(self, applyPatch):
        self.applyPatch = applyPatch
        self._sourceState = {} # source: state URIRef

    def setSourceState(self, source, state):
        """
        add a patch to the COLLECTOR graph about the state of this
        source. state=None to remove the source.
        """
        oldState = self._sourceState.get(source, None)
        if state == oldState:
            return
        log.info('source state %s -> %s', source, state)
        if oldState is None:
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(addQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], state, COLLECTOR),
            ]))
        elif state is None:
            del self._sourceState[source]
            self.applyPatch(COLLECTOR, Patch(delQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], oldState, COLLECTOR),
            ]))
        else:
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(
                addQuads=[
                    (source, ROOM['state'], state, COLLECTOR),
                ],
                delQuads=[
                    (source, ROOM['state'], oldState, COLLECTOR),
                ]))

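Illustrative aside, not part of sse_collector.py: setSourceState distinguishes three transitions (source appears, source goes away, state changes). The sketch below restates that branching as a pure function over plain tuples so it can be checked in isolation. stateTransitionPatch is a hypothetical name, and the 'room:source' and 'room:state' strings stand in for the ROOM namespace terms.

```python
def stateTransitionPatch(collector, source, oldState, newState):
    """Return (addQuads, delQuads) for a source moving from oldState to newState.

    None means 'source not present'. Quads are plain (s, p, o, context) tuples.
    """
    if newState == oldState:
        return [], []
    if oldState is None:
        # new source: link it to the collector and record its state
        return [(collector, 'room:source', source, collector),
                (source, 'room:state', newState, collector)], []
    if newState is None:
        # source removed: retract both statements
        return [], [(collector, 'room:source', source, collector),
                    (source, 'room:state', oldState, collector)]
    # state change: swap only the state statement
    return ([(source, 'room:state', newState, collector)],
            [(source, 'room:state', oldState, collector)])
```

Only the first and last transitions add quads, and only a removal retracts the collector-to-source link.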
def abbrevTerm(t):
    if isinstance(t, URIRef):
        return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
                .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
    return t

def abbrevStmt(stmt):
    return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))

class ActiveStatements(object):
    def __init__(self):

        # This table holds statements asserted by any of our sources
        # plus local statements that we introduce (source is
        # http://bigasterisk.com/sse_collector/).
        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)

    def _postDeleteStatements(self):
        statements = self.statements
        class PostDeleter(object):
            def __enter__(self):
                self._garbage = []
                return self
            def add(self, stmt):
                self._garbage.append(stmt)
            def __exit__(self, type, value, traceback):
                if type is not None:
                    raise
                for stmt in self._garbage:
                    del statements[stmt]
        return PostDeleter()

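Illustrative aside, not part of sse_collector.py: _postDeleteStatements exists so that rows can be marked for deletion while the statements table is being iterated, with the actual deletes happening after the loop. The sketch below shows the same deferred-delete pattern on an ordinary dict. DeferredDeleter and dropEmpty are hypothetical names, and returning False from __exit__ is this sketch's way of letting an exception propagate.

```python
class DeferredDeleter(object):
    """Collect keys during iteration; delete them from the dict on clean exit."""
    def __init__(self, table):
        self._table = table
        self._garbage = []

    def __enter__(self):
        return self

    def add(self, key):
        self._garbage.append(key)

    def __exit__(self, excType, excValue, tb):
        if excType is not None:
            return False  # let the exception propagate, delete nothing
        for key in self._garbage:
            del self._table[key]
        return False


def dropEmpty(table):
    """Remove every key whose value is empty, without mutating mid-iteration."""
    with DeferredDeleter(table) as garbage:
        for key, values in table.items():
            if not values:
                garbage.add(key)  # deleting here would break the iteration
    return table
```

Deleting inside the loop would raise "dictionary changed size during iteration", which is the problem the deferred form avoids.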
    def pprintTable(self):
        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
            print "%03d. %-80s from %s to %s" % (
                i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)

1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
235 def makeSyncPatch(self, handler, sources):
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
236 # todo: this could run all handlers at once, which is how we use it anyway
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
237 adds = []
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
238 dels = []
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
239
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
240 with self._postDeleteStatements() as garbage:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
241 for stmt, (stmtSources, handlers) in self.statements.iteritems():
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
242 belongsInHandler = not set(sources).isdisjoint(stmtSources)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
243 handlerHasIt = handler in handlers
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
244 #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
245 if belongsInHandler and not handlerHasIt:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
246 adds.append(stmt)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
247 handlers.add(handler)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
248 elif not belongsInHandler and handlerHasIt:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
249 dels.append(stmt)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
250 handlers.remove(handler)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
251 if not handlers and not stmtSources:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
252 garbage.add(stmt)
1103
b84e956771fc sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents: 1101
diff changeset
253
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
254 return Patch(addQuads=adds, delQuads=dels)
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
255
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
256 def applySourcePatch(self, source, p):
1103
b84e956771fc sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents: 1101
diff changeset
257 for stmt in p.addQuads:
b84e956771fc sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents: 1101
diff changeset
258 sourceUrls, handlers = self.statements[stmt]
b84e956771fc sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents: 1101
diff changeset
259 if source in sourceUrls:
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
260 raise ValueError("%s added stmt that it already had: %s" %
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
261 (source, abbrevStmt(stmt)))
1103
b84e956771fc sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents: 1101
diff changeset
262 sourceUrls.add(source)
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
263
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
264 with self._postDeleteStatements() as garbage:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
265 for stmt in p.delQuads:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
266 sourceUrls, handlers = self.statements[stmt]
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
267 if source not in sourceUrls:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
268 raise ValueError("%s deleting stmt that it didn't have: %s" %
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
269 (source, abbrevStmt(stmt)))
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
270 sourceUrls.remove(source)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
271 # this is rare, since some handler probably still has
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
272 # the stmt we're deleting, but it can happen e.g. when
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
273 # a handler was just deleted
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
274 if not sourceUrls and not handlers:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
275 garbage.add(stmt)
1103
b84e956771fc sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents: 1101
diff changeset
276
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
277 def replaceSourceStatements(self, source, stmts):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
278 log.debug('replaceSourceStatements with %s stmts', len(stmts))
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
279 newStmts = set(stmts)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
280
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
281 with self._postDeleteStatements() as garbage:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
282 for stmt, (sources, handlers) in self.statements.iteritems():
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
283 if source in sources:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
284 if stmt not in stmts:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
285 sources.remove(source)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
286 if not sources and not handlers:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
287 garbage.add(stmt)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
288 else:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
289 if stmt in stmts:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
290 sources.add(source)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
291 newStmts.discard(stmt)
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
292
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
293 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
294
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
295 def discardHandler(self, handler):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
296 with self._postDeleteStatements() as garbage:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
297 for stmt, (sources, handlers) in self.statements.iteritems():
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
298 handlers.discard(handler)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
299 if not sources and not handlers:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
300 garbage.add(stmt)
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
301
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
302 def discardSource(self, source):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
303 with self._postDeleteStatements() as garbage:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
304 for stmt, (sources, handlers) in self.statements.iteritems():
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
305 sources.discard(source)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
306 if not sources and not handlers:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
307 garbage.add(stmt)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
308
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
309 class GraphClients(object):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
310 """
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
311 All the active PatchSources and SSEHandlers
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
312
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
313 To handle all the overlapping-statement cases, we store a set of
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
314 true statements along with the sources that are currently
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
315 asserting them and the requesters who currently know them. As
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
316 statements come and go, we make patches to send to requesters.
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
317 """
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
318 def __init__(self):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
319 self.clients = {} # url: PatchSource (COLLECTOR is not listed)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
320 self.handlers = set() # handler
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
321 self.statements = ActiveStatements()
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
322
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
323 self._localStatements = LocalStatements(self._onPatch)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
324
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
325 def _sourcesForHandler(self, handler):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
326 streamId = handler.streamId
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
327 matches = [s for s in config['streams'] if s['id'] == streamId]
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
328 if len(matches) != 1:
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
329 raise ValueError("%s matches for %r" % (len(matches), streamId))
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
330 return map(URIRef, matches[0]['sources']) + [COLLECTOR]
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
331
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
332 def _onPatch(self, source, p, fullGraph=False):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
333 if fullGraph:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
334 # a reconnect may need to resend the full graph even
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
335 # though we've already sent some statements
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
336 self.statements.replaceSourceStatements(source, p.addQuads)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
337 else:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
338 self.statements.applySourcePatch(source, p)
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
339
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
340 self._sendUpdatePatch()
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
341
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
342 if log.isEnabledFor(logging.DEBUG):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
343 self.statements.pprintTable()
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
344
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
345 if source != COLLECTOR:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
346 self._localStatements.setSourceState(
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
347 source,
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
348 ROOM['fullGraphReceived'] if fullGraph else
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
349 ROOM['patchesReceived'])
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
350
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
351 def _sendUpdatePatch(self, handler=None):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
352 """
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
353 send a patch event to this handler (or to every handler, if handler is None) to bring it up to date with
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
354 self.statements
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
355 """
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
356 # todo: reduce the loops here by preparing the patches for all handlers in one pass
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
357 for h in (self.handlers if handler is None else [handler]):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
358 p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
359 if not p.isNoop():
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
360 log.debug("send patch %s to %s", p.shortSummary(), h)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
361 h.sendEvent(message=jsonFromPatch(p), event='patch')
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
362
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
363 def addSseHandler(self, handler):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
364 log.info('addSseHandler %r %r', handler, handler.streamId)
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
365 self.handlers.add(handler)
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
366
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
367 for source in self._sourcesForHandler(handler):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
368 if source not in self.clients and source != COLLECTOR:
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
369 self._localStatements.setSourceState(source, ROOM['connect'])
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
370 ps = self.clients[source] = ReconnectingPatchSource(
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
371 source, listener=lambda p, fullGraph, source=source: self._onPatch(
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
372 source, p, fullGraph))
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
373 self._sendUpdatePatch(handler)
1101
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
374
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
375 def removeSseHandler(self, handler):
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
376 log.info('removeSseHandler %r', handler)
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
377
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
378 self.statements.discardHandler(handler)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
379
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
380 for source in self._sourcesForHandler(handler):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
381 for otherHandler in self.handlers:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
382 if (otherHandler != handler and
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
383 source in self._sourcesForHandler(otherHandler)):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
384 break
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
385 else:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
386 self._stopClient(source)
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
387
1103
b84e956771fc sse_collector now kind of gets concurrent requests right
drewp <drewp@bigasterisk.com>
parents: 1101
diff changeset
388 self.handlers.remove(handler)
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
389
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
390 def _stopClient(self, url):
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
391 if url == COLLECTOR:
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
392 return
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
393
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
394 self.clients[url].stop()
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
395
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
396 self.statements.discardSource(url)
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
397
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
398 self._localStatements.setSourceState(url, None)
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
399 del self.clients[url]
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
400
1105
c8233f4b59cb local state statements and self.statements rewrite
drewp <drewp@bigasterisk.com>
parents: 1104
diff changeset
401
1101
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
402 class SomeGraph(cyclone.sse.SSEHandler):
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
403 _handlerSerial = 0
1101
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
404 def __init__(self, application, request):
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
405 cyclone.sse.SSEHandler.__init__(self, application, request)
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
406 self.streamId = request.uri[len('/graph/'):]
1101
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
407 self.graphClients = self.settings.graphClients
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
408
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
409 self._serial = SomeGraph._handlerSerial
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
410 SomeGraph._handlerSerial += 1
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
411
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
412 def __repr__(self):
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
413 return '<Handler #%s>' % self._serial
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
414
1101
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
415 def bind(self):
1106
fe53ca09febc big rewrites in sse_collector
drewp <drewp@bigasterisk.com>
parents: 1105
diff changeset
416 self.graphClients.addSseHandler(self)
1101
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
417
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
418 def unbind(self):
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
419 self.graphClients.removeSseHandler(self)
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
420
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
421 if __name__ == '__main__':
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
422
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
423 arg = docopt("""
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
424 Usage: sse_collector.py [options]
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
425
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
426 -v Verbose
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
427 """)
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
428
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
429 if arg['-v']:
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
430 import twisted.python.log
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
431 twisted.python.log.startLogging(sys.stdout)
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
432 log.setLevel(logging.DEBUG)
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
433
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
434
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
435 graphClients = GraphClients()
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
436
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
437 reactor.listenTCP(
1104
78f5fdec0c75 port 9072
drewp <drewp@bigasterisk.com>
parents: 1103
diff changeset
438 9072,
1101
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
439 cyclone.web.Application(
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
440 handlers=[
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
441 (r'/graph/(.*)', SomeGraph),
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
442 ],
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
443 graphClients=graphClients),
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
444 interface='::')
2c7fd4e246ed start sse_collector
drewp <drewp@bigasterisk.com>
parents:
diff changeset
445 reactor.run()
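
The bookkeeping described in the GraphClients docstring (each true statement is stored with the sources asserting it and the handlers that already know it) can be sketched in isolation. This is only an illustration, not the code above: StatementTable, add_from_source, remove_from_source and make_sync_patch are hypothetical names, plain strings stand in for quads, source URIs and handler objects, and a plain (adds, dels) tuple stands in for Patch. It also assumes that rows are dropped only after the loop finishes, which is the role the garbage list plays in the real class.

```python
from collections import defaultdict


class StatementTable(object):
    """Hypothetical stand-in for ActiveStatements: stmt -> (sources, handlers)."""

    def __init__(self):
        # Each row holds the set of sources asserting the statement and the
        # set of handlers that have already been sent it.
        self.table = defaultdict(lambda: (set(), set()))

    def add_from_source(self, source, stmt):
        self.table[stmt][0].add(source)

    def remove_from_source(self, source, stmt):
        self.table[stmt][0].discard(source)

    def make_sync_patch(self, handler, wanted_sources):
        """Return (adds, dels) that bring `handler` up to date."""
        wanted = set(wanted_sources)
        adds, dels, garbage = [], [], []
        for stmt, (sources, handlers) in self.table.items():
            belongs = not wanted.isdisjoint(sources)
            has_it = handler in handlers
            if belongs and not has_it:
                adds.append(stmt)
                handlers.add(handler)
            elif not belongs and has_it:
                dels.append(stmt)
                handlers.remove(handler)
            if not sources and not handlers:
                garbage.append(stmt)
        # Delete afterwards: a dict must not change size while it is iterated.
        for stmt in garbage:
            del self.table[stmt]
        return adds, dels


table = StatementTable()
table.add_from_source('src1', 'stmtA')
table.add_from_source('src2', 'stmtA')
first = table.make_sync_patch('handler1', ['src1'])

table.remove_from_source('src1', 'stmtA')
second = table.make_sync_patch('handler1', ['src1'])
```

In this sketch, first carries stmtA as an add and second carries it as a delete, because the handler only subscribes to src1. The row itself survives until src2 also drops the statement and no handler holds it any more.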