changeset 13:c9d1764d64ad

add web server. remove more traces of light9 Ignore-this: 253b280db4623e56ee6d124470953e00
author Drew Perttula <drewp@bigasterisk.com>
date Thu, 26 Apr 2018 07:49:50 +0000
parents ebfda644b2d2
children 6f54302a31d2
files rdfdb/patch.py rdfdb/patchreceiver.py rdfdb/readme rdfdb/service.py rdfdb/syncedgraph.py setup.py
diffstat 6 files changed, 536 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/patch.py	Thu Apr 26 07:18:13 2018 +0000
+++ b/rdfdb/patch.py	Thu Apr 26 07:49:50 2018 +0000
@@ -51,13 +51,6 @@
             if isinstance(n, Literal):
                 if n.datatype == XSD['double']:
                     return str(n.toPython())
-            if isinstance(n, URIRef):
-                for long, short in [
-                        ("http://light9.bigasterisk.com/", "l9"),
-                        
-                ]:
-                    if n.startswith(long):
-                        return short+":"+n[len(long):]
             return n.n3()
         def formatQuad(quad):
             return " ".join(shorten(n) for n in quad)
--- a/rdfdb/patchreceiver.py	Thu Apr 26 07:18:13 2018 +0000
+++ b/rdfdb/patchreceiver.py	Thu Apr 26 07:49:50 2018 +0000
@@ -1,6 +1,5 @@
 import logging, cyclone.httpclient, traceback, urllib
 from twisted.internet import reactor
-from light9 import networking
 from rdfdb.patch import Patch
 log = logging.getLogger('syncedgraph')
 
@@ -10,7 +9,7 @@
     master. See onPatch for what happens when the rdfdb master sends
     us a patch
     """
-    def __init__(self, rdfdbRoot, label, onPatch):
+    def __init__(self, rdfdbRoot, host, label, onPatch):
         """
         label is what we'll call ourselves to the rdfdb server
 
@@ -22,8 +21,7 @@
         ]))
         port = listen._realPortNumber  # what's the right call for this?
         
-        self.updateResource = 'http://%s:%s/update' % (
-            networking.patchReceiverUpdateHost.value, port)
+        self.updateResource = 'http://%s:%s/update' % (host, port)
         log.info("listening on %s" % port)
         self._register(label)
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/readme	Thu Apr 26 07:49:50 2018 +0000
@@ -0,0 +1,106 @@
+other tools POST themselves to here as subscribers to the graph. They
+are providing a URL we can PUT to with graph updates.
+
+we immediately PUT them back all the contents of the graph as a bunch
+of adds.
+
+later we PUT them back with patches (del/add lists) when there are
+changes.
+
+If we fail to reach a registered caller, we forget about it for future
+calls. We could PUT empty diffs as a heartbeat to notice disappearing
+callers faster.
+
+A caller can submit a patch which we'll persist and broadcast to every
+other client.
+
+Global data undo should probably happen within this service. Some
+operations should not support undo, such as updating the default
+position of a window. How will we separate those? A blacklist of
+subj+pred pairs that don't save undo? Or just save the updates like
+everything else, but when you press undo, there's a way to tell which
+updates *should* be part of your app's undo system?
+
+Maybe some subgraphs are for transient data (e.g. current timecode,
+mouse position in curvecalc) that only some listeners want to hear about.
+
+Deletes are graph-specific, so callers may be surprised to delete a
+stmt from one graph but then find that statement is still true.
+
+Alternate plan: would it help to insist that every patch is within
+only one subgraph? I think it's ok for them to span multiple ones.
+
+Inserts can be made on any subgraphs, and each subgraph is saved in
+its own file. The file might not be in a format that can express
+graphs, so I'm just going to not store the subgraph URI in any file.
+
+I don't support wildcard deletes, and there are race conditions where a
+s-p could end up with unexpected multiple objects. Every client needs
+to be ready for this.
+
+We watch the files and push their own changes back to the clients.
+
+Persist our client list, to survive restarts. In another rdf file? A
+random json one? memcache? Also hold the recent changes. We're not
+logging everything forever, though, since the output files and a VCS
+shall be used for that
+
+Bnodes: this rdfdb graph might be able to track bnodes correctly, and
+they make for more compact n3 files. I'm not sure if it's going to be
+hard to keep the client bnodes in sync though. File rereads would be
+hard, if ever a bnode was used across graphs, so that probably should
+not be allowed.
+
+Our API:
+
+GET /  ui
+GET /graph    the whole graph, or a query from it (needed? just for ui browsing?)
+PUT /patches  clients submit changes
+GET /patches  (recent) patches from clients
+POST /graphClients clientUpdate={uri} to subscribe
+GET /graphClients  current clients
+
+format:
+json {"adds" : [[quads]...],
+      "deletes": [[quads]],
+      "senderUpdateUri" : tooluri,
+      "created":tttt // maybe to help resolve some conflicts
+     }
+maybe use some http://json-ld.org/ in there.
+
+proposed rule feature:
+rdfdb should be able to watch a pair of (sourceFile, rulesFile) and
+rerun the rules when either one changes. Should the sourceFile be able
+to specify its own rules file?  That would be easier
+configuration. How do edits work? Not allowed?  Patch the source only?
+Also see the source graph loaded into a different ctx, and you can
+edit that one and see the results in the output context?
+
+Our web ui:
+
+  sections
+
+    registered clients
+
+    recent patches, each one says what client it came from. You can reverse
+    them here. We should be able to take patches that are close in time
+    and keep updating the same data (e.g. a stream of changes as the user
+    drags a slider) and collapse them into a single edit for clarity.
+
+        Ways to display patches, using labels and creator/subj icons
+        where possible:
+
+          <creator> set <subj>'s <p> to <o>
+          <creator> changed <subj>'s <pred> from <o1> to <o2>
+          <creator> added <o> to <s> <p>
+
+    raw messages for debugging this client
+
+    ctx urls take you to->
+    files, who's dirty, have we seen external changes, notice big
+    files that are taking a long time to save
+
+    graph contents. plain rdf browser like an outliner or
+    something. clicking any resource from the other displays takes you
+    to this, focused on that resource
+    
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/service.py	Thu Apr 26 07:49:50 2018 +0000
@@ -0,0 +1,419 @@
+from twisted.internet import reactor, defer
+import twisted.internet.error
+from twisted.python.filepath import FilePath
+from twisted.python.failure import Failure
+from twisted.internet.inotify import humanReadableMask, IN_CREATE
+import sys, optparse, logging, json, os
+import cyclone.web, cyclone.httpclient, cyclone.websocket
+
+from rdflib import ConjunctiveGraph, URIRef, Graph
+from rdfdb.graphfile import GraphFile
+from rdfdb.patch import Patch, ALLSTMTS
+from rdfdb.rdflibpatch import patchQuads
+from rdfdb.file_vs_uri import correctToTopdirPrefix, fileForUri, uriFromFile
+from rdfdb.patchsender import sendPatch
+from rdfdb.patchreceiver import makePatchEndpointPutMethod
+
+from twisted.internet.inotify import INotify
+
+log = logging.getLogger('rdfdb')
+log.setLevel(logging.DEBUG)
+
+class WebsocketDisconnect(ValueError):
+    pass
+
+def sendGraphToClient(graph, client):
+    """send the client the whole graph contents"""
+    log.info("sending all graphs to %r" % client)
+    client.sendPatch(Patch(
+        addQuads=graph.quads(ALLSTMTS),
+        delQuads=[]))
+    
+
+class Client(object):
+    """
+    one of our syncedgraph clients
+    """
+    def __init__(self, updateUri, label):
+        self.label = label
+        # todo: updateUri is used publicly to compare clients. Replace
+        # it with Client.__eq__ so WsClient doesn't have to fake an
+        # updateUri.
+        self.updateUri = updateUri
+
+    def __repr__(self):
+        return "<%s client at %s>" % (self.label, self.updateUri)
+
+    def sendPatch(self, p):
+        """
+        returns deferred. error will be interpreted as the client being
+        broken.
+        """
+        return sendPatch(self.updateUri, p)
+        
+class WsClient(object):
+    def __init__(self, connectionId, sendMessage):
+        self.updateUri = connectionId
+        self.sendMessage = sendMessage
+
+    def __repr__(self):
+        return "<WsClient %s>" % self.updateUri
+
+    def sendPatch(self, p):
+        self.sendMessage(p.makeJsonRepr())
+        return defer.succeed(None)
+        
+class WatchedFiles(object):
+    """
+    find files, notice new files.
+
+    This object watches directories. Each GraphFile watches its own file.
+    """
+    def __init__(self, dirUriMap, patch, getSubgraph, addlPrefixes):
+        self.dirUriMap = dirUriMap # {abspath : uri prefix}
+        self.patch, self.getSubgraph = patch, getSubgraph
+        self.addlPrefixes = addlPrefixes
+        
+        self.graphFiles = {} # context uri : GraphFile
+        
+        self.notifier = INotify()
+        self.notifier.startReading()
+        
+        self.findAndLoadFiles()
+
+    def findAndLoadFiles(self):
+        self.initialLoad = True
+        try:
+            for topdir in self.dirUriMap:
+                for dirpath, dirnames, filenames in os.walk(topdir):
+                    for base in filenames:
+                        self.watchFile(os.path.join(dirpath, base))
+                    self.notifier.watch(FilePath(dirpath), autoAdd=True,
+                                        callbacks=[self.dirChange])
+        finally:
+            self.initialLoad = False
+
+    def dirChange(self, watch, path, mask):
+        if mask & IN_CREATE:
+            if path.path.endswith(('~', '.swp', 'swx', '.rdfdb-temp')):
+                return
+                
+            log.debug("%s created; consider adding a watch", path)
+            self.watchFile(path.path)
+            
+    def watchFile(self, inFile):
+        """
+        consider adding a GraphFile to self.graphFiles
+
+        inFile needs to be a relative path, not an absolute (e.g. in a
+        FilePath) because we use its exact relative form in the
+        context URI
+        """
+        if not os.path.isfile(inFile):
+            return
+
+        inFile = correctToTopdirPrefix(self.dirUriMap, inFile)
+        if os.path.splitext(inFile)[1] not in ['.n3']:
+            return
+
+        if '/capture/' in inFile:
+            # smaller graph for now
+            return
+            
+        # an n3 file with rules makes it all the way past this reading
+        # and the serialization. Then, on the receiving side, a
+        # SyncedGraph calls graphFromNQuad on the incoming data and
+        # has a parse error. I'm not sure where this should be fixed
+        # yet.
+        if '-rules' in inFile:
+            return
+
+        # for legacy versions, compile all the config stuff you want
+        # read into one file called config.n3. New versions won't read
+        # it.
+        if inFile.endswith("config.n3"):
+            return
+            
+        ctx = uriFromFile(self.dirUriMap, inFile)
+        gf = self._addGraphFile(ctx, inFile)
+        log.info("%s do initial read", inFile)
+        gf.reread()
+
+    def aboutToPatch(self, ctx):
+        """
+        warn us that a patch is about to come to this context. it's more
+        straightforward to create the new file now
+
+        this is meant to make the file before we add triples, so we
+        wouldn't see the blank file and lose those triples. But it
+        didn't work, so there are other measures that make us not lose
+        the triples from a new file. Calling this before patching the
+        graph is still a reasonable thing to do, though.
+        """
+        g = self.getSubgraph(ctx)
+
+        if ctx not in self.graphFiles:
+            outFile = fileForUri(self.dirUriMap, ctx)
+            assert '//' not in outFile, (outFile, self.dirUriMap, ctx)
+            log.info("starting new file %r", outFile)
+            self._addGraphFile(ctx, outFile)
+
+    def _addGraphFile(self, ctx, path):
+        self.addlPrefixes.setdefault(ctx, {})
+        self.addlPrefixes.setdefault(None, {})
+        gf = GraphFile(self.notifier, path, ctx,
+                       self.patch, self.getSubgraph,
+                       globalPrefixes=self.addlPrefixes[None],
+                       ctxPrefixes=self.addlPrefixes[ctx])
+        self.graphFiles[ctx] = gf 
+        return gf
+
+            
+    def dirtyFiles(self, ctxs):
+        """mark dirty the files that we watch in these contexts.
+
+        the ctx might not be a file that we already read; it might be
+        for a new file we have to create, or it might be for a
+        transient context that we're not going to save
+
+        if it's a ctx with no file, error
+        """
+        for ctx in ctxs:
+            g = self.getSubgraph(ctx)
+            self.graphFiles[ctx].dirty(g)
+
+        
+class Db(object):
+    """
+    the master graph, all the connected clients, all the files we're watching
+    """
+    def __init__(self, dirUriMap, addlPrefixes):
+      
+        self.clients = []
+        self.graph = ConjunctiveGraph()
+
+        self.watchedFiles = WatchedFiles(dirUriMap,
+                                         self.patch, self.getSubgraph,
+                                         addlPrefixes)
+        
+        self.summarizeToLog()
+
+    def patch(self, p, dueToFileChange=False):
+        """
+        apply this patch to the master graph then notify everyone about it
+
+        dueToFileChange if this is a patch describing an edit we read
+        *from* the file (such that we shouldn't write it back to the file)
+
+        if p has a senderUpdateUri attribute, we won't send this patch
+        back to the sender with that updateUri
+        """
+        ctx = p.getContext()
+        log.info("patching graph %s -%d +%d" % (
+            ctx, len(p.delQuads), len(p.addQuads)))
+
+        if hasattr(self, 'watchedFiles'): # not available during startup
+            self.watchedFiles.aboutToPatch(ctx)
+        
+        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
+        self._sendPatch(p)
+        if not dueToFileChange:
+            self.watchedFiles.dirtyFiles([ctx])
+        sendToLiveClients(asJson=p.jsonRepr)
+
+    def _sendPatch(self, p):
+        senderUpdateUri = getattr(p, 'senderUpdateUri', None)
+
+        for c in self.clients:
+            if c.updateUri == senderUpdateUri:
+                # this client has self-applied the patch already
+                continue
+            d = c.sendPatch(p)
+            d.addErrback(self.clientErrored, c)
+        
+    def clientErrored(self, err, c):
+        err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect)
+        log.info("%r %r - dropping client", c, err.getErrorMessage())
+        if c in self.clients:
+            self.clients.remove(c)
+        self.sendClientsToAllLivePages()
+
+    def summarizeToLog(self):
+        log.info("contexts in graph (%s total stmts):" % len(self.graph))
+        for c in self.graph.contexts():
+            log.info("  %s: %s statements" %
+                     (c.identifier, len(self.getSubgraph(c.identifier))))
+
+    def getSubgraph(self, uri):
+        """
+        this is meant to return a live view of the given subgraph, but
+        if i'm still working around an rdflib bug, it might return a
+        copy
+
+        and it's returning triples, but I think quads would be better
+        """
+        # this is returning an empty Graph :(
+        #return self.graph.get_context(uri)
+
+        g = Graph()
+        for s in self.graph.triples(ALLSTMTS, uri):
+            g.add(s)
+        return g
+
+    def addClient(self, newClient):
+        [self.clients.remove(c)
+         for c in self.clients if c.updateUri == newClient.updateUri]
+
+        log.info("new client %r" % newClient)
+        sendGraphToClient(self.graph, newClient)
+        self.clients.append(newClient)
+        self.sendClientsToAllLivePages()
+
+    def sendClientsToAllLivePages(self):
+        sendToLiveClients({"clients":[
+            dict(updateUri=c.updateUri, label=repr(c))
+            for c in self.clients]})
+
+class GraphResource(cyclone.web.RequestHandler):
+    def get(self):
+        accept = self.request.headers.get('accept', '')
+        format = 'n3'
+        if accept == 'text/plain':
+            format = 'nt'
+        elif accept == 'application/n-quads':
+            format = 'nquads'
+        elif accept == 'pickle':
+            # don't use this; it's just for speed comparison
+            import cPickle as pickle
+            pickle.dump(self.settings.db.graph, self, protocol=2)
+            return
+        elif accept == 'msgpack':
+            self.write(repr(self.settings.db.graph.__getstate__))
+            return
+        self.write(self.settings.db.graph.serialize(format=format))
+
+class Patches(cyclone.web.RequestHandler):
+    def __init__(self, *args, **kw):
+        cyclone.web.RequestHandler.__init__(self, *args, **kw)
+        p = makePatchEndpointPutMethod(self.settings.db.patch)
+        self.put = lambda: p(self)
+
+    def get(self):
+        pass
+
+class GraphClients(cyclone.web.RequestHandler):
+    def get(self):
+        pass
+
+    def post(self):
+        upd = self.get_argument("clientUpdate")
+        try:
+            self.settings.db.addClient(Client(upd, self.get_argument("label")))
+        except:
+            import traceback
+            traceback.print_exc()
+            raise
+            
+class Prefixes(cyclone.web.RequestHandler):
+    def post(self):
+        suggestion = json.loads(self.request.body)
+        addlPrefixes = self.settings.db.watchedFiles.addlPrefixes
+        addlPrefixes.setdefault(URIRef(suggestion['ctx']), {}).update(suggestion['prefixes'])
+    
+_wsClientSerial = 0
+class WebsocketClient(cyclone.websocket.WebSocketHandler):
+
+    def connectionMade(self, *args, **kwargs):
+        global _wsClientSerial
+        connectionId = 'connection-%s' % _wsClientSerial
+        _wsClientSerial += 1
+
+        self.wsClient = WsClient(connectionId, self.sendMessage)
+        log.info("new ws client %r", self.wsClient)
+        self.settings.db.addClient(self.wsClient)
+
+    def connectionLost(self, reason):
+        log.info("bye ws client %r", self.wsClient)
+        self.settings.db.clientErrored(
+            Failure(WebsocketDisconnect(reason)), self.wsClient)
+
+    def messageReceived(self, message):
+        if message == 'PING':
+            self.sendMessage('PONG')
+            return
+        log.info("got message from %r: %s", self.wsClient, message)
+        p = Patch(jsonRepr=message)
+        p.senderUpdateUri = self.wsClient.updateUri
+        self.settings.db.patch(p)
+
+liveClients = set()
+def sendToLiveClients(d=None, asJson=None):
+    j = asJson or json.dumps(d)
+    for c in liveClients:
+        c.sendMessage(j)
+
+class Live(cyclone.websocket.WebSocketHandler):
+
+    def connectionMade(self, *args, **kwargs):
+        log.info("websocket opened")
+        liveClients.add(self)
+        self.settings.db.sendClientsToAllLivePages()
+
+    def connectionLost(self, reason):
+        log.info("websocket closed")
+        liveClients.remove(self)
+
+    def messageReceived(self, message):
+        log.info("got message %s" % message)
+        self.sendMessage(message)
+
+class NoExts(cyclone.web.StaticFileHandler):
+    # .html pages can be get() without .html on them
+    def get(self, path, *args, **kw):
+        if path and '.' not in path:
+            path = path + ".html"
+        cyclone.web.StaticFileHandler.get(self, path, *args, **kw)
+
+
+def main(dirUriMap=None, prefixes=None, port=9999):
+
+    if dirUriMap is None:
+        dirUriMap = {'data/': URIRef('http://example.com/data/')}
+    if prefixes is None:
+        prefixes = {
+            'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
+            'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
+            'xsd': 'http://www.w3.org/2001/XMLSchema#',
+        }
+    
+    logging.basicConfig()
+    log = logging.getLogger()
+
+    parser = optparse.OptionParser()
+    parser.add_option("-v", "--verbose", action="store_true",
+                      help="logging.DEBUG")
+    (options, args) = parser.parse_args()
+
+    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
+
+    db = Db(dirUriMap=dirUriMap,
+            addlPrefixes={None: prefixes})
+
+    from twisted.python import log as twlog
+    twlog.startLogging(sys.stdout)
+
+    reactor.listenTCP(port, cyclone.web.Application(handlers=[
+        (r'/live', Live),
+        (r'/graph', GraphResource),
+        (r'/patches', Patches),
+        (r'/graphClients', GraphClients),
+        (r'/syncedGraph', WebsocketClient),
+        (r'/prefixes', Prefixes),
+
+        (r'/(.*)', NoExts,
+         {"path" : FilePath(__file__).sibling("web").path,
+          "default_filename" : "index.html"}),
+
+        ], debug=True, db=db))
+    log.info("serving on %s" % port)
+    reactor.run()
--- a/rdfdb/syncedgraph.py	Thu Apr 26 07:18:13 2018 +0000
+++ b/rdfdb/syncedgraph.py	Thu Apr 26 07:49:50 2018 +0000
@@ -52,16 +52,18 @@
     pending local changes) and get the data again from the
     server.
     """
-    def __init__(self, rdfdbRoot, label):
+    def __init__(self, rdfdbRoot, receiverHost, label):
         """
         label is a string that the server will display in association
         with your connection
+
+        receiverHost is the hostname other nodes can use to talk to me
         """
         self.rdfdbRoot = rdfdbRoot
         self.initiallySynced = defer.Deferred()
         self._graph = ConjunctiveGraph()
 
-        self._receiver = PatchReceiver(self.rdfdbRoot, label, self._onPatch)
+        self._receiver = PatchReceiver(self.rdfdbRoot, receiverHost, label, self._onPatch)
         
         self._sender = PatchSender(self.rdfdbRoot + 'patches',
                                    self._receiver.updateResource)
--- a/setup.py	Thu Apr 26 07:18:13 2018 +0000
+++ b/setup.py	Thu Apr 26 07:49:50 2018 +0000
@@ -9,5 +9,10 @@
         'cyclone',
         'mock',
         'treq',
+        'rdflib-jsonld',
+        'service_identity',
         ],
+    entry_points={
+        'console_scripts': ['rdfdb=rdfdb.service:main'],
+    },
  )