Mercurial > code > home > repos > homeauto
comparison service/collector/sse_collector.py @ 446:346b85a9adbb
Roll back the unicode(source) optimization; it was breaking all output to patch consumers.
Ignore-this: 4fabc1d106299a8094b85ff9909c660e
author | drewp@bigasterisk.com |
---|---|
date | Thu, 18 Apr 2019 16:55:52 -0700 |
parents | 96d712dccf28 |
children | ef7eba0551f2 |
comparison
equal
deleted
inserted
replaced
445:975384ebd88e | 446:346b85a9adbb |
---|---|
75 ])) | 75 ])) |
76 | 76 |
77 def abbrevTerm(t): | 77 def abbrevTerm(t): |
78 if isinstance(t, URIRef): | 78 if isinstance(t, URIRef): |
79 return (t.replace('http://projects.bigasterisk.com/room/', 'room:') | 79 return (t.replace('http://projects.bigasterisk.com/room/', 'room:') |
80 .replace('http://projects.bigasterisk.com/device/', 'dev:') | |
80 .replace('http://bigasterisk.com/sse_collector/', 'sc:')) | 81 .replace('http://bigasterisk.com/sse_collector/', 'sc:')) |
81 return t | 82 return t |
82 | 83 |
83 def abbrevStmt(stmt): | 84 def abbrevStmt(stmt): |
84 return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt)) | 85 return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt)) |
85 | 86 |
86 class ActiveStatements(object): | 87 class ActiveStatements(object): |
87 def __init__(self): | 88 def __init__(self): |
88 | |
89 # This table holds statements asserted by any of our sources | 89 # This table holds statements asserted by any of our sources |
90 # plus local statements that we introduce (source is | 90 # plus local statements that we introduce (source is |
91 # http://bigasterisk.com/sse_collector/). | 91 # http://bigasterisk.com/sse_collector/). |
92 self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` | 92 self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers) |
93 | 93 |
94 def state(self): | 94 def state(self): |
95 return { | 95 return { |
96 'len': len(self.statements), | 96 'len': len(self.statements), |
97 } | 97 } |
141 garbage.add(stmt) | 141 garbage.add(stmt) |
142 | 142 |
143 return Patch(addQuads=adds, delQuads=dels) | 143 return Patch(addQuads=adds, delQuads=dels) |
144 | 144 |
145 def applySourcePatch(self, source, p): | 145 def applySourcePatch(self, source, p): |
146 source = unicode(source) | |
147 for stmt in p.addQuads: | 146 for stmt in p.addQuads: |
148 sourceUrls, handlers = self.statements[stmt] | 147 sourceUrls, handlers = self.statements[stmt] |
149 if unicode(source) in sourceUrls: | 148 if source in sourceUrls: |
150 raise ValueError("%s added stmt that it already had: %s" % | 149 raise ValueError("%s added stmt that it already had: %s" % |
151 (source, abbrevStmt(stmt))) | 150 (source, abbrevStmt(stmt))) |
152 sourceUrls.add(source) | 151 sourceUrls.add(source) |
153 | 152 |
154 with self._postDeleteStatements() as garbage: | 153 with self._postDeleteStatements() as garbage: |
155 for stmt in p.delQuads: | 154 for stmt in p.delQuads: |
156 sourceUrls, handlers = self.statements[stmt] | 155 sourceUrls, handlers = self.statements[stmt] |
157 if unicode(source) not in sourceUrls: | 156 if source not in sourceUrls: |
158 raise ValueError("%s deleting stmt that it didn't have: %s" % | 157 raise ValueError("%s deleting stmt that it didn't have: %s" % |
159 (source, abbrevStmt(stmt))) | 158 (source, abbrevStmt(stmt))) |
160 sourceUrls.remove(source) | 159 sourceUrls.remove(source) |
161 # this is rare, since some handler probably still has | 160 # this is rare, since some handler probably still has |
162 # the stmt we're deleting, but it can happen e.g. when | 161 # the stmt we're deleting, but it can happen e.g. when |
164 if not sourceUrls and not handlers: | 163 if not sourceUrls and not handlers: |
165 garbage.add(stmt) | 164 garbage.add(stmt) |
166 | 165 |
167 @STATS.replaceSourceStatements.time() | 166 @STATS.replaceSourceStatements.time() |
168 def replaceSourceStatements(self, source, stmts): | 167 def replaceSourceStatements(self, source, stmts): |
169 source = unicode(source) | |
170 log.debug('replaceSourceStatements with %s stmts', len(stmts)) | 168 log.debug('replaceSourceStatements with %s stmts', len(stmts)) |
171 newStmts = set(stmts) | 169 newStmts = set(stmts) |
172 | 170 |
173 with self._postDeleteStatements() as garbage: | 171 with self._postDeleteStatements() as garbage: |
174 for stmt, (sources, handlers) in self.statements.iteritems(): | 172 for stmt, (sources, handlers) in self.statements.iteritems(): |
190 handlers.discard(handler) | 188 handlers.discard(handler) |
191 if not sources and not handlers: | 189 if not sources and not handlers: |
192 garbage.add(stmt) | 190 garbage.add(stmt) |
193 | 191 |
194 def discardSource(self, source): | 192 def discardSource(self, source): |
195 source = unicode(source) | |
196 with self._postDeleteStatements() as garbage: | 193 with self._postDeleteStatements() as garbage: |
197 for stmt, (sources, handlers) in self.statements.iteritems(): | 194 for stmt, (sources, handlers) in self.statements.iteritems(): |
198 sources.discard(source) | 195 sources.discard(source) |
199 if not sources and not handlers: | 196 if not sources and not handlers: |
200 garbage.add(stmt) | 197 garbage.add(stmt) |