changeset 1493:391140f53dfd

reformat Ignore-this: 823c997b62c14b717c614c3205c23bd darcs-hash:1ce5bace80bbdad6fb904bd3dfcf32ed0f1fa127
author drewp <drewp@bigasterisk.com>
date Wed, 29 Jan 2020 01:04:09 -0800
parents 7c7415cfbc02
children 732e30c509d6
files service/collector/sse_collector.py
diffstat 1 files changed, 24 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- a/service/collector/sse_collector.py	Wed Jan 29 01:03:40 2020 -0800
+++ b/service/collector/sse_collector.py	Wed Jan 29 01:04:09 2020 -0800
@@ -18,7 +18,7 @@
 else:
     class StatementType: pass  # type: ignore
 
-    
+
 from rdflib.term import Node
 from twisted.internet import reactor, defer
 from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional
@@ -107,7 +107,7 @@
         self.bound = False
         self.created = time.time()
         self.graphClients = self.settings.graphClients
-        
+
         self._serial = PatchSink._handlerSerial
         PatchSink._handlerSerial += 1
         self.lastPatchSentTime: float = 0.0
@@ -124,7 +124,7 @@
             'foafAgent': self.request.headers.get('X-Foaf-Agent'),
             'userAgent': self.request.headers.get('user-agent'),
         }
-        
+
     def bind(self, *args, **kwargs):
         self.streamId = args[0]
 
@@ -132,12 +132,12 @@
         # If something goes wrong with addSseHandler, I don't want to
         # try removeSseHandler.
         self.bound = True
-        
+
     def unbind(self) -> None:
         if self.bound:
             self.graphClients.removeSseHandler(self)
 
-            
+
 StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set[PatchSink]]]
 
 
@@ -148,17 +148,17 @@
     def __enter__(self):
         self._garbage: List[StatementType] = []
         return self
-        
+
     def add(self, stmt: StatementType):
         self._garbage.append(stmt)
-        
+
     def __exit__(self, type, value, traceback):
         if type is not None:
             raise
         for stmt in self._garbage:
             del self.statements[stmt]
 
-            
+
 class ActiveStatements(object):
     def __init__(self):
         # This table holds statements asserted by any of our sources
@@ -171,10 +171,10 @@
         return {
             'len': len(self.table),
             }
-        
+
     def postDeleteStatements(self) -> PostDeleter:
         return PostDeleter(self.table)
-        
+
     def pprintTable(self) -> None:
         for i, (stmt, (sources, handlers)) in enumerate(
                 sorted(self.table.items())):
@@ -207,7 +207,7 @@
                         garbage.add(stmt)
 
         return Patch(addQuads=adds, delQuads=dels)
-        
+
     def applySourcePatch(self, source: SourceUri, p: Patch):
         for stmt in p.addQuads:
             sourceUrls, handlers = self.table[stmt]
@@ -215,7 +215,7 @@
                 raise ValueError("%s added stmt that it already had: %s" %
                                  (source, abbrevStmt(stmt)))
             sourceUrls.add(source)
-            
+
         with self.postDeleteStatements() as garbage:
             for stmt in p.delQuads:
                 sourceUrls, handlers = self.table[stmt]
@@ -264,7 +264,7 @@
                     garbage.add(stmt)
 
 
-                    
+
 class GraphClients(object):
     """
     All the active PatchSources and SSEHandlers
@@ -278,7 +278,7 @@
         self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
         self.handlers: Set[PatchSink] = set()
         self.statements: ActiveStatements = ActiveStatements()
-        
+
         self._localStatements = LocalStatements(self._onPatch)
 
     def state(self) -> Dict:
@@ -352,7 +352,7 @@
                 h.lastPatchSentTime = now
             else:
                 log.debug('nothing to send to %s', h)
-                
+
     def addSseHandler(self, handler: PatchSink):
         log.info('addSseHandler %r %r', handler, handler.streamId)
 
@@ -360,7 +360,7 @@
         sources = self._sourcesForHandler(handler)
 
         self.handlers.add(handler)
-        
+
         for source in sources:
             if source not in self.clients and source != COLLECTOR:
                 log.debug('connect to patch source %s', source)
@@ -373,7 +373,7 @@
         log.debug('bring new client up to date')
 
         self._sendUpdatePatch(handler)
-        
+
     def removeSseHandler(self, handler: PatchSink):
         log.info('removeSseHandler %r', handler)
         self.statements.discardHandler(handler)
@@ -385,23 +385,23 @@
                     break
             else:
                 self._stopClient(source)
-            
+
         self.handlers.remove(handler)
 
     def _stopClient(self, url: SourceUri):
         if url == COLLECTOR:
             return
-            
+
         self.clients[url].stop()
 
         self.statements.discardSource(url)
-        
+
         self._localStatements.setSourceState(url, None)
         if url in self.clients:
             del self.clients[url]
 
         self.cleanup()
-        
+
     def cleanup(self):
         """
         despite the attempts above, we still get useless rows in the table
@@ -411,7 +411,7 @@
             for stmt, (sources, handlers) in self.statements.table.items():
                 if not sources and not any(h in self.handlers for h in handlers):
                     garbage.add(stmt)
-                    
+
 
 class State(cyclone.web.RequestHandler):
     @STATS.getState.time()
@@ -439,14 +439,14 @@
 
     graphClients = GraphClients()
     #exporter = InfluxExporter(... to export some stats values
-    
+
     reactor.listenTCP(
         9072,
         cyclone.web.Application(
             handlers=[
                 (r"/()", cyclone.web.StaticFileHandler, {
                     "path": "static", "default_filename": "index.html"}),
-                (r'/static/(.*)',cyclone.web.StaticFileHandler, {"path": "static"}), 
+                (r'/static/(.*)',cyclone.web.StaticFileHandler, {"path": "static"}),
                 (r'/state', State),
                 (r'/graph/(.*)', PatchSink),
                 (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),