# HG changeset patch
# User Drew Perttula
# Date 2018-04-26 08:22:19
# Node ID bbe05d5c6a8f33a77b64fc79d7219f01ec2bef95
# Parent  f140153c087c4646dc6d332fa4cc1498a9f85150
move rdfdb.service to rdfdb repo

Ignore-this: 7245785c2345e5f745fbe1c45ba45b17

diff --git a/bin/rdfdb b/bin/rdfdb
--- a/bin/rdfdb
+++ b/bin/rdfdb
@@ -1,530 +1,21 @@
 #!bin/python
-"""
-other tools POST themselves to here as subscribers to the graph. They
-are providing a URL we can PUT to with graph updates.
-
-we immediately PUT them back all the contents of the graph as a bunch
-of adds.
-
-later we PUT them back with patches (del/add lists) when there are
-changes.
-
-If we fail to reach a registered caller, we forget about it for future
-calls. We could PUT empty diffs as a heartbeat to notice disappearing
-callers faster.
-
-A caller can submit a patch which we'll persist and broadcast to every
-other client.
-
-Global data undo should probably happen within this service. Some
-operations should not support undo, such as updating the default
-position of a window. How will we separate those? A blacklist of
-subj+pred pairs that don't save undo? Or just save the updates like
-everything else, but when you press undo, there's a way to tell which
-updates *should* be part of your app's undo system?
-
-Maybe some subgraphs are for transient data (e.g. current timecode,
-mouse position in curvecalc) that only some listeners want to hear about.
-
-Deletes are graph-specific, so callers may be surprised to delete a
-stmt from one graph but then find that statement is still true.
-
-Alternate plan: would it help to insist that every patch is within
-only one subgraph? I think it's ok for them to span multiple ones.
-
-Inserts can be made on any subgraphs, and each subgraph is saved in
-its own file. The file might not be in a format that can express
-graphs, so I'm just going to not store the subgraph URI in any file.
-
-I don't support wildcard deletes, and there are race conditions where a
-s-p could end up with unexpected multiple objects. Every client needs
-to be ready for this.
-
-We watch the files and push their own changes back to the clients.
-
-Persist our client list, to survive restarts. In another rdf file? A
-random json one? memcache? Also hold the recent changes. We're not
-logging everything forever, though, since the output files and a VCS
-shall be used for that
-
-Bnodes: this rdfdb graph might be able to track bnodes correctly, and
-they make for more compact n3 files. I'm not sure if it's going to be
-hard to keep the client bnodes in sync though. File rereads would be
-hard, if ever a bnode was used across graphs, so that probably should
-not be allowed.
-
-Our API:
-
-GET / ui
-GET /graph the whole graph, or a query from it (needed? just for ui browsing?)
-PUT /patches clients submit changes
-GET /patches (recent) patches from clients
-POST /graphClients clientUpdate={uri} to subscribe
-GET /graphClients current clients
-
-format:
-json {"adds" : [[quads]...],
-      "deletes": [[quads]],
-      "senderUpdateUri" : tooluri,
-      "created":tttt // maybe to help resolve some conflicts
-      }
-maybe use some http://json-ld.org/ in there.
-
-proposed rule feature:
-rdfdb should be able to watch a pair of (sourceFile, rulesFile) and
-rerun the rules when either one changes. Should the sourceFile be able
-to specify its own rules file? That would be easier
-configuration. How do edits work? Not allowed? Patch the source only?
-Also see the source graph loaded into a different ctx, and you can
-edit that one and see the results in the output context?
-
-Our web ui:
-
- sections
-
- registered clients
-
- recent patches, each one says what client it came from. You can reverse
- them here. We should be able to take patches that are close in time
- and keep updating the same data (e.g. a stream of changes as the user
- drags a slider) and collapse them into a single edit for clarity.
-
- Ways to display patches, using labels and creator/subj icons
- where possible:
-
- - <creator> set <subj>'s <pred> to <obj>
- - <creator> changed <subj>'s <pred> from <oldobj> to <newobj>
- - <creator> added <obj> to <subj>'s <pred>
-
- - raw messages for debugging this client
-
- ctx urls take you to->
- files, who's dirty, have we seen external changes, notice big
- files that are taking a long time to save
-
- graph contents. plain rdf browser like an outliner or
- something. clicking any resource from the other displays takes you
- to this, focused on that resource
-
-"""
-from twisted.internet import reactor, defer
-import twisted.internet.error
-from twisted.python.filepath import FilePath
-from twisted.python.failure import Failure
-from twisted.internet.inotify import humanReadableMask, IN_CREATE
-import sys, optparse, logging, json, os
-import cyclone.web, cyclone.httpclient, cyclone.websocket
-sys.path.append(".")
-from light9 import networking, showconfig, prof
-from rdflib import ConjunctiveGraph, URIRef, Graph
-from rdfdb.graphfile import GraphFile
-from rdfdb.patch import Patch, ALLSTMTS
-from rdfdb.rdflibpatch import patchQuads
-from rdfdb.file_vs_uri import correctToTopdirPrefix, fileForUri, uriFromFile
-from rdfdb.patchsender import sendPatch
-from rdfdb.patchreceiver import makePatchEndpointPutMethod
-
-from twisted.internet.inotify import INotify
-from run_local import log
-log.setLevel(logging.DEBUG)
-
-from lib.cycloneerr import PrettyErrorHandler
-
-class WebsocketDisconnect(ValueError):
-    pass
-
-def sendGraphToClient(graph, client):
-    """send the client the whole graph contents"""
-    log.info("sending all graphs to %r" % client)
-    client.sendPatch(Patch(
-        addQuads=graph.quads(ALLSTMTS),
-        delQuads=[]))
-
-
-class Client(object):
-    """
-    one of our syncedgraph clients
-    """
-    def __init__(self, updateUri, label):
-        self.label = label
-        # todo: updateUri is used publicly to compare clients. Replace
-        # it with Client.__eq__ so WsClient doesn't have to fake an
-        # updateUri.
-        self.updateUri = updateUri
-
-    def __repr__(self):
-        return "<%s client at %s>" % (self.label, self.updateUri)
-
-    def sendPatch(self, p):
-        """
-        returns deferred. error will be interpreted as the client being
-        broken.
-        """
-        return sendPatch(self.updateUri, p)
-
-class WsClient(object):
-    def __init__(self, connectionId, sendMessage):
-        self.updateUri = connectionId
-        self.sendMessage = sendMessage
-
-    def __repr__(self):
-        return "<WsClient %s>" % self.updateUri
-
-    def sendPatch(self, p):
-        self.sendMessage(p.makeJsonRepr())
-        return defer.succeed(None)
-
-class WatchedFiles(object):
-    """
-    find files, notice new files.
-
-    This object watches directories. Each GraphFile watches its own file.
- """ - def __init__(self, dirUriMap, patch, getSubgraph, addlPrefixes): - self.dirUriMap = dirUriMap # {abspath : uri prefix} - self.patch, self.getSubgraph = patch, getSubgraph - self.addlPrefixes = addlPrefixes - - self.graphFiles = {} # context uri : GraphFile - - self.notifier = INotify() - self.notifier.startReading() - - self.findAndLoadFiles() - - def findAndLoadFiles(self): - self.initialLoad = True - try: - for topdir in self.dirUriMap: - for dirpath, dirnames, filenames in os.walk(topdir): - for base in filenames: - self.watchFile(os.path.join(dirpath, base)) - self.notifier.watch(FilePath(dirpath), autoAdd=True, - callbacks=[self.dirChange]) - finally: - self.initialLoad = False - - def dirChange(self, watch, path, mask): - if mask & IN_CREATE: - if path.path.endswith(('~', '.swp', 'swx', '.rdfdb-temp')): - return - - log.debug("%s created; consider adding a watch", path) - self.watchFile(path.path) - - def watchFile(self, inFile): - """ - consider adding a GraphFile to self.graphFiles - - inFile needs to be a relative path, not an absolute (e.g. in a - FilePath) because we use its exact relative form in the - context URI - """ - if not os.path.isfile(inFile): - return - - inFile = correctToTopdirPrefix(self.dirUriMap, inFile) - if os.path.splitext(inFile)[1] not in ['.n3']: - return - - if '/capture/' in inFile: - # smaller graph for now - return - - # an n3 file with rules makes it all the way past this reading - # and the serialization. Then, on the receiving side, a - # SyncedGraph calls graphFromNQuad on the incoming data and - # has a parse error. I'm not sure where this should be fixed - # yet. - if '-rules' in inFile: - return - - # for legacy versions, compile all the config stuff you want - # read into one file called config.n3. New versions won't read - # it. - if inFile.endswith("config.n3"): - return - - ctx = uriFromFile(self.dirUriMap, inFile) - gf = self._addGraphFile(ctx, inFile) - log.info("%s do initial read", inFile) - gf.reread() - - def aboutToPatch(self, ctx): - """ - warn us that a patch is about to come to this context. it's more - straightforward to create the new file now - - this is meant to make the file before we add triples, so we - wouldn't see the blank file and lose those triples. But it - didn't work, so there are other measures that make us not lose - the triples from a new file. Calling this before patching the - graph is still a reasonable thing to do, though. - """ - g = self.getSubgraph(ctx) +import run_local +import os +from light9 import networking, showconfig +import rdfdb.service - if ctx not in self.graphFiles: - outFile = fileForUri(self.dirUriMap, ctx) - assert '//' not in outFile, (outFile, self.dirUriMap, ctx) - log.info("starting new file %r", outFile) - self._addGraphFile(ctx, outFile) - - def _addGraphFile(self, ctx, path): - self.addlPrefixes.setdefault(ctx, {}) - self.addlPrefixes.setdefault(None, {}) - gf = GraphFile(self.notifier, path, ctx, - self.patch, self.getSubgraph, - globalPrefixes=self.addlPrefixes[None], - ctxPrefixes=self.addlPrefixes[ctx]) - self.graphFiles[ctx] = gf - return gf - - - def dirtyFiles(self, ctxs): - """mark dirty the files that we watch in these contexts. 
- - the ctx might not be a file that we already read; it might be - for a new file we have to create, or it might be for a - transient context that we're not going to save - - if it's a ctx with no file, error - """ - for ctx in ctxs: - g = self.getSubgraph(ctx) - self.graphFiles[ctx].dirty(g) - - -class Db(object): - """ - the master graph, all the connected clients, all the files we're watching - """ - def __init__(self, dirUriMap, addlPrefixes): - - self.clients = [] - self.graph = ConjunctiveGraph() - - self.watchedFiles = WatchedFiles(dirUriMap, - self.patch, self.getSubgraph, - addlPrefixes) - - self.summarizeToLog() - - def patch(self, p, dueToFileChange=False): - """ - apply this patch to the master graph then notify everyone about it - - dueToFileChange if this is a patch describing an edit we read - *from* the file (such that we shouldn't write it back to the file) - - if p has a senderUpdateUri attribute, we won't send this patch - back to the sender with that updateUri - """ - ctx = p.getContext() - log.info("patching graph %s -%d +%d" % ( - ctx, len(p.delQuads), len(p.addQuads))) - - if hasattr(self, 'watchedFiles'): # not available during startup - self.watchedFiles.aboutToPatch(ctx) - - patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True) - self._sendPatch(p) - if not dueToFileChange: - self.watchedFiles.dirtyFiles([ctx]) - sendToLiveClients(asJson=p.jsonRepr) - - def _sendPatch(self, p): - senderUpdateUri = getattr(p, 'senderUpdateUri', None) - - for c in self.clients: - if c.updateUri == senderUpdateUri: - # this client has self-applied the patch already - continue - d = c.sendPatch(p) - d.addErrback(self.clientErrored, c) - - def clientErrored(self, err, c): - err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect) - log.info("%r %r - dropping client", c, err.getErrorMessage()) - if c in self.clients: - self.clients.remove(c) - self.sendClientsToAllLivePages() - - def summarizeToLog(self): - log.info("contexts in graph (%s total stmts):" % len(self.graph)) - for c in self.graph.contexts(): - log.info(" %s: %s statements" % - (c.identifier, len(self.getSubgraph(c.identifier)))) - - def getSubgraph(self, uri): - """ - this is meant to return a live view of the given subgraph, but - if i'm still working around an rdflib bug, it might return a - copy - - and it's returning triples, but I think quads would be better - """ - # this is returning an empty Graph :( - #return self.graph.get_context(uri) - - g = Graph() - for s in self.graph.triples(ALLSTMTS, uri): - g.add(s) - return g - - def addClient(self, newClient): - [self.clients.remove(c) - for c in self.clients if c.updateUri == newClient.updateUri] - - log.info("new client %r" % newClient) - sendGraphToClient(self.graph, newClient) - self.clients.append(newClient) - self.sendClientsToAllLivePages() - - def sendClientsToAllLivePages(self): - sendToLiveClients({"clients":[ - dict(updateUri=c.updateUri, label=repr(c)) - for c in self.clients]}) +rdfdb.service.main( + dirUriMap={os.environ['LIGHT9_SHOW'].rstrip('/') + '/': + showconfig.showUri() + '/'}, + prefixes={ + 'show': showconfig.showUri() + '/', + '': 'http://light9.bigasterisk.com/', + 'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#', + 'rdfs': 'http://www.w3.org/2000/01/rdf-schema#', + 'xsd': 'http://www.w3.org/2001/XMLSchema#', + 'effect': 'http://light9.bigasterisk.com/effect/', + 'dev': 'http://light9.bigasterisk.com/device/', + }, + port=networking.rdfdb.port, + ) -class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler): - def 
get(self): - accept = self.request.headers.get('accept', '') - format = 'n3' - if accept == 'text/plain': - format = 'nt' - elif accept == 'application/n-quads': - format = 'nquads' - elif accept == 'pickle': - # don't use this; it's just for speed comparison - import cPickle as pickle - pickle.dump(self.settings.db.graph, self, protocol=2) - return - elif accept == 'msgpack': - self.write(repr(self.settings.db.graph.__getstate__)) - return - self.write(self.settings.db.graph.serialize(format=format)) - -class Patches(PrettyErrorHandler, cyclone.web.RequestHandler): - def __init__(self, *args, **kw): - cyclone.web.RequestHandler.__init__(self, *args, **kw) - p = makePatchEndpointPutMethod(self.settings.db.patch) - self.put = lambda: p(self) - - def get(self): - pass - -class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler): - def get(self): - pass - - def post(self): - upd = self.get_argument("clientUpdate") - try: - self.settings.db.addClient(Client(upd, self.get_argument("label"))) - except: - import traceback - traceback.print_exc() - raise - -class Prefixes(PrettyErrorHandler, cyclone.web.RequestHandler): - def post(self): - suggestion = json.loads(self.request.body) - addlPrefixes = self.settings.db.watchedFiles.addlPrefixes - addlPrefixes.setdefault(URIRef(suggestion['ctx']), {}).update(suggestion['prefixes']) - -_wsClientSerial = 0 -class WebsocketClient(cyclone.websocket.WebSocketHandler): - - def connectionMade(self, *args, **kwargs): - global _wsClientSerial - connectionId = 'connection-%s' % _wsClientSerial - _wsClientSerial += 1 - - self.wsClient = WsClient(connectionId, self.sendMessage) - log.info("new ws client %r", self.wsClient) - self.settings.db.addClient(self.wsClient) - - def connectionLost(self, reason): - log.info("bye ws client %r", self.wsClient) - self.settings.db.clientErrored( - Failure(WebsocketDisconnect(reason)), self.wsClient) - - def messageReceived(self, message): - if message == 'PING': - self.sendMessage('PONG') - return - log.info("got message from %r: %s", self.wsClient, message) - p = Patch(jsonRepr=message) - p.senderUpdateUri = self.wsClient.updateUri - self.settings.db.patch(p) - -liveClients = set() -def sendToLiveClients(d=None, asJson=None): - j = asJson or json.dumps(d) - for c in liveClients: - c.sendMessage(j) - -class Live(cyclone.websocket.WebSocketHandler): - - def connectionMade(self, *args, **kwargs): - log.info("websocket opened") - liveClients.add(self) - self.settings.db.sendClientsToAllLivePages() - - def connectionLost(self, reason): - log.info("websocket closed") - liveClients.remove(self) - - def messageReceived(self, message): - log.info("got message %s" % message) - self.sendMessage(message) - -class NoExts(cyclone.web.StaticFileHandler): - # .html pages can be get() without .html on them - def get(self, path, *args, **kw): - if path and '.' 
not in path: - path = path + ".html" - cyclone.web.StaticFileHandler.get(self, path, *args, **kw) - - - -if __name__ == "__main__": - logging.basicConfig() - log = logging.getLogger() - - parser = optparse.OptionParser() - parser.add_option("-v", "--verbose", action="store_true", - help="logging.DEBUG") - (options, args) = parser.parse_args() - - log.setLevel(logging.DEBUG if options.verbose else logging.INFO) - - db = Db(dirUriMap={os.environ['LIGHT9_SHOW'].rstrip('/') + '/': - showconfig.showUri() + '/'}, - addlPrefixes={None: { - 'show': showconfig.showUri() + '/', - '': 'http://light9.bigasterisk.com/', - 'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#', - 'rdfs': 'http://www.w3.org/2000/01/rdf-schema#', - 'xsd': 'http://www.w3.org/2001/XMLSchema#', - 'effect': 'http://light9.bigasterisk.com/effect/', - 'dev': 'http://light9.bigasterisk.com/device/', - }}) - - from twisted.python import log as twlog - twlog.startLogging(sys.stdout) - - reactor.listenTCP(networking.rdfdb.port, cyclone.web.Application(handlers=[ - (r'/live', Live), - (r'/graph', GraphResource), - (r'/patches', Patches), - (r'/graphClients', GraphClients), - (r'/syncedGraph', WebsocketClient), - (r'/prefixes', Prefixes), - - (r'/(.*)', NoExts, - {"path" : "light9/rdfdb/web", - "default_filename" : "index.html"}), - - ], debug=True, db=db)) - log.info("serving on %s" % networking.rdfdb.port) - prof.run(reactor.run, profile=None) diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -33,4 +33,4 @@ typing==3.6.1 watchdog==0.8.3 web.py==0.38 webcolors==1.7 -https://projects.bigasterisk.com/rdfdb/rdfdb-0.3.0.tar.gz +https://projects.bigasterisk.com/rdfdb/rdfdb-0.4.0.tar.gz
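
Note on the API that moved into the rdfdb package: per the removed docstring, a client subscribes by POSTing its update URL to /graphClients, immediately gets the whole graph PUT back to it as adds, and then receives del/add patches as the graph changes; it submits its own changes with PUT /patches. Below is a minimal sketch of such a subscriber, assuming the `requests` library is available; the host/port constants, the update URL, and the 'example-tool' label are illustrative only (the real port comes from light9.networking.rdfdb.port), and the empty quad lists are placeholders for whatever encoding rdfdb.patch.Patch produces.

import requests

RDFDB = 'http://localhost:8051'                 # assumed port; see networking.rdfdb.port
MY_UPDATE_URL = 'http://localhost:9105/update'  # our endpoint that rdfdb will PUT patches to

# Subscribe. rdfdb immediately PUTs the whole graph back to MY_UPDATE_URL as
# adds, then PUTs del/add patches whenever the graph changes.
requests.post(RDFDB + '/graphClients',
              data={'clientUpdate': MY_UPDATE_URL, 'label': 'example-tool'})

# Submit a change of our own. The JSON shape follows the docstring above.
requests.put(RDFDB + '/patches', json={
    'adds': [],                        # quads to add
    'deletes': [],                     # quads to delete
    'senderUpdateUri': MY_UPDATE_URL,  # so the patch is not echoed back to us
})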