changeset 12:032e59be8fe9

refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
author drewp@bigasterisk.com
date Fri, 25 Nov 2022 20:58:08 -0800
parents baf886e01ed1
children bfd95926be6e
files collector.py merge.py patchsink.py
diffstat 3 files changed, 235 insertions(+), 223 deletions(-) [+]
line wrap: on
line diff
--- a/collector.py	Fri Nov 25 20:57:38 2022 -0800
+++ b/collector.py	Fri Nov 25 20:58:08 2022 -0800
@@ -7,11 +7,10 @@
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
-import collections
 import json
 import logging
 import time
-from typing import (Callable, Dict, List, Optional, Sequence, Set, Tuple, Union)
+from typing import Dict, List, Optional, Set, Union
 
 import cyclone.sse
 import cyclone.web
@@ -23,11 +22,12 @@
 from prometheus_client.registry import REGISTRY
 from rdfdb.patch import Patch
 from rdflib import Namespace, URIRef
-from rdflib.term import Node
 from standardservice.logsetup import enableTwistedLog, log
 from twisted.internet import defer, reactor
 
 from collector_config import config
+from merge import SourceUri, ActiveStatements, LocalStatements
+from patchsink import PatchSink
 
 import cyclone.sse
 def py3_sendEvent(self, message, event=None, eid=None, retry=None):
@@ -44,26 +44,17 @@
     if retry:
         self.transport.write(b"retry: %s\n" % retry)
     self.transport.write(b"data: %s\n\n" % message)
-cyclone.sse.SSEHandler.sendEvent = py3_sendEvent
 
 
-Statement = Tuple[Node, Node, Node, Node]
-
-
-# SourceUri = NewType('SourceUri', URIRef) # doesn't work
-class SourceUri(URIRef):
-    pass
+cyclone.sse.SSEHandler.sendEvent = py3_sendEvent
 
 
 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
 
 GET_STATE_CALLS = Summary("get_state_calls", 'calls')
-LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls')
-MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls')
 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')
-REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls')
 
 
 class Metrics(cyclone.web.RequestHandler):
@@ -73,214 +64,6 @@
         self.write(generate_latest(REGISTRY))
 
 
-class LocalStatements(object):
-    """
-    functions that make statements originating from sse_collector itself
-    """
-
-    def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]):
-        self.applyPatch = applyPatch
-        self._sourceState: Dict[SourceUri, Optional[URIRef]] = {}  # source: state URIRef
-
-    @LOCAL_STATEMENTS_PATCH_CALLS.time()
-    def setSourceState(self, source: SourceUri, state: Optional[URIRef]):
-        """
-        add a patch to the COLLECTOR graph about the state of this
-        source. state=None to remove the source.
-        """
-        oldState = self._sourceState.get(source, None)
-        if state == oldState:
-            return
-        log.info('source state %s -> %s', source, state)
-        if oldState is None:
-            self._sourceState[source] = state
-            self.applyPatch(COLLECTOR, Patch(addQuads=[
-                (COLLECTOR, ROOM['source'], source, COLLECTOR),
-                (source, ROOM['state'], state, COLLECTOR),
-            ]))
-        elif state is None:
-            del self._sourceState[source]
-            self.applyPatch(COLLECTOR, Patch(delQuads=[
-                (COLLECTOR, ROOM['source'], source, COLLECTOR),
-                (source, ROOM['state'], oldState, COLLECTOR),
-            ]))
-        else:
-            self._sourceState[source] = state
-            self.applyPatch(COLLECTOR, Patch(addQuads=[
-                (source, ROOM['state'], state, COLLECTOR),
-            ], delQuads=[
-                (source, ROOM['state'], oldState, COLLECTOR),
-            ]))
-
-
-def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
-    if isinstance(t, URIRef):
-        return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/',
-                                                                                    'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:'))
-    return t
-
-
-def abbrevStmt(stmt: Statement) -> str:
-    return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3]))
-
-
-class PatchSink(cyclone.sse.SSEHandler):
-    _handlerSerial = 0
-
-    def __init__(self, application: cyclone.web.Application, request):
-        cyclone.sse.SSEHandler.__init__(self, application, request)
-        self.bound = False
-        self.created = time.time()
-        self.graphClients = self.settings.graphClients
-
-        self._serial = PatchSink._handlerSerial
-        PatchSink._handlerSerial += 1
-        self.lastPatchSentTime: float = 0.0
-
-    def __repr__(self) -> str:
-        return '<Handler #%s>' % self._serial
-
-    def state(self) -> Dict:
-        return {
-            'created': round(self.created, 2),
-            'ageHours': round((time.time() - self.created) / 3600, 2),
-            'streamId': self.streamId,
-            'remoteIp': self.request.remote_ip,  # wrong, need some forwarded-for thing
-            'foafAgent': self.request.headers.get('X-Foaf-Agent'),
-            'userAgent': self.request.headers.get('user-agent'),
-        }
-
-    def bind(self, *args, **kwargs):
-        self.streamId = args[0]
-
-        self.graphClients.addSseHandler(self)
-        # If something goes wrong with addSseHandler, I don't want to
-        # try removeSseHandler.
-        self.bound = True
-
-    def unbind(self) -> None:
-        if self.bound:
-            self.graphClients.removeSseHandler(self)
-
-
-StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]]
-
-
-class PostDeleter(object):
-
-    def __init__(self, statements: StatementTable):
-        self.statements = statements
-
-    def __enter__(self):
-        self._garbage: List[Statement] = []
-        return self
-
-    def add(self, stmt: Statement):
-        self._garbage.append(stmt)
-
-    def __exit__(self, type, value, traceback):
-        if type is not None:
-            raise NotImplementedError()
-        for stmt in self._garbage:
-            del self.statements[stmt]
-
-
-class ActiveStatements(object):
-
-    def __init__(self):
-        # This table holds statements asserted by any of our sources
-        # plus local statements that we introduce (source is
-        # http://bigasterisk.com/sse_collector/).
-        self.table: StatementTable = collections.defaultdict(lambda: (set(), set()))
-
-    def state(self) -> Dict:
-        return {
-            'len': len(self.table),
-        }
-
-    def postDeleteStatements(self) -> PostDeleter:
-        return PostDeleter(self.table)
-
-    def pprintTable(self) -> None:
-        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
-            print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))
-
-    @MAKE_SYNC_PATCH_CALLS.time()
-    def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
-        # todo: this could run all handlers at once, which is how we
-        # use it anyway
-        adds = []
-        dels = []
-
-        with self.postDeleteStatements() as garbage:
-            for stmt, (stmtSources, handlers) in self.table.items():
-                belongsInHandler = not sources.isdisjoint(stmtSources)
-                handlerHasIt = handler in handlers
-                # log.debug("%s belong=%s has=%s",
-                #           abbrevStmt(stmt), belongsInHandler, handlerHasIt)
-                if belongsInHandler and not handlerHasIt:
-                    adds.append(stmt)
-                    handlers.add(handler)
-                elif not belongsInHandler and handlerHasIt:
-                    dels.append(stmt)
-                    handlers.remove(handler)
-                    if not handlers and not stmtSources:
-                        garbage.add(stmt)
-
-        return Patch(addQuads=adds, delQuads=dels)
-
-    def applySourcePatch(self, source: SourceUri, p: Patch):
-        for stmt in p.addQuads:
-            sourceUrls, handlers = self.table[stmt]
-            if source in sourceUrls:
-                raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt)))
-            sourceUrls.add(source)
-
-        with self.postDeleteStatements() as garbage:
-            for stmt in p.delQuads:
-                sourceUrls, handlers = self.table[stmt]
-                if source not in sourceUrls:
-                    raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt)))
-                sourceUrls.remove(source)
-                # this is rare, since some handler probably still has
-                # the stmt we're deleting, but it can happen e.g. when
-                # a handler was just deleted
-                if not sourceUrls and not handlers:
-                    garbage.add(stmt)
-
-    @REPLACE_SOURCE_STATEMENTS_CALLS.time()
-    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]):
-        log.debug('replaceSourceStatements with %s stmts', len(stmts))
-        newStmts = set(stmts)
-
-        with self.postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.table.items():
-                if source in sources:
-                    if stmt not in stmts:
-                        sources.remove(source)
-                        if not sources and not handlers:
-                            garbage.add(stmt)
-                else:
-                    if stmt in stmts:
-                        sources.add(source)
-                newStmts.discard(stmt)
-
-        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
-
-    def discardHandler(self, handler: PatchSink):
-        with self.postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.table.items():
-                handlers.discard(handler)
-                if not sources and not handlers:
-                    garbage.add(stmt)
-
-    def discardSource(self, source: SourceUri):
-        with self.postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.table.items():
-                sources.discard(source)
-                if not sources and not handlers:
-                    garbage.add(stmt)
-
 
 class GraphClients(object):
     """
@@ -465,7 +248,6 @@
                 (r'/graph/', GraphList),
                 (r'/graph/(.+)', PatchSink),
                 (r'/metrics', Metrics),
-            ],
-            graphClients=graphClients),
+            ], graphClients=graphClients),
         interface='::')
     reactor.run()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/merge.py	Fri Nov 25 20:58:08 2022 -0800
@@ -0,0 +1,187 @@
+import collections
+from typing import Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, NewType
+from prometheus_client import Summary
+
+from rdfdb.patch import Patch
+from rdflib import Namespace, URIRef
+from rdflib.term import Node
+from standardservice.logsetup import enableTwistedLog, log
+from patchsink import PatchSink
+LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls')
+MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls')
+REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls')
+
+SourceUri = NewType('SourceUri', URIRef)
+
+Statement = Tuple[Node, Node, Node, Node]
+StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]]
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
+
+
+def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
+    if isinstance(t, URIRef):
+        return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/',
+                                                                                    'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:'))
+    return t
+
+
+def abbrevStmt(stmt: Statement) -> str:
+    return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3]))
+
+
+class LocalStatements(object):
+    """
+    functions that make statements originating from sse_collector itself
+    """
+
+    def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]):
+        self.applyPatch = applyPatch
+        self._sourceState: Dict[SourceUri, Optional[URIRef]] = {}  # source: state URIRef
+
+    @LOCAL_STATEMENTS_PATCH_CALLS.time()
+    def setSourceState(self, source: SourceUri, state: Optional[URIRef]):
+        """
+        add a patch to the COLLECTOR graph about the state of this
+        source. state=None to remove the source.
+        """
+        oldState = self._sourceState.get(source, None)
+        if state == oldState:
+            return
+        log.info('source state %s -> %s', source, state)
+        if oldState is None:
+            self._sourceState[source] = state
+            self.applyPatch(COLLECTOR, Patch(addQuads=[
+                (COLLECTOR, ROOM['source'], source, COLLECTOR),
+                (source, ROOM['state'], state, COLLECTOR),
+            ]))
+        elif state is None:
+            del self._sourceState[source]
+            self.applyPatch(COLLECTOR, Patch(delQuads=[
+                (COLLECTOR, ROOM['source'], source, COLLECTOR),
+                (source, ROOM['state'], oldState, COLLECTOR),
+            ]))
+        else:
+            self._sourceState[source] = state
+            self.applyPatch(COLLECTOR, Patch(addQuads=[
+                (source, ROOM['state'], state, COLLECTOR),
+            ], delQuads=[
+                (source, ROOM['state'], oldState, COLLECTOR),
+            ]))
+
+
+class PostDeleter(object):
+
+    def __init__(self, statements: StatementTable):
+        self.statements = statements
+
+    def __enter__(self):
+        self._garbage: List[Statement] = []
+        return self
+
+    def add(self, stmt: Statement):
+        self._garbage.append(stmt)
+
+    def __exit__(self, type, value, traceback):
+        if type is not None:
+            raise NotImplementedError()
+        for stmt in self._garbage:
+            del self.statements[stmt]
+
+
+class ActiveStatements(object):
+
+    def __init__(self):
+        # This table holds statements asserted by any of our sources
+        # plus local statements that we introduce (source is
+        # http://bigasterisk.com/sse_collector/).
+        self.table: StatementTable = collections.defaultdict(lambda: (set(), set()))
+
+    def state(self) -> Dict:
+        return {
+            'len': len(self.table),
+        }
+
+    def postDeleteStatements(self) -> PostDeleter:
+        return PostDeleter(self.table)
+
+    def pprintTable(self) -> None:
+        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
+            print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))
+
+    @MAKE_SYNC_PATCH_CALLS.time()
+    def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
+        # todo: this could run all handlers at once, which is how we
+        # use it anyway
+        adds = []
+        dels = []
+
+        with self.postDeleteStatements() as garbage:
+            for stmt, (stmtSources, handlers) in self.table.items():
+                belongsInHandler = not sources.isdisjoint(stmtSources)
+                handlerHasIt = handler in handlers
+                # log.debug("%s belong=%s has=%s",
+                #           abbrevStmt(stmt), belongsInHandler, handlerHasIt)
+                if belongsInHandler and not handlerHasIt:
+                    adds.append(stmt)
+                    handlers.add(handler)
+                elif not belongsInHandler and handlerHasIt:
+                    dels.append(stmt)
+                    handlers.remove(handler)
+                    if not handlers and not stmtSources:
+                        garbage.add(stmt)
+
+        return Patch(addQuads=adds, delQuads=dels)
+
+    def applySourcePatch(self, source: SourceUri, p: Patch):
+        for stmt in p.addQuads:
+            sourceUrls, handlers = self.table[stmt]
+            if source in sourceUrls:
+                raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt)))
+            sourceUrls.add(source)
+
+        with self.postDeleteStatements() as garbage:
+            for stmt in p.delQuads:
+                sourceUrls, handlers = self.table[stmt]
+                if source not in sourceUrls:
+                    raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt)))
+                sourceUrls.remove(source)
+                # this is rare, since some handler probably still has
+                # the stmt we're deleting, but it can happen e.g. when
+                # a handler was just deleted
+                if not sourceUrls and not handlers:
+                    garbage.add(stmt)
+
+    @REPLACE_SOURCE_STATEMENTS_CALLS.time()
+    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]):
+        log.debug('replaceSourceStatements with %s stmts', len(stmts))
+        newStmts = set(stmts)
+
+        with self.postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.table.items():
+                if source in sources:
+                    if stmt not in stmts:
+                        sources.remove(source)
+                        if not sources and not handlers:
+                            garbage.add(stmt)
+                else:
+                    if stmt in stmts:
+                        sources.add(source)
+                newStmts.discard(stmt)
+
+        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
+
+    def discardHandler(self, handler: PatchSink):
+        with self.postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.table.items():
+                handlers.discard(handler)
+                if not sources and not handlers:
+                    garbage.add(stmt)
+
+    def discardSource(self, source: SourceUri):
+        with self.postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.table.items():
+                sources.discard(source)
+                if not sources and not handlers:
+                    garbage.add(stmt)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/patchsink.py	Fri Nov 25 20:58:08 2022 -0800
@@ -0,0 +1,43 @@
+import time
+from typing import Dict
+
+import cyclone.sse
+import cyclone.web
+
+class PatchSink(cyclone.sse.SSEHandler):
+    _handlerSerial = 0
+
+    def __init__(self, application: cyclone.web.Application, request):
+        cyclone.sse.SSEHandler.__init__(self, application, request)
+        self.bound = False
+        self.created = time.time()
+        self.graphClients = self.settings.graphClients
+
+        self._serial = PatchSink._handlerSerial
+        PatchSink._handlerSerial += 1
+        self.lastPatchSentTime: float = 0.0
+
+    def __repr__(self) -> str:
+        return '<Handler #%s>' % self._serial
+
+    def state(self) -> Dict:
+        return {
+            'created': round(self.created, 2),
+            'ageHours': round((time.time() - self.created) / 3600, 2),
+            'streamId': self.streamId,
+            'remoteIp': self.request.remote_ip,  # wrong, need some forwarded-for thing
+            'foafAgent': self.request.headers.get('X-Foaf-Agent'),
+            'userAgent': self.request.headers.get('user-agent'),
+        }
+
+    def bind(self, *args, **kwargs):
+        self.streamId = args[0]
+
+        self.graphClients.addSseHandler(self)
+        # If something goes wrong with addSseHandler, I don't want to
+        # try removeSseHandler.
+        self.bound = True
+
+    def unbind(self) -> None:
+        if self.bound:
+            self.graphClients.removeSseHandler(self)