# HG changeset patch
# User drewp@bigasterisk.com
# Date 1681640373 25200
# Node ID 071943adf0006c70a287f9c316b0e11dee5134b3
# Parent 838eb0223bdbef371b61bb808252807f81814c03
dnd a file or a url which we'll queue and fetch
diff -r 838eb0223bdb -r 071943adf000 pyproject.toml
--- a/pyproject.toml Sun Apr 16 03:17:48 2023 -0700
+++ b/pyproject.toml Sun Apr 16 03:19:33 2023 -0700
@@ -18,6 +18,7 @@
"uvicorn>=0.21.1",
"watchgod>=0.8.2",
"sse-starlette>=1.3.3",
+ "mongo-queue-service>=0.1.8",
]
requires-python = ">=3.10"
license = { text = "MIT" }
diff -r 838eb0223bdb -r 071943adf000 src/ingest/IngestDrop.ts
--- a/src/ingest/IngestDrop.ts Sun Apr 16 03:17:48 2023 -0700
+++ b/src/ingest/IngestDrop.ts Sun Apr 16 03:19:33 2023 -0700
@@ -38,23 +38,23 @@
return;
}
- for (let i = 0; i < ev.dataTransfer.files.length; i++) {
- const f = ev.dataTransfer.files[i];
- const name = f.name;
- const stream = f.stream();
- fetch("../api/ingest/videoUpload?name=" + encodeURIComponent(f.name), {
- method: "POST",
- body: stream,
- duplex: "half",
- });
- }
-
const url = ev.dataTransfer.getData("text/plain");
if (url) {
fetch("../api/ingest/videoUrl", {
method: "POST",
body: url,
});
+ } else {
+ for (let i = 0; i < ev.dataTransfer.files.length; i++) {
+ const f = ev.dataTransfer.files[i];
+ const name = f.name;
+ const stream = f.stream();
+ fetch("../api/ingest/videoUpload?name=" + encodeURIComponent(f.name), {
+ method: "POST",
+ body: stream,
+ duplex: "half",
+ } as any);
+ }
}
}
}
diff -r 838eb0223bdb -r 071943adf000 src/ingest/IngestStatus.ts
--- a/src/ingest/IngestStatus.ts Sun Apr 16 03:17:48 2023 -0700
+++ b/src/ingest/IngestStatus.ts Sun Apr 16 03:19:33 2023 -0700
@@ -1,21 +1,49 @@
import { LitElement, html, css } from "lit";
import { customElement, property } from "lit/decorators.js";
+interface Row {
+ url: string;
+ t: string;
+ progress: string;
+}
+
@customElement("ingest-status")
export class IngestStatus extends LitElement {
-
+ @property() queue: Row[] = [];
static styles = [
css`
+ table {
+ background: #ccc;
+ }
`,
];
+ connectedCallback(): void {
+ super.connectedCallback();
+ const es = new EventSource("../api/ingest/queue");
+ es.onmessage = (ev) => {
+ this.queue = JSON.parse(ev.data);
+ };
+ }
render() {
return html`
-
-
+
+
+ Queued at |
+ Progress |
+ Source |
+
+
+ ${this.queue.map(
+ (row) => html`
+
+ ${row.t} |
+ ${row.progress} |
+ ${row.url} |
+
+ `
+ )}
+
+
`;
}
}
diff -r 838eb0223bdb -r 071943adf000 video.py
--- a/video.py Sun Apr 16 03:17:48 2023 -0700
+++ b/video.py Sun Apr 16 03:19:33 2023 -0700
@@ -1,3 +1,5 @@
+import asyncio
+import json
import logging
from pathlib import Path
@@ -11,9 +13,10 @@
from video_file_store import VideoFileStore
from video_ingest import VideoIngest
-
+import dl_queue
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()
+logging.getLogger('sse_starlette').setLevel(logging.WARNING)
def root(req):
@@ -36,7 +39,7 @@
async def ingestVideoUrl(req: Request) -> Response:
url = await req.body()
- svc.ingestUrl(url)
+ await svc.ingestUrl(url.decode('utf8'))
return Response(status_code=202)
@@ -47,18 +50,15 @@
async def ingestQueue(req: Request) -> EventSourceResponse:
-
- def convertEvents(svcEvents):
- for ev in svcEvents:
- yield dict(type='ev')
-
- return EventSourceResponse(convertEvents(svc.events()))
+ async def g():
+ async for ev in svc.events():
+ yield json.dumps(ev)
+ return EventSourceResponse(g())
store = VideoFileStore(top=Path('/data'))
svc = VideoIngest(store)
-
def main():
app = Starlette(
@@ -78,6 +78,8 @@
app.add_middleware(PrometheusMiddleware, app_name='video_api')
app.add_route("/metrics", handle_metrics)
+
+ app.state.processTask = asyncio.create_task(dl_queue.process())
return app
diff -r 838eb0223bdb -r 071943adf000 video_ingest.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/video_ingest.py Sun Apr 16 03:19:33 2023 -0700
@@ -0,0 +1,26 @@
+import asyncio
+from dataclasses import dataclass
+from typing import Any, Coroutine
+
+import dl_queue
+from video_file_store import VideoFileStore
+
+
+@dataclass
+class VideoIngest:
+ store: VideoFileStore
+
+ async def addContent(self, name: str, body: Coroutine[Any, Any, bytes]):
+ await self.store.save(name, iter([await body]))
+
+ async def ingestUrl(self, url: str):
+ dl_queue.queue.put({'url': url, 'outDir': str(self.store.top)})
+
+ async def events(self):
+ prev = None
+ while True:
+ p = dl_queue.pending()
+ if p != prev:
+ prev = p
+ yield p
+ await asyncio.sleep(1)