Mercurial > code > home > repos > video
view video_file_store.py @ 38:0aea9e55899b
hack in path router - dirs kind of work; putting a video in the path doesn't
author | drewp@bigasterisk.com |
---|---|
date | Wed, 04 Dec 2024 21:50:16 -0800 |
parents | 7cacfae58430 |
children |
line wrap: on
line source
import asyncio
import hashlib
import logging
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Iterator

import pymongo.collection

log = logging.getLogger('vfs')


@dataclass
class VideoFile:
    """One video file record.

    VideoFileStore.findInDir builds these from a stored document's
    'diskPath', 'webRelPath', 'webDataPath' and 'label' values.
    """
    diskPath: Path
    webRelPath: str
    webDataPath: str
    label: str
    # perms, playlists, req by/when


def numSort(docs: list):
    """Sort `docs` in place by each doc's 'label', ordering digit runs numerically.

    Every run of digits in a label is left-padded with zeros to 10 characters
    before comparison, so 'ep2' sorts before 'ep10'. Digit runs longer than
    10 characters are left as they are.
    """

    def pad(match):
        return match.group(0).zfill(10)

    docs.sort(key=lambda doc: re.sub(r"\d+", pad, doc['label']))


def _webRelParent(subdir: str) -> str:
    """Return the 'webRelParent' query value for `subdir`; '/' is looked up as '.'."""
    return '.' if subdir == '/' else subdir


@dataclass
class VideoFileStore:
    """Lookups over a collection holding 'file' and 'dir' documents."""
    fs: pymongo.collection.Collection

    def findInDir(self, subdir: str) -> Iterable[VideoFile]:
        """Yield a VideoFile for each 'file' document whose parent is `subdir`.

        Results are ordered by label with numSort. This is a generator, so the
        log line and the query only run once iteration starts.
        """
        # NOTE(review): this reads like a debug trace but is emitted at WARNING level.
        log.warning(f'findInDir: {subdir=}')
        docs = list(self.fs.find({'type': 'file', 'webRelParent': _webRelParent(subdir)}))
        numSort(docs)
        for doc in docs:
            yield VideoFile(Path(doc['diskPath']), doc['webRelPath'], doc['webDataPath'], doc['label'])

    def findSubdirs(self, subdir: str) -> Iterable[dict]:
        """Yield {'label', 'path'} dicts for each 'dir' document whose parent is `subdir`.

        'path' is the document's 'webRelPath'. Results are ordered by label
        with numSort.
        """
        docs = list(self.fs.find({'type': 'dir', 'webRelParent': _webRelParent(subdir)}))
        numSort(docs)
        for doc in docs:
            yield {
                'label': doc['label'],
                'path': doc['webRelPath'],
            }

    async def save(self, name: str, chunks: Iterator[bytes]):
        """Disabled: always raises when awaited.

        Raises:
            NotImplementedError: always. It is a RuntimeError subclass, so
                callers that handled the previous bare-`raise` RuntimeError
                still catch it.
        """
        # TODO: the previous body (already unreachable behind a bare `raise`)
        # wrote the concatenated chunks to `self.top / name` and raised
        # ValueError if that path existed. This class has no `top` attribute
        # in the visible source, so it needs a storage root before it can be
        # restored.
        raise NotImplementedError('VideoFileStore.save is disabled')

    def folderTree(self):
        """Return a nested dict describing the directories found under `self.top`.

        Each node is {'name': <dir name>}; a node gets a 'children' list only
        when it has at least one subdirectory. The root node is named 'TOP'.
        Entries are visited in sorted name order and names in IGNORE are
        skipped.
        """
        # NOTE(review): neither a `top` attribute nor an `IGNORE` constant is
        # defined in the visible source -- confirm where they come from.
        out = {'name': 'TOP'}

        def fill(node: dict, pathToHere: Path):
            for subName in sorted(os.listdir(pathToHere)):
                if subName in IGNORE:
                    continue
                subDir = pathToHere / subName
                if subDir.is_dir():
                    subNode = {'name': subName}
                    node.setdefault('children', []).append(subNode)
                    fill(subNode, subDir)

        fill(out, self.top)
        return out