Mercurial > code > home > repos > rdfdb
annotate rdfdb/service.py @ 107:19100db34354
s/Db/SharedGraph/
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 May 2022 22:44:10 -0700 |
parents | 4681ea3fcdf6 |
children | bc643d61bb7c |
rev | line source |
---|---|
105 | 1 import functools |
80 | 2 import itertools |
3 import logging | |
89
1120c6489888
refactor, redo the SyncedGraph class ordering, fix most typing errors
drewp@bigasterisk.com
parents:
87
diff
changeset
|
4 from pathlib import Path |
105 | 5 from typing import Dict, Optional, cast |
46
3b36b2c8ae65
fix bug that echoed a patch back to KC. https://bigasterisk.com/light9/work/2019/kc-patch-echo-bug.png
Drew Perttula <drewp@bigasterisk.com>
parents:
45
diff
changeset
|
6 |
105 | 7 from rdflib import URIRef |
8 from starlette.applications import Starlette | |
9 from starlette.endpoints import WebSocketEndpoint | |
10 from starlette.requests import Request | |
11 from starlette.responses import PlainTextResponse, Response | |
12 from starlette.routing import Route, WebSocketRoute | |
13 from starlette.websockets import WebSocket | |
14 from starlette_exporter import PrometheusMiddleware, handle_metrics | |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
15 |
105 | 16 from rdfdb.file_vs_uri import DirUriMap |
17 from rdfdb.patch import Patch | |
18 | |
107 | 19 from rdfdb.shared_graph import SharedGraph |
52 | 20 |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
21 log = logging.getLogger('rdfdb') |
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
22 |
65
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
23 _wsClientSerial = itertools.count(0) |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
24 |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
25 |
105 | 26 class SyncedGraphSocket(WebSocketEndpoint): |
65
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
27 """ |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
28 Send patches to the client (starting with a client who has 0 |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
29 statements) to keep it in sync with the graph. |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
30 |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
31 Accept patches from the client, and assume that the client has |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
32 already applied them to its local graph. |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
33 |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
34 Treat a disconnect as 'out of sync'. Either the client thinks it |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
35 is out of sync and wants to start over, or we can't apply a patch |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
36 correctly therefore we disconnect to make the client start over. |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
37 |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
38 This socket may also carry some special messages meant for the |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
39 rdfdb web UI, e.g. about who is connected, etc. |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
40 """ |
105 | 41 encoding = "text" |
65
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
42 |
107 | 43 def __init__(self, db: SharedGraph, *a, **kw): |
105 | 44 WebSocketEndpoint.__init__(self, *a, **kw) |
45 self.db = db | |
46 self.connectionId = f'WS{next(_wsClientSerial)}' # unneeded? | |
47 log.info(f'ws connection init {self.connectionId}') | |
65
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
48 |
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
49 def __repr__(self): |
105 | 50 return f'<WebSocket {self.connectionId}>' |
51 | |
52 async def on_connect(self, websocket: WebSocket): | |
53 await websocket.accept() | |
54 await websocket.send_json({'connectedAs': self.connectionId}) | |
55 log.info(f"new ws client {self.connectionId}") | |
56 | |
57 await self.db.addClient(self.connectionId, functools.partial(self._onPatch,websocket)) | |
58 | |
59 async def _onPatch(self, websocket: WebSocket, p: Patch): | |
60 await websocket.send_text(p.makeJsonRepr()) | |
65
9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents:
59
diff
changeset
|
61 |
105 | 62 async def on_receive(self, websocket: WebSocket, data: str): |
63 if data == 'PING': | |
64 await websocket.send_text('PONG') | |
65 return | |
66 log.debug("%r sends patch to us: %s", self, data[:64]) | |
67 p = Patch(jsonRepr=data) | |
68 try: | |
69 await self.db.patch(p, sender=self.connectionId) | |
70 except ValueError as e: | |
71 log.warning(f'patch from {self!r} did not apply: {e!r}') | |
72 # here we should disconnect that client and make them reset | |
73 await websocket.close() | |
80 | 74 |
105 | 75 async def on_disconnect(self, websocket, close_code): |
76 log.info("bye ws client %r: %s", self.connectionId, close_code) | |
77 self.db.clientDisconnected(self.connectionId) | |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
78 |
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
79 |
107 | 80 def get_graph(db: SharedGraph, request: Request) -> Response: |
105 | 81 accept = request.headers.get('accept', '') |
82 if accept == 'text/plain': | |
83 format = 'nt' | |
84 elif accept == 'application/n-quads': | |
85 format = 'nquads' | |
86 else: | |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
87 format = 'n3' |
105 | 88 return PlainTextResponse(db.graph.serialize(format=format)) |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
89 |
45 | 90 |
107 | 91 async def post_prefixes(db: SharedGraph, request: Request) -> Response: |
105 | 92 suggestion = await request.json() |
93 db.addPrefixes( | |
94 ctx=URIRef(suggestion['ctx']), # | |
95 prefixes=dict((cast(str, k), URIRef(v)) for k, v in suggestion['prefixes'].items())) | |
96 return Response(status_code=204) | |
45 | 97 |
98 | |
105 | 99 def makeApp(dirUriMap: Optional[DirUriMap] = None, prefixes: Optional[Dict[str, URIRef]] = None) -> Starlette: |
100 log.info('makeApp start') | |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
101 |
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
102 if dirUriMap is None: |
89
1120c6489888
refactor, redo the SyncedGraph class ordering, fix most typing errors
drewp@bigasterisk.com
parents:
87
diff
changeset
|
103 dirUriMap = {Path('data/'): URIRef('http://example.com/data/')} |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
104 if prefixes is None: |
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
105 prefixes = { |
42 | 106 'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'), |
107 'rdfs': URIRef('http://www.w3.org/2000/01/rdf-schema#'), | |
108 'xsd': URIRef('http://www.w3.org/2001/XMLSchema#'), | |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
109 } |
45 | 110 |
105 | 111 log.setLevel(logging.DEBUG if 1 else logging.INFO) |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
112 |
105 | 113 log.info('setup watches') |
107 | 114 db = SharedGraph(dirUriMap=dirUriMap, prefixes=prefixes) |
13
c9d1764d64ad
add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff
changeset
|
115 |
105 | 116 app = Starlette( |
117 debug=True, | |
118 routes=[ | |
119 Route('/graph', functools.partial(get_graph, db)), | |
120 WebSocketRoute('/syncedGraph', functools.partial(SyncedGraphSocket, db)), | |
121 Route('/prefixes', functools.partial(post_prefixes, db)), | |
122 ], | |
123 ) | |
86 | 124 |
105 | 125 app.add_middleware(PrometheusMiddleware) |
126 app.add_route("/metrics", handle_metrics) | |
86 | 127 |
105 | 128 return app |