Mercurial > code > home > repos > rdfdb
changeset 114:bd3ae09e3312
rough and untested support for async handler funcs
author | drewp@bigasterisk.com |
---|---|
date | Wed, 01 Jun 2022 16:55:05 -0700 |
parents | a9d49b297529 |
children | 489e341f765d |
files | rdfdb/syncedgraph/autodepgraphapi.py rdfdb/syncedgraph/syncedgraph_base.py |
diffstat | 2 files changed, 41 insertions(+), 17 deletions(-) [+] |
line wrap: on
line diff
--- a/rdfdb/syncedgraph/autodepgraphapi.py Wed Jun 01 16:54:12 2022 -0700 +++ b/rdfdb/syncedgraph/autodepgraphapi.py Wed Jun 01 16:55:05 2022 -0700 @@ -1,5 +1,5 @@ import logging -from typing import Callable, Dict, List, Set, Tuple +from typing import Callable, Coroutine, Dict, List, Set, Tuple, Union from rdfdb.syncedgraph.currentstategraphapi import \ contextsForStatementNoWildcards @@ -9,7 +9,13 @@ log = logging.getLogger('autodepgraphapi') +class AsyncHandlerFunc: + def __init__(self, afunc: Callable[[], Coroutine[None,None,None]]): + self.f = afunc + async def call(self): + await self.f() +StackFunc = Union[Callable[[], None], AsyncHandlerFunc] class AutoDepGraphApi(SyncedGraphBase): """ knockoutjs-inspired API for automatically building a dependency @@ -27,7 +33,7 @@ def __init__(self): self._watchers = _GraphWatchers() - self.currentFuncs: List[Callable[[], None]] = [] # stack of addHandler callers + self.currentFuncs: List[StackFunc] = [] # stack of addHandler callers def addHandler(self, func: Callable[[], None]) -> None: """ @@ -62,7 +68,22 @@ self.currentFuncs.pop() log.debug('graph.currentFuncs pop %s. stack now has %s', func, len(self.currentFuncs)) - def runDepsOnNewPatch(self, p: Patch): + async def addAsyncHandler(self, afunc: Callable[[], Coroutine[None,None,None]]): + self.currentFuncs.append(AsyncHandlerFunc(afunc)) + log.debug('graph.currentFuncs push-=a %s', afunc) + try: + try: + await afunc() + except: + import traceback + traceback.print_exc() + raise + finally: + self.currentFuncs.pop() + log.debug('graph.currentFuncs pop %s. stack now has %s', afunc, len(self.currentFuncs)) + + + async def runDepsOnNewPatch(self, p: Patch): """ patch p just happened to the graph; call everyone back who might care, and then notice what data they depend on now @@ -72,9 +93,12 @@ for func in whoCares: # todo: forget the old handlers for this func log.debug('runDepsOnNewPatch calling watcher %s', p.shortSummary()) - self.addHandler(func) + if isinstance(func, AsyncHandlerFunc): + await self.addAsyncHandler(func.f) + else: + self.addHandler(func) - def _getCurrentFunc(self): + def _getCurrentFunc(self) -> StackFunc: if not self.currentFuncs: # this may become a warning later raise ValueError("asked for graph data outside of a handler") @@ -150,7 +174,7 @@ # subjects(RDF.type, t) call -HandlerSet = Set[Callable[[], None]] +HandlerSet = Set[StackFunc] class _GraphWatchers(object): @@ -164,7 +188,7 @@ self._handlersSpo: Dict[Tuple[URIRef, URIRef, URIRef], HandlerSet] = {} # (s,p,o): set(handlers) self._handlersS: Dict[URIRef, HandlerSet] = {} # s: set(handlers) - def addSubjPredWatcher(self, func, s, p): + def addSubjPredWatcher(self, func:StackFunc, s, p): if func is None: return key = s, p @@ -183,13 +207,13 @@ def addSubjectWatcher(self, func, s): self._handlersS.setdefault(s, set()).add(func) - def whoCares(self, patch): + def whoCares(self, patch) ->Set[StackFunc]: """what handler functions would care about the changes in this patch? this removes the handlers that it gives you """ - # self.dependencies() - ret: Set[Callable[[], None]] = set() + #self.dependencies() + ret: Set[StackFunc] = set() affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads]) for (s, p), funcs in self._handlersSp.items(): if (s, p) in affectedSubjPreds:
--- a/rdfdb/syncedgraph/syncedgraph_base.py Wed Jun 01 16:54:12 2022 -0700 +++ b/rdfdb/syncedgraph/syncedgraph_base.py Wed Jun 01 16:55:05 2022 -0700 @@ -109,7 +109,7 @@ self.ws = ws async for msg in ws: try: - self._onIncomingMsg(msg.data) + await self._onIncomingMsg(msg.data) except Exception: traceback.print_exc() raise @@ -118,7 +118,7 @@ log.info("lost connection- retry") await asyncio.sleep(4) - def _onIncomingMsg(self, body: str): + async def _onIncomingMsg(self, body: str): j = json.loads(body) if 'connectedAs' in j: self.connectionId = j['connectedAs'] @@ -126,7 +126,7 @@ elif 'patch' in j: p = Patch(jsonRepr=body) # todo: repeated parse log.debug("received patch %s", p.shortSummary()) - self._onPatchFromDb(p) + await self._onPatchFromDb(p) else: log.warn('unknown msg from websocket: %s...', body[:32]) @@ -156,7 +156,7 @@ # diff against old entire graph # broadcast that change - def runDepsOnNewPatch(self, p): + async def runDepsOnNewPatch(self, p): # See AutoDepGraphApi pass @@ -184,7 +184,7 @@ await self._resync() return log.debug('runDepsOnNewPatch') - self.runDepsOnNewPatch(p) + await self.runDepsOnNewPatch(p) log.debug('sendPatch') await self.ws.send_str(p.jsonRepr) log.debug('patch is done %s', debugKey) @@ -202,7 +202,7 @@ patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) log.debug("graph now has %s statements" % len(self._graph)) - def _onPatchFromDb(self, p: Patch): + async def _onPatchFromDb(self, p: Patch): """ central server has sent us a patch """ @@ -214,7 +214,7 @@ self._applyPatchLocally(p) try: - self.runDepsOnNewPatch(p) + await self.runDepsOnNewPatch(p) except Exception: # don't reflect this error back to the server; we did # receive its patch correctly. However, we're in a bad