Mercurial > code > home > repos > rdfdb
view rdfdb/service.py @ 104:d1fd6aeffb27
(rough) move Db to shared_graph.py (includes some asyncio updates probably)
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 May 2022 20:38:06 -0700 |
parents | 0f921dd06887 |
children | 4681ea3fcdf6 |
line wrap: on
line source
"""rdfdb network service.

Exposes the shared graph over cyclone: a websocket endpoint that keeps
SyncedGraph clients in sync via patches, a whole-graph dump, an endpoint for
suggesting namespace prefixes, Prometheus metrics, and the static web UI.
"""
import itertools
import json
import logging
import optparse
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional, cast

import cyclone.web
import cyclone.websocket
import twisted.internet.error
import twisted.internet.reactor
from prometheus_client import Gauge
from prometheus_client.exposition import CONTENT_TYPE_LATEST, generate_latest
from prometheus_client.registry import REGISTRY
from rdflib import ConjunctiveGraph, Graph, URIRef
from twisted.internet.inotify import IN_CREATE, INotify
from twisted.internet.interfaces import IReactorCore
from twisted.python.failure import Failure
from twisted.python.filepath import FilePath

from rdfdb.file_vs_uri import (DirUriMap, correctToTopdirPrefix, fileForUri,
                               uriFromFile)
from rdfdb.graphfile import GetSubgraph, GraphFile, PatchCb
from rdfdb.patch import ALLSTMTS, Patch
from rdfdb.rdflibpatch import patchQuads

# cast only for type checkers; the runtime object is twisted's global reactor.
reactor = cast(IReactorCore, twisted.internet.reactor)

STAT_CLIENTS = Gauge('clients', 'connected clients')

# Legacy `scales` stats, currently disabled:
# gatherProcessStats()
# stats = scales.collection(
#     '/webServer',
#     scales.IntStat('liveClients'),
#     scales.PmfStat('setAttr'),
# )
# graphStats = scales.collection(
#     '/graph',
#     scales.IntStat('statements'),
#     scales.RecentFpsStat('patchFps'),
# )
# fileStats = scales.collection(
#     '/file',
#     scales.IntStat('mappedGraphFiles'),
# )

log = logging.getLogger('rdfdb')


class WebsocketDisconnect(ValueError):
    """Carries a websocket close reason so it can be reported as a Failure."""


# Graph context (None for the defaults passed in by main) -> {prefix: namespace}.
CtxPrefixes = Dict[Optional[URIRef], Dict[str, URIRef]]

# Source of per-process unique websocket connection ids ("WS0", "WS1", ...).
_wsClientSerial = itertools.count(0)


class WebsocketClient(cyclone.websocket.WebSocketHandler):
    """
    Send patches to the client (starting with a client who has 0
    statements) to keep it in sync with the graph.

    Accept patches from the client, and assume that the client has
    already applied them to its local graph.

    Treat a disconnect as 'out of sync'. Either the client thinks it
    is out of sync and wants to start over, or we can't apply a patch
    correctly therefore we disconnect to make the client start over.

    This socket may also carry some special messages meant for the
    rdfdb web UI, e.g. about who is connected, etc.
    """
    connectionId: str

    def connectionMade(self, *args, **kwargs) -> None:
        """Assign an id, announce it to the client, then register with the db."""
        self.connectionId = f'WS{next(_wsClientSerial)}'
        self.sendMessage(json.dumps({'connectedAs': self.connectionId}))
        log.info("new ws client %r", self)
        self.settings.db.addClient(self)

    def connectionLost(self, reason):
        """Report the disconnect to the db as an error for this client."""
        log.info("bye ws client %r: %s", self, reason)
        self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)), self)

    def messageReceived(self, message: bytes):
        """Answer keepalive pings; otherwise decode a JSON patch and apply it."""
        if message == b'PING':
            # Keepalive only; does not touch the graph.
            self.sendMessage('PONG')
            return
        log.debug("got message from %r: %s", self, message[:32])
        p = Patch(jsonRepr=message.decode('utf8'))
        # Tag the patch with its origin connection.
        self.settings.db.patch(p, sender=self.connectionId)

    def sendPatch(self, p: Patch):
        """Send one patch to the client as JSON."""
        self.sendMessage(p.makeJsonRepr())

    def __repr__(self):
        return f"<SyncedGraph client {self.connectionId}>"


class GraphResource(cyclone.web.RequestHandler):
    """Dump the whole graph; the Accept header picks the serialization."""

    def get(self):
        accept = self.request.headers.get('accept', '')
        fmt = 'n3'
        if accept == 'text/plain':
            fmt = 'nt'
        elif accept == 'application/n-quads':
            fmt = 'nquads'
        elif accept == 'pickle':
            # don't use this; it's just for speed comparison
            import pickle

            # The handler's write() method lets it act as pickle's file object.
            pickle.dump(self.settings.db.graph, self, protocol=2)
            return
        elif accept == 'msgpack':
            # NOTE(review): despite the name, no msgpack is involved, and this
            # writes the repr of the *uncalled* bound __getstate__ method.
            # Looks like a leftover experiment; confirm before relying on it.
            self.write(repr(self.settings.db.graph.__getstate__))
            return
        self.write(self.settings.db.graph.serialize(format=fmt))


class Prefixes(cyclone.web.RequestHandler):
    """Merge suggested namespace prefixes for one graph context.

    Request body is JSON with keys "ctx" (context URI) and "prefixes"
    (a mapping, presumably prefix -> namespace URI).
    """

    def post(self):
        suggestion = json.loads(self.request.body)
        addlPrefixes = self.settings.db.watchedFiles.addlPrefixes
        # Store namespaces as URIRef, matching CtxPrefixes and the defaults
        # that main() passes in.
        newPrefixes = {
            prefix: URIRef(ns) for prefix, ns in suggestion['prefixes'].items()
        }
        addlPrefixes.setdefault(URIRef(suggestion['ctx']), {}).update(newPrefixes)


class NoExts(cyclone.web.StaticFileHandler):
    """Static files; .html pages can be requested without the .html suffix."""

    def get(self, path, *args, **kw):
        # Only extensionless paths get ".html" appended; anything containing a
        # dot is served as-is.
        if path and '.' not in path:
            path = path + ".html"
        return cyclone.web.StaticFileHandler.get(self, path, *args, **kw)


class Metrics(cyclone.web.RequestHandler):
    """Prometheus scrape endpoint for the default registry."""

    def get(self):
        # set_header (not add_header): cyclone pre-populates "Content-Type" with
        # text/html, and add_header would emit a second Content-Type header.
        self.set_header('Content-Type', CONTENT_TYPE_LATEST)
        self.write(generate_latest(REGISTRY))


def main(dirUriMap: Optional[DirUriMap] = None,
         prefixes: Optional[Dict[str, URIRef]] = None,
         port=9999):
    """Parse CLI options, build the Db, and serve it over HTTP on `port`.

    Blocks in reactor.run().

    dirUriMap: maps local directories to URI prefixes; defaults to
        {Path('data/'): 'http://example.com/data/'}.
    prefixes: default namespace prefixes for all contexts; defaults to
        rdf, rdfs and xsd.
    port: TCP port to listen on.
    """
    # Db lives in rdfdb/shared_graph.py (it is no longer defined in this
    # module); without this import main() raised NameError. Imported locally
    # so that importing this module alone does not pull in shared_graph.
    from rdfdb.shared_graph import Db

    if dirUriMap is None:
        dirUriMap = {Path('data/'): URIRef('http://example.com/data/')}
    if prefixes is None:
        prefixes = {
            'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'),
            'rdfs': URIRef('http://www.w3.org/2000/01/rdf-schema#'),
            'xsd': URIRef('http://www.w3.org/2001/XMLSchema#'),
        }

    logging.basicConfig()
    rootLog = logging.getLogger()

    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", action="store_true", help="logging.DEBUG")
    options, _ = parser.parse_args()
    rootLog.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    db = Db(dirUriMap=dirUriMap, addlPrefixes={None: prefixes})

    from twisted.python import log as twlog
    twlog.startLogging(sys.stdout)

    reactor.listenTCP(
        port,
        cyclone.web.Application(
            handlers=[
                (r'/graph', GraphResource),
                (r'/syncedGraph', WebsocketClient),
                (r'/prefixes', Prefixes),
                (r'/metrics', Metrics),
                # (r'/stats/(.*)', StatsHandler, {
                #     'serverName': 'rdfdb'
                # }),
                (r'/(.*)', NoExts, {
                    "path": FilePath(__file__).sibling("web").path,
                    "default_filename": "index.html"
                }),
            ],
            debug=True,
            db=db))
    rootLog.info("serving on %s", port)
    reactor.run()


if __name__ == '__main__':
    main()