Mercurial > code > home > repos > rdfdb
view rdfdb/service.py @ 45:dc61012eeace
python reformat
Ignore-this: 93c7e122cd60efa7ff4e8620d9bc10d3
author | Drew Perttula <drewp@bigasterisk.com> |
---|---|
date | Mon, 27 May 2019 07:45:24 +0000 |
parents | 8d4822ae58bc |
children | 3b36b2c8ae65 |
line wrap: on
line source
"""
rdfdb service: keeps one master graph in memory, mirrors it to n3
files on disk, and relays patches between the connected clients.
"""
import json
import logging
import optparse
import os
import sys
from typing import Dict, List, Optional, Set, Union

# twisted.internet.reactor is imported before cyclone, same as the
# original import order.
from twisted.internet import reactor, defer
import twisted.internet.error
from twisted.internet.inotify import IN_CREATE, INotify
from twisted.python.failure import Failure
from twisted.python.filepath import FilePath
import cyclone.web
import cyclone.httpclient
import cyclone.websocket
from rdflib import ConjunctiveGraph, URIRef, Graph

from rdfdb.graphfile import GraphFile, PatchCb, GetSubgraph
from rdfdb.patch import Patch, ALLSTMTS
from rdfdb.rdflibpatch import patchQuads
from rdfdb.file_vs_uri import (correctToTopdirPrefix, fileForUri, uriFromFile,
                               DirUriMap)
from rdfdb.patchsender import sendPatch
from rdfdb.patchreceiver import makePatchEndpointPutMethod

log = logging.getLogger('rdfdb')
log.setLevel(logging.DEBUG)


class WebsocketDisconnect(ValueError):
    """Wrapped in a Failure to report that a websocket client went away."""
    pass


class Client(object):
    """
    one of our syncedgraph clients
    """

    def __init__(self, updateUri: URIRef, label: str):
        self.label = label
        # todo: updateUri is used publicly to compare clients. Replace
        # it with Client.__eq__ so WsClient doesn't have to fake an
        # updateUri.
        self.updateUri = updateUri

    def __repr__(self):
        return "<%s client at %s>" % (self.label, self.updateUri)

    def sendPatch(self, p: Patch) -> defer.Deferred:
        """
        returns deferred. error will be interpreted as the client being
        broken.
        """
        return sendPatch(self.updateUri, p)


class WsClient(object):
    """
    a client reached through a websocket; patches are pushed with the
    sendMessage callable instead of being sent to an update URI.
    """

    def __init__(self, connectionId: str, sendMessage):
        # not a real URI; it only has to be comparable with the
        # updateUri of other clients.
        self.updateUri = URIRef(connectionId)
        self.sendMessage = sendMessage

    def __repr__(self):
        return "<WsClient %s>" % self.updateUri

    def sendPatch(self, p: Patch) -> defer.Deferred:
        """send the patch as its json repr; returns a fired deferred."""
        self.sendMessage(p.makeJsonRepr())
        return defer.succeed(None)


def sendGraphToClient(graph, client: Union[Client, WsClient]) -> None:
    """send the client the whole graph contents"""
    log.info("sending all graphs to %r", client)
    client.sendPatch(Patch(addQuads=graph.quads(ALLSTMTS), delQuads=[]))


class WatchedFiles(object):
    """
    find files, notice new files.

    This object watches directories. Each GraphFile watches its own file.
    """

    def __init__(self, dirUriMap: DirUriMap, patch: PatchCb,
                 getSubgraph: GetSubgraph,
                 addlPrefixes: Dict[Optional[URIRef], Dict[str, URIRef]]):
        """
        addlPrefixes maps a context uri (or None, for the prefixes
        shared by all contexts) to a {prefix: namespace} dict.
        """
        self.dirUriMap = dirUriMap  # {abspath : uri prefix}
        self.patch, self.getSubgraph = patch, getSubgraph
        self.addlPrefixes = addlPrefixes

        self.graphFiles: Dict[URIRef, GraphFile] = {}  # context uri : GraphFile

        self.notifier = INotify()
        self.notifier.startReading()

        self.findAndLoadFiles()

    def findAndLoadFiles(self) -> None:
        """walk every top dir, load the files found and watch each dir."""
        self.initialLoad = True
        try:
            for topdir in self.dirUriMap:
                for dirpath, dirnames, filenames in os.walk(topdir):
                    for base in filenames:
                        p = os.path.join(dirpath, base)
                        # why wasn't mypy catching this?
                        assert isinstance(p, bytes)
                        self.watchFile(p)
                    self.notifier.watch(FilePath(dirpath),
                                        autoAdd=True,
                                        callbacks=[self.dirChange])
        finally:
            self.initialLoad = False

    def dirChange(self, watch, path: FilePath, mask):
        """inotify callback for a watched dir; picks up created files."""
        if mask & IN_CREATE:
            if path.path.endswith((b'~', b'.swp', b'swx', b'.rdfdb-temp')):
                return

            log.debug("%s created; consider adding a watch", path)
            self.watchFile(path.path)

    def watchFile(self, inFile: bytes):
        """
        consider adding a GraphFile to self.graphFiles

        inFile needs to be a relative path, not an absolute (e.g. in a
        FilePath) because we use its exact relative form in the
        context URI
        """
        if not os.path.isfile(inFile):
            return

        inFile = correctToTopdirPrefix(self.dirUriMap, inFile)
        if os.path.splitext(inFile)[1] not in [b'.n3']:
            return

        if b'/capture/' in inFile:
            # smaller graph for now
            return

        # an n3 file with rules makes it all the way past this reading
        # and the serialization. Then, on the receiving side, a
        # SyncedGraph calls graphFromNQuad on the incoming data and
        # has a parse error. I'm not sure where this should be fixed
        # yet.
        if b'-rules' in inFile:
            return

        # for legacy versions, compile all the config stuff you want
        # read into one file called config.n3. New versions won't read
        # it.
        if inFile.endswith(b"config.n3"):
            return

        ctx = uriFromFile(self.dirUriMap, inFile)
        gf = self._addGraphFile(ctx, inFile)
        log.info("%s do initial read", inFile)
        gf.reread()

    def aboutToPatch(self, ctx: URIRef):
        """
        warn us that a patch is about to come to this
        context. it's more straightforward to create the new file now

        this is meant to make the file before we add triples, so we
        wouldn't see the blank file and lose those triples. But it
        didn't work, so there are other measures that make us not lose
        the triples from a new file. Calling this before patching the
        graph is still a reasonable thing to do, though.
        """
        if ctx not in self.graphFiles:
            outFile = fileForUri(self.dirUriMap, ctx)
            # mypy missed the next line because of
            # https://github.com/python/typeshed/issues/2937 ('str in
            # bytes' isn't an error)
            assert b'//' not in outFile, (outFile, self.dirUriMap, ctx)
            log.info("starting new file %r", outFile)
            self._addGraphFile(ctx, outFile)

    def _addGraphFile(self, ctx, path):
        """make a GraphFile for ctx at path and remember it by context."""
        # both prefix dicts must exist before they are handed to the
        # GraphFile, so later updates to them are shared.
        self.addlPrefixes.setdefault(ctx, {})
        self.addlPrefixes.setdefault(None, {})
        gf = GraphFile(self.notifier,
                       path,
                       ctx,
                       self.patch,
                       self.getSubgraph,
                       globalPrefixes=self.addlPrefixes[None],
                       ctxPrefixes=self.addlPrefixes[ctx])
        self.graphFiles[ctx] = gf
        return gf

    def dirtyFiles(self, ctxs):
        """mark dirty the files that we watch in these contexts.

        the ctx might not be a file that we already read; it might be
        for a new file we have to create, or it might be for a
        transient context that we're not going to save

        if it's a ctx with no file, error
        """
        for ctx in ctxs:
            g = self.getSubgraph(ctx)
            self.graphFiles[ctx].dirty(g)


class Db(object):
    """
    the master graph, all the connected clients, all the files we're watching
    """

    def __init__(self, dirUriMap: DirUriMap,
                 addlPrefixes: Dict[Optional[URIRef], Dict[str, URIRef]]):
        self.clients: List[Union[Client, WsClient]] = []
        self.graph = ConjunctiveGraph()

        # WatchedFiles loads files in its constructor, which calls
        # self.patch before self.watchedFiles has been assigned.
        self.watchedFiles = WatchedFiles(dirUriMap, self.patch,
                                         self.getSubgraph, addlPrefixes)

        self.summarizeToLog()

    def patch(self, patch: Patch, dueToFileChange: bool = False) -> None:
        """
        apply this patch to the master graph then notify everyone about it

        dueToFileChange if this is a patch describing an edit we read
        *from* the file (such that we shouldn't write it back to the file)

        if patch has a senderUpdateUri attribute, we won't send this
        patch back to the sender with that updateUri
        """
        ctx = patch.getContext()
        log.info("patching graph %s -%d +%d", ctx, len(patch.delQuads),
                 len(patch.addQuads))

        if hasattr(self, 'watchedFiles'):  # not available during startup
            self.watchedFiles.aboutToPatch(ctx)

        patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True)
        self._sendPatch(patch)
        if not dueToFileChange:
            self.watchedFiles.dirtyFiles([ctx])
        sendToLiveClients(asJson=patch.jsonRepr)

    def _sendPatch(self, p):
        """forward p to every client except the one that sent it."""
        senderUpdateUri = getattr(p, 'senderUpdateUri', None)

        for c in self.clients:
            if c.updateUri == senderUpdateUri:
                # this client has self-applied the patch already
                continue
            d = c.sendPatch(p)
            d.addErrback(self.clientErrored, c)

    def clientErrored(self, err, c):
        """errback: drop client c after a connect error or ws disconnect."""
        # any other kind of failure is re-raised by trap()
        err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect)
        log.info("%r %r - dropping client", c, err.getErrorMessage())
        if c in self.clients:
            self.clients.remove(c)
        self.sendClientsToAllLivePages()

    def summarizeToLog(self):
        """log the statement count of the graph and of each context."""
        log.info("contexts in graph (%s total stmts):", len(self.graph))
        for c in self.graph.contexts():
            log.info("  %s: %s statements", c.identifier,
                     len(self.getSubgraph(c.identifier)))

    def getSubgraph(self, uri: URIRef) -> Graph:
        """
        this is meant to return a live view of the given subgraph, but
        if i'm still working around an rdflib bug, it might return a
        copy

        and it's returning triples, but I think quads would be better
        """
        # this is returning an empty Graph :(
        #return self.graph.get_context(uri)

        g = Graph()
        for s in self.graph.triples(ALLSTMTS, uri):
            g.add(s)
        return g

    def addClient(self, newClient: Union[Client, WsClient]) -> None:
        """
        register newClient, replacing any client that has the same
        updateUri, and send it the whole graph.
        """
        # Filter in one pass; removing from the list inside a loop over
        # that same list can skip entries. Slice-assign so the list
        # object itself stays the same.
        self.clients[:] = [
            c for c in self.clients if c.updateUri != newClient.updateUri
        ]

        log.info("new client %r", newClient)
        sendGraphToClient(self.graph, newClient)
        self.clients.append(newClient)
        self.sendClientsToAllLivePages()

    def sendClientsToAllLivePages(self) -> None:
        """push the current client list to every /live websocket."""
        sendToLiveClients({
            "clients": [
                dict(updateUri=c.updateUri.toPython(), label=repr(c))
                for c in self.clients
            ]
        })


class GraphResource(cyclone.web.RequestHandler):
    """serve the whole graph, serialized according to the accept header."""

    def get(self):
        accept = self.request.headers.get('accept', '')
        fmt = 'n3'
        if accept == 'text/plain':
            fmt = 'nt'
        elif accept == 'application/n-quads':
            fmt = 'nquads'
        elif accept == 'pickle':
            # don't use this; it's just for speed comparison
            import pickle
            pickle.dump(self.settings.db.graph, self, protocol=2)
            return
        elif accept == 'msgpack':
            # NOTE(review): this writes the repr of the bound method
            # itself (it is never called) -- confirm what was intended.
            self.write(repr(self.settings.db.graph.__getstate__))
            return
        self.write(self.settings.db.graph.serialize(format=fmt))


class Patches(cyclone.web.RequestHandler):
    """endpoint where clients PUT patches for the db."""

    def __init__(self, *args, **kw):
        cyclone.web.RequestHandler.__init__(self, *args, **kw)
        p = makePatchEndpointPutMethod(self.settings.db.patch)
        self.put = lambda: p(self)

    def get(self):
        pass


class GraphClients(cyclone.web.RequestHandler):
    """endpoint where a client registers the uri it wants updates on."""

    def get(self):
        pass

    def post(self) -> None:
        upd = URIRef(self.get_argument("clientUpdate"))
        try:
            self.settings.db.addClient(Client(upd, self.get_argument("label")))
        except Exception:
            # record the traceback ourselves, then let the error
            # propagate as before
            log.exception("adding client %r failed", upd)
            raise


class Prefixes(cyclone.web.RequestHandler):
    """endpoint that accepts suggested prefixes for one context."""

    def post(self):
        suggestion = json.loads(self.request.body)
        addlPrefixes = self.settings.db.watchedFiles.addlPrefixes
        addlPrefixes.setdefault(URIRef(suggestion['ctx']),
                                {}).update(suggestion['prefixes'])


# counter used to give each websocket connection a distinct id
_wsClientSerial = 0


class WebsocketClient(cyclone.websocket.WebSocketHandler):
    """websocket on which a client both receives and sends patches."""

    wsClient: Optional[WsClient] = None

    def connectionMade(self, *args, **kwargs) -> None:
        global _wsClientSerial
        connectionId = f'connection-{_wsClientSerial}'
        _wsClientSerial += 1

        self.wsClient = WsClient(connectionId, self.sendMessage)
        log.info("new ws client %r", self.wsClient)
        self.settings.db.addClient(self.wsClient)

    def connectionLost(self, reason):
        log.info("bye ws client %r", self.wsClient)
        self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)),
                                       self.wsClient)

    def messageReceived(self, message: bytes):
        if message == b'PING':
            self.sendMessage('PONG')
            return
        log.info("got message from %r: %s", self.wsClient, message)
        p = Patch(jsonRepr=message.decode('utf8'))
        assert self.wsClient is not None
        # tag the patch so it is not echoed back to this connection
        p.senderUpdateUri = self.wsClient.updateUri
        self.settings.db.patch(p)


class Live(cyclone.websocket.WebSocketHandler):
    """websocket for pages that display the client list and patches."""

    def connectionMade(self, *args, **kwargs):
        log.info("websocket opened")
        liveClients.add(self)
        self.settings.db.sendClientsToAllLivePages()

    def connectionLost(self, reason):
        log.info("websocket closed")
        # discard: no KeyError if this handler was never registered
        liveClients.discard(self)

    def messageReceived(self, message):
        log.info("got message %s", message)
        self.sendMessage(message)


liveClients: Set[Live] = set()


def sendToLiveClients(d=None, asJson=None):
    """
    send one json message to every Live websocket. Pass either d (it
    gets json-encoded) or an already-encoded asJson string.
    """
    j = asJson or json.dumps(d)
    # iterate a snapshot: a failed send may run connectionLost, which
    # changes liveClients while we are looping
    for c in list(liveClients):
        c.sendMessage(j)


class NoExts(cyclone.web.StaticFileHandler):
    # .html pages can be get() without .html on them
    def get(self, path, *args, **kw):
        if path and '.' not in path:
            path = path + ".html"
        cyclone.web.StaticFileHandler.get(self, path, *args, **kw)


def main(dirUriMap: Optional[DirUriMap] = None,
         prefixes: Optional[Dict[str, URIRef]] = None,
         port=9999):
    """
    parse command line options, build the Db and serve it on the given
    port until the reactor stops.

    dirUriMap and prefixes fall back to example defaults when omitted.
    """
    if dirUriMap is None:
        dirUriMap = {b'data/': URIRef('http://example.com/data/')}
    if prefixes is None:
        prefixes = {
            'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'),
            'rdfs': URIRef('http://www.w3.org/2000/01/rdf-schema#'),
            'xsd': URIRef('http://www.w3.org/2001/XMLSchema#'),
        }

    logging.basicConfig()
    # root logger; this local name hides the module-level 'rdfdb' one
    log = logging.getLogger()

    parser = optparse.OptionParser()
    parser.add_option("-v",
                      "--verbose",
                      action="store_true",
                      help="logging.DEBUG")
    (options, args) = parser.parse_args()

    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    db = Db(dirUriMap=dirUriMap, addlPrefixes={None: prefixes})

    from twisted.python import log as twlog
    twlog.startLogging(sys.stdout)

    reactor.listenTCP(
        port,
        cyclone.web.Application(handlers=[
            (r'/live', Live),
            (r'/graph', GraphResource),
            (r'/patches', Patches),
            (r'/graphClients', GraphClients),
            (r'/syncedGraph', WebsocketClient),
            (r'/prefixes', Prefixes),
            (r'/(.*)', NoExts, {
                "path": FilePath(__file__).sibling("web").path,
                "default_filename": "index.html"
            }),
        ],
                                debug=True,
                                db=db))
    log.info("serving on %s", port)
    reactor.run()