annotate rdfdb/service.py @ 107:19100db34354

s/Db/SharedGraph/
author drewp@bigasterisk.com
date Mon, 30 May 2022 22:44:10 -0700
parents 4681ea3fcdf6
children bc643d61bb7c
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
105
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
1 import functools
80
22c9679dbf67 reformat
drewp@bigasterisk.com
parents: 72
diff changeset
2 import itertools
22c9679dbf67 reformat
drewp@bigasterisk.com
parents: 72
diff changeset
3 import logging
89
1120c6489888 refactor, redo the SyncedGraph class ordering, fix most typing errors
drewp@bigasterisk.com
parents: 87
diff changeset
4 from pathlib import Path
105
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
5 from typing import Dict, Optional, cast
46
3b36b2c8ae65 fix bug that echoed a patch back to KC. https://bigasterisk.com/light9/work/2019/kc-patch-echo-bug.png
Drew Perttula <drewp@bigasterisk.com>
parents: 45
diff changeset
6
105
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
7 from rdflib import URIRef
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
8 from starlette.applications import Starlette
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
9 from starlette.endpoints import WebSocketEndpoint
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
10 from starlette.requests import Request
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
11 from starlette.responses import PlainTextResponse, Response
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
12 from starlette.routing import Route, WebSocketRoute
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
13 from starlette.websockets import WebSocket
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
14 from starlette_exporter import PrometheusMiddleware, handle_metrics
13
c9d1764d64ad add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
15
105
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
16 from rdfdb.file_vs_uri import DirUriMap
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
17 from rdfdb.patch import Patch
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
18
107
19100db34354 s/Db/SharedGraph/
drewp@bigasterisk.com
parents: 105
diff changeset
19 from rdfdb.shared_graph import SharedGraph
52
c81bd512d587 start the stats/ page
drewp@bigasterisk.com
parents: 48
diff changeset
20
13
c9d1764d64ad add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
# Shared logger for the rdfdb service.
log = logging.getLogger('rdfdb')

# Process-wide counter (starts at 0) used to number websocket connections.
_wsClientSerial = itertools.count()
9bc9de580033 big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents: 59
diff changeset
24
9bc9de580033 big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Drew Perttula <drewp@bigasterisk.com>
parents: 59
diff changeset
25
105
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
class SyncedGraphSocket(WebSocketEndpoint):
    """
    Websocket endpoint that keeps one client in sync with the shared graph.

    Outbound: patches from the graph are forwarded to the client, which
    is expected to start with 0 statements.

    Inbound: the client sends patches that it has already applied to
    its own local graph; they are applied to the shared graph here.

    A disconnect means 'out of sync'. Either the client decided to
    start over, or a patch from it could not be applied and we closed
    the socket so that it has to start over.

    The socket may also carry some special messages meant for the
    rdfdb web UI, e.g. about who is connected, etc.
    """
    encoding = "text"

    def __init__(self, db: SharedGraph, *args, **kwargs):
        """Remember the shared graph and allocate a 'WS<n>' connection id."""
        super().__init__(*args, **kwargs)
        self.db = db
        self.connectionId = f'WS{next(_wsClientSerial)}'  # unneeded?
        log.info(f'ws connection init {self.connectionId}')

    def __repr__(self):
        return f'<WebSocket {self.connectionId}>'

    async def on_connect(self, websocket: WebSocket):
        """Accept the socket, announce its id, and register it with the graph."""
        await websocket.accept()
        await websocket.send_json({'connectedAs': self.connectionId})
        log.info(f"new ws client {self.connectionId}")

        # The graph calls this back with each Patch to deliver to this client.
        sendPatch = functools.partial(self._onPatch, websocket)
        await self.db.addClient(self.connectionId, sendPatch)

    async def _onPatch(self, websocket: WebSocket, p: Patch):
        """Forward one patch to the client as JSON text."""
        await websocket.send_text(p.makeJsonRepr())

    async def on_receive(self, websocket: WebSocket, data: str):
        """Answer 'PING' with 'PONG'; treat anything else as a JSON patch."""
        if data == 'PING':
            await websocket.send_text('PONG')
            return

        log.debug("%r sends patch to us: %s", self, data[:64])
        incoming = Patch(jsonRepr=data)
        try:
            await self.db.patch(incoming, sender=self.connectionId)
        except ValueError as e:
            log.warning(f'patch from {self!r} did not apply: {e!r}')
            # Closing the socket forces that client to reconnect and resync.
            await websocket.close()

    async def on_disconnect(self, websocket, close_code):
        """Log the departure and tell the graph this client is gone."""
        log.info("bye ws client %r: %s", self.connectionId, close_code)
        self.db.clientDisconnected(self.connectionId)
13
c9d1764d64ad add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
78
c9d1764d64ad add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
79
107
19100db34354 s/Db/SharedGraph/
drewp@bigasterisk.com
parents: 105
diff changeset
def get_graph(db: SharedGraph, request: Request) -> Response:
    """
    Return the whole graph serialized as plain text.

    The serialization is picked by an exact match on the request's
    Accept header: 'text/plain' gives 'nt', 'application/n-quads' gives
    'nquads', and any other value (or no header) gives 'n3'.
    """
    formatForAccept = {
        'text/plain': 'nt',
        'application/n-quads': 'nquads',
    }
    accept = request.headers.get('accept', '')
    rdfFormat = formatForAccept.get(accept, 'n3')
    return PlainTextResponse(db.graph.serialize(format=rdfFormat))
13
c9d1764d64ad add web server. remove more traces of light9
Drew Perttula <drewp@bigasterisk.com>
parents:
diff changeset
89
45
dc61012eeace python reformat
Drew Perttula <drewp@bigasterisk.com>
parents: 43
diff changeset
90
107
19100db34354 s/Db/SharedGraph/
drewp@bigasterisk.com
parents: 105
diff changeset
async def post_prefixes(db: SharedGraph, request: Request) -> Response:
    """
    Add the prefixes suggested in the request's JSON body to the graph.

    The body must be an object with a 'ctx' URI string and a 'prefixes'
    object mapping prefix name to namespace URI string. Replies with an
    empty 204 response.
    """
    body = await request.json()
    ctx = URIRef(body['ctx'])
    prefixes = {cast(str, name): URIRef(uri) for name, uri in body['prefixes'].items()}
    db.addPrefixes(ctx=ctx, prefixes=prefixes)
    return Response(status_code=204)
45
dc61012eeace python reformat
Drew Perttula <drewp@bigasterisk.com>
parents: 43
diff changeset
97
dc61012eeace python reformat
Drew Perttula <drewp@bigasterisk.com>
parents: 43
diff changeset
98
105
4681ea3fcdf6 port to starlette asyncio
drewp@bigasterisk.com
parents: 104
diff changeset
def makeApp(dirUriMap: Optional[DirUriMap] = None, prefixes: Optional[Dict[str, URIRef]] = None) -> Starlette:
    """
    Build the rdfdb Starlette app around a new SharedGraph.

    dirUriMap: passed to SharedGraph; when None, a single entry mapping
      Path('data/') to http://example.com/data/ is used.
    prefixes: passed to SharedGraph; when None, the rdf, rdfs and xsd
      namespaces are used.

    Routes: GET /graph, websocket /syncedGraph, POST /prefixes, plus
    /metrics behind PrometheusMiddleware.
    """
    log.info('makeApp start')

    if dirUriMap is None:
        dirUriMap = {Path('data/'): URIRef('http://example.com/data/')}
    if prefixes is None:
        prefixes = {
            'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'),
            'rdfs': URIRef('http://www.w3.org/2000/01/rdf-schema#'),
            'xsd': URIRef('http://www.w3.org/2001/XMLSchema#'),
        }

    # Debug logging is always on for now; use logging.INFO here to quiet it.
    log.setLevel(logging.DEBUG)

    log.info('setup watches')
    db = SharedGraph(dirUriMap=dirUriMap, prefixes=prefixes)

    app = Starlette(
        debug=True,
        routes=[
            Route('/graph', functools.partial(get_graph, db)),
            WebSocketRoute('/syncedGraph', functools.partial(SyncedGraphSocket, db)),
            # Function endpoints default to GET only, which made this
            # JSON-body handler unreachable by POST (405). Name the method.
            Route('/prefixes', functools.partial(post_prefixes, db), methods=['POST']),
        ],
    )

    app.add_middleware(PrometheusMiddleware)
    app.add_route("/metrics", handle_metrics)

    return app