view video_file_store.py @ 43:a7b644dc1b4b

ridiculous fix for vite not proxying urls with . in them
author drewp@bigasterisk.com
date Fri, 06 Dec 2024 00:58:46 -0800
parents 0aea9e55899b
children
line wrap: on
line source

import asyncio
import hashlib
import logging
import os
from dataclasses import dataclass
from pathlib import Path
import re
from typing import Iterable, Iterator

import pymongo.collection

# Module-wide logger, registered under the fixed name 'vfs'.
log = logging.getLogger('vfs')


@dataclass
class VideoFile:
    """One video file record, as yielded by VideoFileStore.findInDir.

    Each field is copied from the same-named key of a stored 'file' doc.
    """
    # Value of the doc's 'diskPath' key, wrapped in a Path.
    diskPath: Path
    # Value of the doc's 'webRelPath' key.
    webRelPath: str
    # Value of the doc's 'webDataPath' key.
    webDataPath: str
    # Value of the doc's 'label' key; numSort orders listings by it.
    label: str
    # NOTE(review): presumably fields still to be added — confirm with author:
    # perms, playlists, req by/when


def numSort(docs: list):
    """Sort ``docs`` in place by 'label', ordering digit runs numerically.

    Every run of digits in a label is zero-padded to a common width before
    the labels are compared as strings, so 'ep2' sorts before 'ep10'.

    The width is the longest digit run found in any label, but never less
    than 10. A fixed width of 10 would leave longer runs unpadded and compare
    them as plain text (12345678901 would sort before 9999999999); deriving
    the width from the data avoids that while giving exactly the same order
    as a fixed width of 10 whenever no run exceeds 10 digits.

    Args:
        docs: list of mappings that each have a string 'label' key.

    Returns:
        None; the list is reordered in place (stable for equal keys).

    Raises:
        KeyError: if any doc has no 'label' key.
    """
    digitRun = re.compile(r"\d+")

    longestRun = max(
        (len(run) for doc in docs for run in digitRun.findall(doc['label'])),
        default=0,
    )
    width = max(longestRun, 10)

    def pad(match):
        return match.group(0).zfill(width)

    docs.sort(key=lambda doc: digitRun.sub(pad, doc['label']))

@dataclass
class VideoFileStore:
    fs: pymongo.collection.Collection

    def findInDir(self, subdir: str) -> Iterable[VideoFile]:
        log.warning(f'findInDir:  {subdir=}')
        webRelParent = '.' if subdir == '/' else subdir
        docs = list(self.fs.find({
                'type': 'file',
                'webRelParent': webRelParent
        }))
        numSort(docs)
        for doc in docs:
            yield VideoFile(Path(doc['diskPath']), doc['webRelPath'],
                            doc['webDataPath'], doc['label'])

    def findSubdirs(self, subdir: str) -> Iterable:
        docs = list(self.fs.find({
                'type':
                'dir',
                'webRelParent':
                '.' if subdir == '/' else subdir
        }))
        numSort(docs)

        for doc in docs:
            yield {
                'label': doc['label'],
                'path': doc['webRelPath'],
            }

    async def save(self, name: str, chunks: Iterator[bytes]):
        raise
        p = self.top / name
        if p.exists():
            raise ValueError(f'{p} exists')
        data = b''
        for c in chunks:
            data += c
        p.write_bytes(data)

    def folderTree(self):
        out = {'name': 'TOP'}

        def fill(node: dict, pathToHere: Path):
            for subName in sorted(os.listdir(pathToHere)):
                if subName in IGNORE:
                    continue
                subDir = pathToHere / subName
                if subDir.is_dir():
                    subNode = {'name': subName}
                    node.setdefault('children', []).append(subNode)
                    fill(subNode, subDir)

        fill(out, self.top)
        return out