changeset 114:bd3ae09e3312

rough and untested support for async handler funcs
author drewp@bigasterisk.com
date Wed, 01 Jun 2022 16:55:05 -0700
parents a9d49b297529
children 489e341f765d
files rdfdb/syncedgraph/autodepgraphapi.py rdfdb/syncedgraph/syncedgraph_base.py
diffstat 2 files changed, 41 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/syncedgraph/autodepgraphapi.py	Wed Jun 01 16:54:12 2022 -0700
+++ b/rdfdb/syncedgraph/autodepgraphapi.py	Wed Jun 01 16:55:05 2022 -0700
@@ -1,5 +1,5 @@
 import logging
-from typing import Callable, Dict, List, Set, Tuple
+from typing import Callable, Coroutine, Dict, List, Set, Tuple, Union
 
 from rdfdb.syncedgraph.currentstategraphapi import \
     contextsForStatementNoWildcards
@@ -9,7 +9,13 @@
 
 log = logging.getLogger('autodepgraphapi')
 
+class AsyncHandlerFunc:
+    def __init__(self, afunc: Callable[[], Coroutine[None,None,None]]):
+        self.f = afunc
+    async def call(self):
+        await self.f()
 
+StackFunc = Union[Callable[[], None], AsyncHandlerFunc]
 class AutoDepGraphApi(SyncedGraphBase):
     """
     knockoutjs-inspired API for automatically building a dependency
@@ -27,7 +33,7 @@
 
     def __init__(self):
         self._watchers = _GraphWatchers()
-        self.currentFuncs: List[Callable[[], None]] = []  # stack of addHandler callers
+        self.currentFuncs: List[StackFunc] = []  # stack of addHandler callers
 
     def addHandler(self, func: Callable[[], None]) -> None:
         """
@@ -62,7 +68,22 @@
             self.currentFuncs.pop()
             log.debug('graph.currentFuncs pop %s. stack now has %s', func, len(self.currentFuncs))
 
-    def runDepsOnNewPatch(self, p: Patch):
+    async def addAsyncHandler(self, afunc: Callable[[], Coroutine[None,None,None]]):
+        self.currentFuncs.append(AsyncHandlerFunc(afunc))
+        log.debug('graph.currentFuncs push-=a %s', afunc)
+        try:
+            try:
+                await afunc()
+            except:
+                import traceback
+                traceback.print_exc()
+                raise
+        finally:
+            self.currentFuncs.pop()
+            log.debug('graph.currentFuncs pop %s. stack now has %s', afunc, len(self.currentFuncs))
+
+
+    async def runDepsOnNewPatch(self, p: Patch):
         """
         patch p just happened to the graph; call everyone back who
         might care, and then notice what data they depend on now
@@ -72,9 +93,12 @@
         for func in whoCares:
             # todo: forget the old handlers for this func
             log.debug('runDepsOnNewPatch calling watcher %s', p.shortSummary())
-            self.addHandler(func)
+            if isinstance(func, AsyncHandlerFunc):
+                await self.addAsyncHandler(func.f)
+            else:
+                self.addHandler(func)
 
-    def _getCurrentFunc(self):
+    def _getCurrentFunc(self) -> StackFunc:
         if not self.currentFuncs:
             # this may become a warning later
             raise ValueError("asked for graph data outside of a handler")
@@ -150,7 +174,7 @@
     # subjects(RDF.type, t) call
 
 
-HandlerSet = Set[Callable[[], None]]
+HandlerSet = Set[StackFunc]
 
 
 class _GraphWatchers(object):
@@ -164,7 +188,7 @@
         self._handlersSpo: Dict[Tuple[URIRef, URIRef, URIRef], HandlerSet] = {}  # (s,p,o): set(handlers)
         self._handlersS: Dict[URIRef, HandlerSet] = {}  # s: set(handlers)
 
-    def addSubjPredWatcher(self, func, s, p):
+    def addSubjPredWatcher(self, func:StackFunc, s, p):
         if func is None:
             return
         key = s, p
@@ -183,13 +207,13 @@
     def addSubjectWatcher(self, func, s):
         self._handlersS.setdefault(s, set()).add(func)
 
-    def whoCares(self, patch):
+    def whoCares(self, patch) ->Set[StackFunc]:
         """what handler functions would care about the changes in this patch?
 
         this removes the handlers that it gives you
         """
-        # self.dependencies()
-        ret: Set[Callable[[], None]] = set()
+        #self.dependencies()
+        ret: Set[StackFunc] = set()
         affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads])
         for (s, p), funcs in self._handlersSp.items():
             if (s, p) in affectedSubjPreds:
--- a/rdfdb/syncedgraph/syncedgraph_base.py	Wed Jun 01 16:54:12 2022 -0700
+++ b/rdfdb/syncedgraph/syncedgraph_base.py	Wed Jun 01 16:55:05 2022 -0700
@@ -109,7 +109,7 @@
                     self.ws = ws
                     async for msg in ws:
                         try:
-                            self._onIncomingMsg(msg.data)
+                            await self._onIncomingMsg(msg.data)
                         except Exception:
                             traceback.print_exc()
                             raise
@@ -118,7 +118,7 @@
             log.info("lost connection- retry")
             await asyncio.sleep(4)
 
-    def _onIncomingMsg(self, body: str):
+    async def _onIncomingMsg(self, body: str):
         j = json.loads(body)
         if 'connectedAs' in j:
             self.connectionId = j['connectedAs']
@@ -126,7 +126,7 @@
         elif 'patch' in j:
             p = Patch(jsonRepr=body)  # todo: repeated parse
             log.debug("received patch %s", p.shortSummary())
-            self._onPatchFromDb(p)
+            await self._onPatchFromDb(p)
         else:
             log.warn('unknown msg from websocket: %s...', body[:32])
 
@@ -156,7 +156,7 @@
         # diff against old entire graph
         # broadcast that change
 
-    def runDepsOnNewPatch(self, p):
+    async def runDepsOnNewPatch(self, p):
         # See AutoDepGraphApi
         pass
 
@@ -184,7 +184,7 @@
             await self._resync()
             return
         log.debug('runDepsOnNewPatch')
-        self.runDepsOnNewPatch(p)
+        await self.runDepsOnNewPatch(p)
         log.debug('sendPatch')
         await self.ws.send_str(p.jsonRepr)
         log.debug('patch is done %s', debugKey)
@@ -202,7 +202,7 @@
         patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
         log.debug("graph now has %s statements" % len(self._graph))
 
-    def _onPatchFromDb(self, p: Patch):
+    async def _onPatchFromDb(self, p: Patch):
         """
         central server has sent us a patch
         """
@@ -214,7 +214,7 @@
 
         self._applyPatchLocally(p)
         try:
-            self.runDepsOnNewPatch(p)
+            await self.runDepsOnNewPatch(p)
         except Exception:
             # don't reflect this error back to the server; we did
             # receive its patch correctly. However, we're in a bad