changeset 17:071943adf000

dnd a file or a url which we'll queue and fetch
author drewp@bigasterisk.com
date Sun, 16 Apr 2023 03:19:33 -0700
parents 838eb0223bdb
children 1b388ee5dd09
files pyproject.toml src/ingest/IngestDrop.ts src/ingest/IngestStatus.ts video.py video_ingest.py
diffstat 5 files changed, 84 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/pyproject.toml	Sun Apr 16 03:17:48 2023 -0700
+++ b/pyproject.toml	Sun Apr 16 03:19:33 2023 -0700
@@ -18,6 +18,7 @@
     "uvicorn>=0.21.1",
     "watchgod>=0.8.2",
     "sse-starlette>=1.3.3",
+    "mongo-queue-service>=0.1.8",
 ]
 requires-python = ">=3.10"
 license = { text = "MIT" }
--- a/src/ingest/IngestDrop.ts	Sun Apr 16 03:17:48 2023 -0700
+++ b/src/ingest/IngestDrop.ts	Sun Apr 16 03:19:33 2023 -0700
@@ -38,23 +38,23 @@
       return;
     }
 
-    for (let i = 0; i < ev.dataTransfer.files.length; i++) {
-      const f = ev.dataTransfer.files[i];
-      const name = f.name;
-      const stream = f.stream();
-      fetch("../api/ingest/videoUpload?name=" + encodeURIComponent(f.name), {
-        method: "POST",
-        body: stream,
-        duplex: "half",
-      });
-    }
-
     const url = ev.dataTransfer.getData("text/plain");
     if (url) {
       fetch("../api/ingest/videoUrl", {
         method: "POST",
         body: url,
       });
+    } else {
+      for (let i = 0; i < ev.dataTransfer.files.length; i++) {
+        const f = ev.dataTransfer.files[i];
+        const name = f.name;
+        const stream = f.stream();
+        fetch("../api/ingest/videoUpload?name=" + encodeURIComponent(f.name), {
+          method: "POST",
+          body: stream,
+          duplex: "half",
+        } as any);
+      }
     }
   }
 }
--- a/src/ingest/IngestStatus.ts	Sun Apr 16 03:17:48 2023 -0700
+++ b/src/ingest/IngestStatus.ts	Sun Apr 16 03:19:33 2023 -0700
@@ -1,21 +1,49 @@
 import { LitElement, html, css } from "lit";
 import { customElement, property } from "lit/decorators.js";
 
+interface Row {
+  url: string;
+  t: string;
+  progress: string;
+}
+
 @customElement("ingest-status")
 export class IngestStatus extends LitElement {
-
+  @property() queue: Row[] = [];
   static styles = [
     css`
+      table {
+        background: #ccc;
+      }
     `,
   ];
+  connectedCallback(): void {
+    super.connectedCallback();
+    const es = new EventSource("../api/ingest/queue");
+    es.onmessage = (ev) => {
+      this.queue = JSON.parse(ev.data);
+    };
+  }
   render() {
     return html`
-
-<table>
-      <thead><th>Source</th><th>Status</th></thead>
-      <tbody id="processing">
-      </tbody>
-    </table>
+      <table>
+        <thead>
+          <th>Queued at</th>
+          <th>Progress</th>
+          <th>Source</th>
+        </thead>
+        <tbody id="processing">
+          ${this.queue.map(
+            (row) => html`
+              <tr>
+                <td>${row.t}</td>
+                <td>${row.progress}</td>
+                <td>${row.url}</td>
+              </tr>
+            `
+          )}
+        </tbody>
+      </table>
     `;
   }
 }
--- a/video.py	Sun Apr 16 03:17:48 2023 -0700
+++ b/video.py	Sun Apr 16 03:19:33 2023 -0700
@@ -1,3 +1,5 @@
+import asyncio
+import json
 import logging
 from pathlib import Path
 
@@ -11,9 +13,10 @@
 
 from video_file_store import VideoFileStore
 from video_ingest import VideoIngest
-
+import dl_queue
 logging.basicConfig(level=logging.DEBUG)
 log = logging.getLogger()
+logging.getLogger('sse_starlette').setLevel(logging.WARNING)
 
 
 def root(req):
@@ -36,7 +39,7 @@
 
 async def ingestVideoUrl(req: Request) -> Response:
     url = await req.body()
-    svc.ingestUrl(url)
+    await svc.ingestUrl(url.decode('utf8'))
     return Response(status_code=202)
 
 
@@ -47,18 +50,15 @@
 
 
 async def ingestQueue(req: Request) -> EventSourceResponse:
-
-    def convertEvents(svcEvents):
-        for ev in svcEvents:
-            yield dict(type='ev')
-
-    return EventSourceResponse(convertEvents(svc.events()))
+    async def g():
+        async for ev in svc.events():
+            yield json.dumps(ev)
+    return EventSourceResponse(g())
 
 
 store = VideoFileStore(top=Path('/data'))
 svc = VideoIngest(store)
 
-
 def main():
 
     app = Starlette(
@@ -78,6 +78,8 @@
 
     app.add_middleware(PrometheusMiddleware, app_name='video_api')
     app.add_route("/metrics", handle_metrics)
+    
+    app.state.processTask = asyncio.create_task(dl_queue.process())
     return app
 
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/video_ingest.py	Sun Apr 16 03:19:33 2023 -0700
@@ -0,0 +1,26 @@
+import asyncio
+from dataclasses import dataclass
+from typing import Any, Coroutine
+
+import dl_queue
+from video_file_store import VideoFileStore
+
+
+@dataclass
+class VideoIngest:
+    store: VideoFileStore
+
+    async def addContent(self, name: str, body: Coroutine[Any, Any, bytes]):
+        await self.store.save(name, iter([await body]))
+
+    async def ingestUrl(self, url: str):
+        dl_queue.queue.put({'url': url, 'outDir': str(self.store.top)})
+
+    async def events(self):
+        prev = None
+        while True:
+            p = dl_queue.pending()
+            if p != prev:
+                prev = p
+                yield p
+            await asyncio.sleep(1)