Changeset - 51adfea492a5
[Not reviewed]
default
0 6 0
drewp@bigasterisk.com - 12 years ago 2013-03-26 07:59:48
drewp@bigasterisk.com
clarify some private api
Ignore-this: ddb72cc0dc9b8fe6c2448b29bab678f5
6 files changed with 50 insertions and 28 deletions:
0 comments (0 inline, 0 general)
light9/rdfdb/autodepgraphapi.py
Show inline comments
 
@@ -2,17 +2,20 @@ import logging
 
from rdflib import RDF, RDFS
 
from light9.rdfdb.currentstategraphapi import contextsForStatementNoWildcards
 
log = logging.getLogger('syncedgraph')
 

	
 
class AutoDepGraphApi(object):
 
    """
 
    knockoutjs-inspired API for automatically building a dependency
 
    tree while reading the graph. See addHandler().
 
    
 
    mixin for SyncedGraph, separated here because these methods work together
 
    """
 

	
 
    def __init__(self):
 
        self._watchers = GraphWatchers()
 
        self._watchers = _GraphWatchers()
 
        self.currentFuncs = [] # stack of addHandler callers
 
    
 
    def addHandler(self, func):
 
        """
 
        run this (idempotent) func, noting what graph values it
 
        uses. Run it again in the future if there are changes to those
 
@@ -99,13 +102,13 @@ class AutoDepGraphApi(object):
 
    # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell
 
    # you only what results have just appeared or disappeared. I think
 
    # I'm going to be repeating that logic a lot. Maybe just for the
 
    # subjects(RDF.type, t) call
 

	
 

	
 
class GraphWatchers(object):
 
class _GraphWatchers(object):
 
    """
 
    store the current handlers that care about graph changes
 
    """
 
    def __init__(self):
 
        self._handlersSp = {} # (s,p): set(handlers)
 
        self._handlersPo = {} # (p,o): set(handlers)
light9/rdfdb/currentstategraphapi.py
Show inline comments
 
from rdflib import ConjunctiveGraph
 
from light9.rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement
 

	
 
class CurrentStateGraphApi(object):
 
    
 
    """
 
    mixin for SyncedGraph, separated here because these methods work together
 
    """
 

	
 
    def currentState(self, context=None):
 
        """
 
        a graph you can read without being in an addHandler
 
        """
 
        if context is not None:
 
            raise NotImplementedError("currentState with context arg")
light9/rdfdb/grapheditapi.py
Show inline comments
 
@@ -2,12 +2,14 @@ import random
 
from itertools import chain
 
from rdflib import URIRef, RDF
 
from light9.rdfdb.patch import Patch
 

	
 
class GraphEditApi(object):
 
    """
 
    fancier graph edits
 
    
 
    mixin for SyncedGraph, separated here because these methods work together
 
    """
 

	
 
    def patchObject(self, context, subject, predicate, newObject):
 
        """send a patch which removes existing values for (s,p,*,c)
 
        and adds (s,p,newObject,c). Values in other graphs are not affected.
light9/rdfdb/patchreceiver.py
Show inline comments
 
@@ -7,25 +7,31 @@ log = logging.getLogger('syncedgraph')
 
class PatchReceiver(object):
 
    """
 
    runs a web server in this process and registers it with the rdfdb
 
    master. See onPatch for what happens when the rdfdb master sends
 
    us a patch
 
    """
 
    def __init__(self, label, graph, initiallySynced):
 
    def __init__(self, graph, label, initiallySynced):
 
        """
 
        label is what we'll call ourselves to the rdfdb server
 

	
 
        initiallySynced is a deferred that we'll call back when we get
 
        the first patch from the server
 
        """
 
        self.graph = graph
 
        self.initiallySynced = initiallySynced
 
        
 
        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
 
            (r'/update', makePatchEndpoint(self.onPatch)),
 
            (r'/update', makePatchEndpoint(self._onPatch)),
 
        ]))
 
        port = listen._realPortNumber  # what's the right call for this?
 
        self.updateResource = 'http://localhost:%s/update' % port
 
        log.info("listening on %s" % port)
 
        self.register(label)
 
        self._register(label)
 

	
 
    def onPatch(self, p):
 
    def _onPatch(self, p):
 
        """
 
        central server has sent us a patch
 
        """
 
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
 
        log.info("graph now has %s statements" % len(self.graph))
 
        try:
 
@@ -36,13 +42,13 @@ class PatchReceiver(object):
 
            traceback.print_exc()
 

	
 
        if self.initiallySynced:
 
            self.initiallySynced.callback(None)
 
            self.initiallySynced = None
 

	
 
    def register(self, label):
 
    def _register(self, label):
 

	
 
        def done(x):
 
            log.debug("registered with rdfdb")
 

	
 
        cyclone.httpclient.fetch(
 
            url='http://localhost:8051/graphClients',
light9/rdfdb/patchsender.py
Show inline comments
 
import logging
 
import cyclone.httpclient
 
from twisted.internet import defer
 
log = logging.getLogger('syncedgraph')
 

	
 
def sendPatch(putUri, patch, **kw):
 
    """
 
    kwargs will become extra attributes in the toplevel json object
 
    """
 
    body = patch.makeJsonRepr(kw)
 
    log.debug("send body: %r", body)
 
    def ok(done):
 
        if not str(done.code).startswith('2'):
 
            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
 
        log.debug("sendPatch finished, response: %s" % done.body)
 
        return done
 

	
 
    return cyclone.httpclient.fetch(
 
        url=putUri,
 
        method='PUT',
 
        headers={'Content-Type': ['application/json']},
 
        postdata=body,
 
        ).addCallback(ok)
 

	
 
class PatchSender(object):
 
    """
 
    SyncedGraph may generate patches faster than we can send
 
    them. This object buffers and may even collapse patches before
 
    they go the server
 
    """
 
    def __init__(self, target, myUpdateResource):
 
        """
 
        target is the URI we'll send patches to
 

	
 
        myUpdateResource is the URI for this sender of patches, which
 
        maybe needs to be the actual requestable update URI for
 
        sending updates back to us
 
        """
 
        self.target = target
 
        self.myUpdateResource = myUpdateResource
 
        self._patchesToSend = []
 
        self._currentSendPatchRequest = None
 

	
 
    def sendPatch(self, p):
 
@@ -90,6 +78,26 @@ class PatchSender(object):
 

	
 
        # this code is going away; we're going to raise an exception that contains all the pending patches
 
        log.error("_sendPatchErr")
 
        log.error(e)
 
        self._continueSending()
 

	
 
def sendPatch(putUri, patch, **kw):
 
    """
 
    PUT a patch as json to an http server. Returns deferred.
 
    
 
    kwargs will become extra attributes in the toplevel json object
 
    """
 
    body = patch.makeJsonRepr(kw)
 
    log.debug("send body: %r", body)
 
    def ok(done):
 
        if not str(done.code).startswith('2'):
 
            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
 
        log.debug("sendPatch finished, response: %s" % done.body)
 
        return done
 

	
 
    return cyclone.httpclient.fetch(
 
        url=putUri,
 
        method='PUT',
 
        headers={'Content-Type': ['application/json']},
 
        postdata=body,
 
        ).addCallback(ok)
light9/rdfdb/syncedgraph.py
Show inline comments
 
@@ -56,13 +56,13 @@ class SyncedGraph(CurrentStateGraphApi, 
 
        label is a string that the server will display in association
 
        with your connection
 
        """
 
        self.initiallySynced = defer.Deferred()
 
        _graph = self._graph = ConjunctiveGraph()
 

	
 
        self._receiver = PatchReceiver(label, _graph, self.initiallySynced)
 
        self._receiver = PatchReceiver(_graph, label, self.initiallySynced)
 
        
 
        self._sender = PatchSender('http://localhost:8051/patches',
 
                                   self._receiver.updateResource)
 
        AutoDepGraphApi.__init__(self)
 
        
 
    def resync(self):
0 comments (0 inline, 0 general)