Mercurial > code > home > repos > video
changeset 17:071943adf000
dnd a file or a url which we'll queue and fetch
author | drewp@bigasterisk.com |
---|---|
date | Sun, 16 Apr 2023 03:19:33 -0700 |
parents | 838eb0223bdb |
children | 1b388ee5dd09 |
files | pyproject.toml src/ingest/IngestDrop.ts src/ingest/IngestStatus.ts video.py video_ingest.py |
diffstat | 5 files changed, 84 insertions(+), 27 deletions(-) [+] |
line wrap: on
line diff
--- a/pyproject.toml Sun Apr 16 03:17:48 2023 -0700 +++ b/pyproject.toml Sun Apr 16 03:19:33 2023 -0700 @@ -18,6 +18,7 @@ "uvicorn>=0.21.1", "watchgod>=0.8.2", "sse-starlette>=1.3.3", + "mongo-queue-service>=0.1.8", ] requires-python = ">=3.10" license = { text = "MIT" }
--- a/src/ingest/IngestDrop.ts Sun Apr 16 03:17:48 2023 -0700 +++ b/src/ingest/IngestDrop.ts Sun Apr 16 03:19:33 2023 -0700 @@ -38,23 +38,23 @@ return; } - for (let i = 0; i < ev.dataTransfer.files.length; i++) { - const f = ev.dataTransfer.files[i]; - const name = f.name; - const stream = f.stream(); - fetch("../api/ingest/videoUpload?name=" + encodeURIComponent(f.name), { - method: "POST", - body: stream, - duplex: "half", - }); - } - const url = ev.dataTransfer.getData("text/plain"); if (url) { fetch("../api/ingest/videoUrl", { method: "POST", body: url, }); + } else { + for (let i = 0; i < ev.dataTransfer.files.length; i++) { + const f = ev.dataTransfer.files[i]; + const name = f.name; + const stream = f.stream(); + fetch("../api/ingest/videoUpload?name=" + encodeURIComponent(f.name), { + method: "POST", + body: stream, + duplex: "half", + } as any); + } } } }
--- a/src/ingest/IngestStatus.ts Sun Apr 16 03:17:48 2023 -0700 +++ b/src/ingest/IngestStatus.ts Sun Apr 16 03:19:33 2023 -0700 @@ -1,21 +1,49 @@ import { LitElement, html, css } from "lit"; import { customElement, property } from "lit/decorators.js"; +interface Row { + url: string; + t: string; + progress: string; +} + @customElement("ingest-status") export class IngestStatus extends LitElement { - + @property() queue: Row[] = []; static styles = [ css` + table { + background: #ccc; + } `, ]; + connectedCallback(): void { + super.connectedCallback(); + const es = new EventSource("../api/ingest/queue"); + es.onmessage = (ev) => { + this.queue = JSON.parse(ev.data); + }; + } render() { return html` - -<table> - <thead><th>Source</th><th>Status</th></thead> - <tbody id="processing"> - </tbody> - </table> + <table> + <thead> + <th>Queued at</th> + <th>Progress</th> + <th>Source</th> + </thead> + <tbody id="processing"> + ${this.queue.map( + (row) => html` + <tr> + <td>${row.t}</td> + <td>${row.progress}</td> + <td>${row.url}</td> + </tr> + ` + )} + </tbody> + </table> `; } }
--- a/video.py Sun Apr 16 03:17:48 2023 -0700 +++ b/video.py Sun Apr 16 03:19:33 2023 -0700 @@ -1,3 +1,5 @@ +import asyncio +import json import logging from pathlib import Path @@ -11,9 +13,10 @@ from video_file_store import VideoFileStore from video_ingest import VideoIngest - +import dl_queue logging.basicConfig(level=logging.DEBUG) log = logging.getLogger() +logging.getLogger('sse_starlette').setLevel(logging.WARNING) def root(req): @@ -36,7 +39,7 @@ async def ingestVideoUrl(req: Request) -> Response: url = await req.body() - svc.ingestUrl(url) + await svc.ingestUrl(url.decode('utf8')) return Response(status_code=202) @@ -47,18 +50,15 @@ async def ingestQueue(req: Request) -> EventSourceResponse: - - def convertEvents(svcEvents): - for ev in svcEvents: - yield dict(type='ev') - - return EventSourceResponse(convertEvents(svc.events())) + async def g(): + async for ev in svc.events(): + yield json.dumps(ev) + return EventSourceResponse(g()) store = VideoFileStore(top=Path('/data')) svc = VideoIngest(store) - def main(): app = Starlette( @@ -78,6 +78,8 @@ app.add_middleware(PrometheusMiddleware, app_name='video_api') app.add_route("/metrics", handle_metrics) + + app.state.processTask = asyncio.create_task(dl_queue.process()) return app
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/video_ingest.py Sun Apr 16 03:19:33 2023 -0700 @@ -0,0 +1,26 @@ +import asyncio +from dataclasses import dataclass +from typing import Any, Coroutine + +import dl_queue +from video_file_store import VideoFileStore + + +@dataclass +class VideoIngest: + store: VideoFileStore + + async def addContent(self, name: str, body: Coroutine[Any, Any, bytes]): + await self.store.save(name, iter([await body])) + + async def ingestUrl(self, url: str): + dl_queue.queue.put({'url': url, 'outDir': str(self.store.top)}) + + async def events(self): + prev = None + while True: + p = dl_queue.pending() + if p != prev: + prev = p + yield p + await asyncio.sleep(1)