view rdfdb/service.py @ 104:d1fd6aeffb27

(rough) move Db to shared_graph.py (includes some asyncio updates probably)
author drewp@bigasterisk.com
date Mon, 30 May 2022 20:38:06 -0700
parents 0f921dd06887
children 4681ea3fcdf6
line wrap: on
line source

import itertools
import json
import logging
import optparse
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional, cast

import cyclone.web
import cyclone.websocket
import twisted.internet.error
import twisted.internet.reactor
from rdflib import ConjunctiveGraph, Graph, URIRef
from twisted.internet.inotify import IN_CREATE, INotify
from twisted.internet.interfaces import IReactorCore
from twisted.python.failure import Failure
from twisted.python.filepath import FilePath

from rdfdb.file_vs_uri import (DirUriMap, correctToTopdirPrefix, fileForUri, uriFromFile)
from rdfdb.graphfile import GetSubgraph, GraphFile, PatchCb
from rdfdb.patch import ALLSTMTS, Patch
from rdfdb.rdflibpatch import patchQuads
from prometheus_client import Gauge
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY
# cast() has no runtime effect; it only tells type checkers to treat the
# global twisted reactor as an IReactorCore.
reactor = cast(IReactorCore, twisted.internet.reactor)

# prometheus gauge named 'clients'.
# NOTE(review): nothing in this module sets or changes this gauge --
# presumably the code that tracks connected clients should; confirm.
STAT_CLIENTS = Gauge('clients', 'connected clients')
# The commented-out definitions below refer to a `scales` stats module that
# this file does not import.
# gatherProcessStats()
# stats = scales.collection(
#     '/webServer',
#     scales.IntStat('liveClients'),
#     scales.PmfStat('setAttr'),
# )
# graphStats = scales.collection(
#     '/graph',
#     scales.IntStat('statements'),
#     scales.RecentFpsStat('patchFps'),
# )
# fileStats = scales.collection(
#     '/file',
#     scales.IntStat('mappedGraphFiles'),
# )

# Module-level logger used by the request/websocket handlers in this file.
log = logging.getLogger('rdfdb')


class WebsocketDisconnect(ValueError):
    """Error value used to report that a websocket client's connection was lost."""


# Per-context prefix tables: context URI (or None) -> {prefix: namespace}.
# main() files its default prefixes under the None key.
CtxPrefixes = Dict[Optional[URIRef], Dict[str, URIRef]]


# Shared counter (starting at 0) that supplies the number in each
# websocket client's 'WS<n>' connection id.
_wsClientSerial = itertools.count()


class WebsocketClient(cyclone.websocket.WebSocketHandler):
    """
    One websocket peer that is kept in sync with the graph.

    Outgoing: the client is assumed to start with 0 statements, and
    patches are sent to it so its copy follows the graph.

    Incoming: patches from the client are accepted on the assumption
    that the client already applied them to its own local graph.

    A disconnect means 'out of sync': either the client decided it is
    out of sync and wants to start over, or a patch could not be
    applied correctly and the connection is dropped so the client
    starts over.

    The same socket may also carry some special messages for the rdfdb
    web UI (e.g. about who is connected).
    """
    connectionId: str

    def connectionMade(self, *args, **kwargs) -> None:
        """Name this connection, tell the client its name, and register with the db."""
        serial = next(_wsClientSerial)
        self.connectionId = f'WS{serial}'

        greeting = json.dumps({'connectedAs': self.connectionId})
        self.sendMessage(greeting)
        log.info("new ws client %r", self)
        db = self.settings.db
        db.addClient(self)

    def connectionLost(self, reason):
        """Report the lost connection to the db as a WebsocketDisconnect failure."""
        log.info("bye ws client %r: %s", self, reason)
        db = self.settings.db
        db.clientErrored(Failure(WebsocketDisconnect(reason)), self)

    def messageReceived(self, message: bytes):
        """Answer a PING, or treat the message as a JSON patch for the db."""
        isPing = message == b'PING'
        if isPing:
            self.sendMessage('PONG')
            return
        # only the first 32 bytes are logged
        log.debug("got message from %r: %s", self, message[:32])
        text = message.decode('utf8')
        incoming = Patch(jsonRepr=text)
        self.settings.db.patch(incoming, sender=self.connectionId)

    def sendPatch(self, p: Patch):
        """Send patch `p` to this client in its JSON representation."""
        payload = p.makeJsonRepr()
        self.sendMessage(payload)

    def __repr__(self):
        # reads connectionId, which is assigned in connectionMade
        return f"<SyncedGraph client {self.connectionId}>"




class GraphResource(cyclone.web.RequestHandler):
    """Writes out the db's graph in a form chosen by the request's accept header."""

    def get(self):
        """
        accept 'text/plain' -> nt, 'application/n-quads' -> nquads,
        anything else -> n3. 'pickle' and 'msgpack' are special-cased.
        """
        accept = self.request.headers.get('accept', '')
        graph = self.settings.db.graph

        if accept == 'pickle':
            # don't use this; it's just for speed comparison
            import pickle
            # the handler itself is the file-like object being written to
            pickle.dump(graph, self, protocol=2)
            return

        if accept == 'msgpack':
            # NOTE(review): __getstate__ is not called here, so this writes
            # the repr of the method object -- confirm that is intended.
            self.write(repr(graph.__getstate__))
            return

        rdfFormats = {
            'text/plain': 'nt',
            'application/n-quads': 'nquads',
        }
        self.write(graph.serialize(format=rdfFormats.get(accept, 'n3')))


class Prefixes(cyclone.web.RequestHandler):
    """Receives prefix suggestions and merges them into the db's additional prefixes."""

    def post(self):
        """
        The request body is JSON with a 'ctx' entry (turned into a URIRef
        key) and a 'prefixes' entry that is merged into that key's dict in
        db.watchedFiles.addlPrefixes, creating the dict if needed.
        """
        suggestion = json.loads(self.request.body)
        addlPrefixes = self.settings.db.watchedFiles.addlPrefixes
        ctx = URIRef(suggestion['ctx'])
        ctxPrefixes = addlPrefixes.setdefault(ctx, {})
        ctxPrefixes.update(suggestion['prefixes'])


class NoExts(cyclone.web.StaticFileHandler):
    """Static file handler where .html pages can be requested without the .html suffix."""

    def get(self, path, *args, **kw):
        # A non-empty path with no '.' anywhere in it is taken to be an
        # .html page whose extension was left off.
        lacksExtension = bool(path) and '.' not in path
        if lacksExtension:
            path += ".html"
        super().get(path, *args, **kw)


class Metrics(cyclone.web.RequestHandler):
    """Serves the metrics in the prometheus REGISTRY as plain text."""

    def get(self):
        # set_header replaces the handler's default Content-Type value;
        # add_header would append a second content-type header next to it.
        self.set_header('Content-Type', 'text/plain')
        self.write(generate_latest(REGISTRY))


def main(dirUriMap: Optional[DirUriMap] = None, prefixes: Optional[Dict[str, URIRef]] = None, port=9999):
    """
    Parse command-line options, build the Db, and serve it over HTTP and
    websockets until the reactor stops.

    dirUriMap: directory -> base URI mapping handed to Db. Defaults to
      mapping 'data/' to http://example.com/data/.
    prefixes: prefix -> namespace mapping, handed to Db as the prefixes of
      the None context. Defaults to rdf, rdfs and xsd.
    port: TCP port to listen on.

    Command line: -v/--verbose sets the root logger to DEBUG (else INFO).
    """
    if dirUriMap is None:
        dirUriMap = {Path('data/'): URIRef('http://example.com/data/')}
    if prefixes is None:
        prefixes = {
            'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'),
            'rdfs': URIRef('http://www.w3.org/2000/01/rdf-schema#'),
            'xsd': URIRef('http://www.w3.org/2001/XMLSchema#'),
        }

    logging.basicConfig()
    # The root logger, named so that it does not shadow the module-level `log`.
    rootLog = logging.getLogger()

    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", action="store_true", help="logging.DEBUG")
    options, _args = parser.parse_args()

    rootLog.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    # Db is neither defined nor imported at the top of this module, so using
    # it without this import raises NameError.
    # NOTE(review): module path taken from the changeset note saying Db
    # moved to shared_graph.py -- confirm.
    from rdfdb.shared_graph import Db
    db = Db(dirUriMap=dirUriMap, addlPrefixes={None: prefixes})

    from twisted.python import log as twlog
    twlog.startLogging(sys.stdout)

    app = cyclone.web.Application(
        handlers=[
            (r'/graph', GraphResource),
            (r'/syncedGraph', WebsocketClient),
            (r'/prefixes', Prefixes),
            (r'/metrics', Metrics),
            # (r'/stats/(.*)', StatsHandler, {
            #     'serverName': 'rdfdb'
            # }),
            # Catch-all: static files from the 'web' directory next to this file.
            (r'/(.*)', NoExts, {
                "path": FilePath(__file__).sibling("web").path,
                "default_filename": "index.html"
            }),
        ],
        debug=True,
        # handlers reach this as self.settings.db
        db=db)
    reactor.listenTCP(port, app)
    rootLog.info("serving on %s", port)
    reactor.run()


# Start the server with main()'s default arguments when run as a script.
if __name__ == '__main__':
    main()