# HG changeset patch # User drewp@bigasterisk.com # Date 1681640373 25200 # Node ID 071943adf0006c70a287f9c316b0e11dee5134b3 # Parent 838eb0223bdbef371b61bb808252807f81814c03 dnd a file or a url which we'll queue and fetch diff -r 838eb0223bdb -r 071943adf000 pyproject.toml --- a/pyproject.toml Sun Apr 16 03:17:48 2023 -0700 +++ b/pyproject.toml Sun Apr 16 03:19:33 2023 -0700 @@ -18,6 +18,7 @@ "uvicorn>=0.21.1", "watchgod>=0.8.2", "sse-starlette>=1.3.3", + "mongo-queue-service>=0.1.8", ] requires-python = ">=3.10" license = { text = "MIT" } diff -r 838eb0223bdb -r 071943adf000 src/ingest/IngestDrop.ts --- a/src/ingest/IngestDrop.ts Sun Apr 16 03:17:48 2023 -0700 +++ b/src/ingest/IngestDrop.ts Sun Apr 16 03:19:33 2023 -0700 @@ -38,23 +38,23 @@ return; } - for (let i = 0; i < ev.dataTransfer.files.length; i++) { - const f = ev.dataTransfer.files[i]; - const name = f.name; - const stream = f.stream(); - fetch("../api/ingest/videoUpload?name=" + encodeURIComponent(f.name), { - method: "POST", - body: stream, - duplex: "half", - }); - } - const url = ev.dataTransfer.getData("text/plain"); if (url) { fetch("../api/ingest/videoUrl", { method: "POST", body: url, }); + } else { + for (let i = 0; i < ev.dataTransfer.files.length; i++) { + const f = ev.dataTransfer.files[i]; + const name = f.name; + const stream = f.stream(); + fetch("../api/ingest/videoUpload?name=" + encodeURIComponent(f.name), { + method: "POST", + body: stream, + duplex: "half", + } as any); + } } } } diff -r 838eb0223bdb -r 071943adf000 src/ingest/IngestStatus.ts --- a/src/ingest/IngestStatus.ts Sun Apr 16 03:17:48 2023 -0700 +++ b/src/ingest/IngestStatus.ts Sun Apr 16 03:19:33 2023 -0700 @@ -1,21 +1,49 @@ import { LitElement, html, css } from "lit"; import { customElement, property } from "lit/decorators.js"; +interface Row { + url: string; + t: string; + progress: string; +} + @customElement("ingest-status") export class IngestStatus extends LitElement { - + @property() queue: Row[] = []; static styles = [ css` + table { + background: #ccc; + } `, ]; + connectedCallback(): void { + super.connectedCallback(); + const es = new EventSource("../api/ingest/queue"); + es.onmessage = (ev) => { + this.queue = JSON.parse(ev.data); + }; + } render() { return html` - - - - - -
SourceStatus
+ + + + + + + + ${this.queue.map( + (row) => html` + + + + + + ` + )} + +
Queued atProgressSource
${row.t}${row.progress}${row.url}
`; } } diff -r 838eb0223bdb -r 071943adf000 video.py --- a/video.py Sun Apr 16 03:17:48 2023 -0700 +++ b/video.py Sun Apr 16 03:19:33 2023 -0700 @@ -1,3 +1,5 @@ +import asyncio +import json import logging from pathlib import Path @@ -11,9 +13,10 @@ from video_file_store import VideoFileStore from video_ingest import VideoIngest - +import dl_queue logging.basicConfig(level=logging.DEBUG) log = logging.getLogger() +logging.getLogger('sse_starlette').setLevel(logging.WARNING) def root(req): @@ -36,7 +39,7 @@ async def ingestVideoUrl(req: Request) -> Response: url = await req.body() - svc.ingestUrl(url) + await svc.ingestUrl(url.decode('utf8')) return Response(status_code=202) @@ -47,18 +50,15 @@ async def ingestQueue(req: Request) -> EventSourceResponse: - - def convertEvents(svcEvents): - for ev in svcEvents: - yield dict(type='ev') - - return EventSourceResponse(convertEvents(svc.events())) + async def g(): + async for ev in svc.events(): + yield json.dumps(ev) + return EventSourceResponse(g()) store = VideoFileStore(top=Path('/data')) svc = VideoIngest(store) - def main(): app = Starlette( @@ -78,6 +78,8 @@ app.add_middleware(PrometheusMiddleware, app_name='video_api') app.add_route("/metrics", handle_metrics) + + app.state.processTask = asyncio.create_task(dl_queue.process()) return app diff -r 838eb0223bdb -r 071943adf000 video_ingest.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/video_ingest.py Sun Apr 16 03:19:33 2023 -0700 @@ -0,0 +1,26 @@ +import asyncio +from dataclasses import dataclass +from typing import Any, Coroutine + +import dl_queue +from video_file_store import VideoFileStore + + +@dataclass +class VideoIngest: + store: VideoFileStore + + async def addContent(self, name: str, body: Coroutine[Any, Any, bytes]): + await self.store.save(name, iter([await body])) + + async def ingestUrl(self, url: str): + dl_queue.queue.put({'url': url, 'outDir': str(self.store.top)}) + + async def events(self): + prev = None + while True: + p = dl_queue.pending() + if p != prev: + prev = p + yield p + await asyncio.sleep(1)