Mercurial > code > home > repos > video
view video_file_store.py @ 19:f0a773084a2e
add shoelace
author | drewp@bigasterisk.com |
---|---|
date | Mon, 17 Apr 2023 00:42:27 -0700 |
parents | 53d99454f394 |
children | cf6842952e13 |
line wrap: on
line source
import asyncio
import hashlib
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Iterator, NewType


@dataclass
class VideoFile:
    """One playable video: where it is on disk, its web path, and a label."""
    diskPath: Path
    webRelPath: str
    label: str
    # perms, playlists, req by/when


def vf(p: Path, label: str):
    """Build a VideoFile for `p`, which must be located under '/data'.

    The web path is './files/' followed by `p` relative to '/data'.
    `Path.relative_to` raises ValueError when `p` is not under '/data'.
    """
    return VideoFile(p, './files/' + str(p.relative_to('/data')), label)


def thumbWebPath(rel: str) -> str:
    """Return the web path for a thumbnail given its store-relative path."""
    return './files/' + rel


@dataclass
class VideoFileStore:
    """Directory-backed collection of videos rooted at `top`."""
    top: Path

    def _resolve(self, subdir: str) -> Path:
        """Map a '/'-prefixed store path onto a directory under `top`.

        Raises ValueError when `subdir` is empty or does not start with '/'.
        """
        # startswith() also handles '' (indexing subdir[0] would raise
        # IndexError instead of the intended ValueError).
        if not subdir.startswith('/'):
            raise ValueError(f'subdir must start with "/": {subdir!r}')
        return self.top / subdir[1:]

    def findInDir(self, subdir: str) -> Iterable[VideoFile]:
        """Yield the videos found directly inside `subdir`.

        If the directory holds any '*.mpd' manifest, only the first one
        returned by glob is yielded, labelled with the directory name.
        Otherwise every '*.mp4' and '*.webm' file is yielded in sorted
        order, labelled with its file name minus a trailing
        ' [something].ext' (presumably a downloader-added id; verify).

        This is a generator, so the ValueError for a bad `subdir` is raised
        on first iteration, not at call time.
        """
        here = self._resolve(subdir)
        manifests = list(here.glob('*.mpd'))
        if manifests:
            p = manifests[0]
            yield vf(p, p.parent.name)
            return
        for p in sorted(list(here.glob('*.mp4')) + list(here.glob('*.webm'))):
            label = re.sub(r' \[[^\]]+\]\.\w+', '', p.name)
            yield vf(p, label)

    def findSubdirs(self, subdir: str) -> Iterable:
        """Yield {'label', 'path'} dicts for each child directory of `subdir`.

        The '_thumb' directory (where thumbnails are stored) is skipped.
        'path' is '/'-prefixed and relative to `top`, so it can be passed
        back into findInDir / findSubdirs. Order follows `Path.iterdir`.
        """
        here = self._resolve(subdir)
        for p in here.iterdir():
            if p.is_dir() and p.name not in {'_thumb'}:
                yield {
                    'label': p.name,
                    'path': '/' + str(p.relative_to(self.top)),
                }

    def thumbPath(self, vf: VideoFile) -> str:
        """Return the thumbnail path for `vf`, relative to `top`.

        The file name is the SHA-256 hex digest of the first 1 MiB of the
        video file, so it does not depend on the video's name or location.
        """
        sha256 = hashlib.sha256()
        with open(vf.diskPath, 'rb') as f:
            firstMb = f.read(1 << 20)
            sha256.update(firstMb)
        cksum = sha256.hexdigest()
        return f'_thumb/{cksum}.jpg'

    async def getOrCreateThumb(self, vf: VideoFile) -> str:
        """Return the web path of `vf`'s thumbnail, generating it if missing.

        Runs `ffmpegthumbnailer` when the thumbnail file does not exist yet
        and waits for it to exit before returning. The tool's exit status is
        not checked, so a failed run still returns the (missing) path.
        """
        p = self.top / self.thumbPath(vf)
        if not p.exists():
            # Ensure the output directory exists before the tool writes
            # to it (assumes the tool does not create parent directories).
            p.parent.mkdir(parents=True, exist_ok=True)
            proc = await asyncio.create_subprocess_exec(
                'ffmpegthumbnailer', '-s', '250', '-i', str(vf.diskPath),
                '-o', str(p))
            # Awaiting create_subprocess_exec only spawns the process; wait
            # for it to finish so the thumbnail exists when we return.
            await proc.wait()
        return thumbWebPath(str(p.relative_to(self.top)))

    async def save(self, name: str, chunks: Iterator[bytes]):
        """Write the concatenation of `chunks` to a new file `top / name`.

        Raises ValueError if the target already exists. All chunks are
        collected before writing, so nothing is written if `chunks` fails
        part-way through.
        """
        p = self.top / name
        if p.exists():
            raise ValueError(f'{p} exists')
        # join() is linear; repeated `data += c` copies the buffer each time.
        p.write_bytes(b''.join(chunks))