comparison service/collector/sse_collector.py @ 446:346b85a9adbb

rollback the unicode(source) optimization. it was breaking all output to patch consumers Ignore-this: 4fabc1d106299a8094b85ff9909c660e
author drewp@bigasterisk.com
date Thu, 18 Apr 2019 16:55:52 -0700
parents 96d712dccf28
children ef7eba0551f2
comparison
equal deleted inserted replaced
445:975384ebd88e 446:346b85a9adbb
75 ])) 75 ]))
76 76
77 def abbrevTerm(t): 77 def abbrevTerm(t):
78 if isinstance(t, URIRef): 78 if isinstance(t, URIRef):
79 return (t.replace('http://projects.bigasterisk.com/room/', 'room:') 79 return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
80 .replace('http://projects.bigasterisk.com/device/', 'dev:')
80 .replace('http://bigasterisk.com/sse_collector/', 'sc:')) 81 .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
81 return t 82 return t
82 83
83 def abbrevStmt(stmt): 84 def abbrevStmt(stmt):
84 return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt)) 85 return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))
85 86
86 class ActiveStatements(object): 87 class ActiveStatements(object):
87 def __init__(self): 88 def __init__(self):
88
89 # This table holds statements asserted by any of our sources 89 # This table holds statements asserted by any of our sources
90 # plus local statements that we introduce (source is 90 # plus local statements that we introduce (source is
91 # http://bigasterisk.com/sse_collector/). 91 # http://bigasterisk.com/sse_collector/).
92 self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` 92 self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)
93 93
94 def state(self): 94 def state(self):
95 return { 95 return {
96 'len': len(self.statements), 96 'len': len(self.statements),
97 } 97 }
141 garbage.add(stmt) 141 garbage.add(stmt)
142 142
143 return Patch(addQuads=adds, delQuads=dels) 143 return Patch(addQuads=adds, delQuads=dels)
144 144
145 def applySourcePatch(self, source, p): 145 def applySourcePatch(self, source, p):
146 source = unicode(source)
147 for stmt in p.addQuads: 146 for stmt in p.addQuads:
148 sourceUrls, handlers = self.statements[stmt] 147 sourceUrls, handlers = self.statements[stmt]
149 if unicode(source) in sourceUrls: 148 if source in sourceUrls:
150 raise ValueError("%s added stmt that it already had: %s" % 149 raise ValueError("%s added stmt that it already had: %s" %
151 (source, abbrevStmt(stmt))) 150 (source, abbrevStmt(stmt)))
152 sourceUrls.add(source) 151 sourceUrls.add(source)
153 152
154 with self._postDeleteStatements() as garbage: 153 with self._postDeleteStatements() as garbage:
155 for stmt in p.delQuads: 154 for stmt in p.delQuads:
156 sourceUrls, handlers = self.statements[stmt] 155 sourceUrls, handlers = self.statements[stmt]
157 if unicode(source) not in sourceUrls: 156 if source not in sourceUrls:
158 raise ValueError("%s deleting stmt that it didn't have: %s" % 157 raise ValueError("%s deleting stmt that it didn't have: %s" %
159 (source, abbrevStmt(stmt))) 158 (source, abbrevStmt(stmt)))
160 sourceUrls.remove(source) 159 sourceUrls.remove(source)
161 # this is rare, since some handler probably still has 160 # this is rare, since some handler probably still has
162 # the stmt we're deleting, but it can happen e.g. when 161 # the stmt we're deleting, but it can happen e.g. when
164 if not sourceUrls and not handlers: 163 if not sourceUrls and not handlers:
165 garbage.add(stmt) 164 garbage.add(stmt)
166 165
167 @STATS.replaceSourceStatements.time() 166 @STATS.replaceSourceStatements.time()
168 def replaceSourceStatements(self, source, stmts): 167 def replaceSourceStatements(self, source, stmts):
169 source = unicode(source)
170 log.debug('replaceSourceStatements with %s stmts', len(stmts)) 168 log.debug('replaceSourceStatements with %s stmts', len(stmts))
171 newStmts = set(stmts) 169 newStmts = set(stmts)
172 170
173 with self._postDeleteStatements() as garbage: 171 with self._postDeleteStatements() as garbage:
174 for stmt, (sources, handlers) in self.statements.iteritems(): 172 for stmt, (sources, handlers) in self.statements.iteritems():
190 handlers.discard(handler) 188 handlers.discard(handler)
191 if not sources and not handlers: 189 if not sources and not handlers:
192 garbage.add(stmt) 190 garbage.add(stmt)
193 191
194 def discardSource(self, source): 192 def discardSource(self, source):
195 source = unicode(source)
196 with self._postDeleteStatements() as garbage: 193 with self._postDeleteStatements() as garbage:
197 for stmt, (sources, handlers) in self.statements.iteritems(): 194 for stmt, (sources, handlers) in self.statements.iteritems():
198 sources.discard(source) 195 sources.discard(source)
199 if not sources and not handlers: 196 if not sources and not handlers:
200 garbage.add(stmt) 197 garbage.add(stmt)