# -*- coding: utf-8 -*- from __future__ import annotations import abc, re from enum import Enum, auto from typing import TYPE_CHECKING, Self from functools import cached_property, cache if TYPE_CHECKING: from typing import Self from .log import * from .base import Input, InputMode, Result, StatResult from .ProcFilter import ProcPipeline class FileContext(abc.ABC): class Direction(Enum): In = auto() Out = auto() def __init__( self, uri: str, interactive: bool|None = None, verbose_default = False, chroot: bool = False, in_pipe: ProcPipeline|None = None, out_pipe: ProcPipeline|None = None, ): self.__uri = uri self.__id, self.__root = self.split_uri(uri) self.__chroot = chroot self.__interactive = interactive self.__verbose_default = verbose_default self.__log_name: str|None = None self.__in_pipe = in_pipe self.__out_pipe = out_pipe self.__open_count = 0 if not verbose_default in [True, False]: raise ValueError(f'Tried to instantiate FileContext with verbose_default = "{verbose_default}"') async def __aenter__(self): await self.open() return self async def __aexit__(self, exc_type, exc, tb): await self.close() def __pipe(self, d: Direction): match d: case self.Direction.In: if not self.__in_pipe: self.__in_pipe = ProcPipeline() return self.__in_pipe case self.Direction.Out: if not self.__out_pipe: self.__out_pipe = ProcPipeline() return self.__out_pipe case _: raise Exception(f'Invalid pipe direction "{str(d)}"') def _chroot(self, path: str) -> str: if not self.__chroot: return path if not len(path): return self.__root if path[-1] == '/': return self.__root + path return self.__root + '/' + path def add_proc_filter(self, d: Direction, proc_filter: ProcFilter): self.__pipe(d).append(proc_filter) async def _open(self) -> None: pass async def open(self) -> None: self.__open_count += 1 if self.__open_count == 1: await self._open() async def _close(self) -> None: pass async def close(self) -> None: if self.__open_count == 1: await self._close() self.__open_count -= 1 assert self.__open_count >= 0, f'Closed file context "{self.__uri}" more often than opened' @classmethod def schema_from_uri(cls, uri: str) -> str: tokens = re.split(r'://', uri) return tokens[0] if tokens[0] != uri else 'file' @classmethod @cache def split_uri(cls, uri: str) -> tuple[str, str]: from urllib.parse import urlparse p = urlparse(uri) netloc = p.netloc if p.netloc else '' return f'{cls.schema_from_uri(uri)}://{netloc}', p.path @property def uri(self) -> str: return self.__uri @property def id(self) -> str: return self.__id @property def root(self) -> str: return self.__root @property def log_name(self) -> str: if self.__log_name is None: self.__log_name = self.__class__.__name__.lower() from urllib.parse import urlparse parsed = urlparse(self.__uri) uri: list[str] = [] if parsed.scheme: uri.append(parsed.scheme) if parsed.hostname: uri.append(parsed.hostname) if uri: self.__log_name += ' ' + '://'.join(uri) return self.__log_name @property def interactive(self) -> bool|None: return self.__interactive @property def verbose_default(self) -> bool: return self.__verbose_default @abc.abstractmethod async def _get( self, path: str, wd: str|None, throw: bool, verbose: bool|None, title: str ) -> Result: raise NotImplementedError() async def get( self, path: str, wd: str|None = None, throw: bool = True, verbose: bool|None = None, title: str=None, owner: str|None=None, group: str|None=None, mode: str|None=None, ) -> Result: ret = await self._get( self._chroot(path), wd = wd, throw = throw, verbose = verbose, title = title, ) return await self.__in_pipe.run(ret) if self.__in_pipe else ret async def _put( self, path: str, content: bytes, wd: str|None, throw: bool, verbose: bool|None, title: str, owner: str|None, group: str|None, mode: str|None, atomic: bool, ) -> Result: raise NotImplementedError() async def put( self, path: str, content: str, wd: str|None = None, throw: bool = True, verbose: bool|None = None, title: str = None, owner: str|None = None, group: str|None = None, mode: int|None = None, atomic: bool = False ) -> Result: mode_str = None if mode is None else oct(mode).replace('0o', '0') if self.__out_pipe is not None: content = self.__out_pipe.run(content).stdout return await self._put( self._chroot(path), content, wd = wd, throw = throw, verbose = verbose, title = title, owner = owner, group = group, mode = mode_str, atomic = atomic, ) async def _unlink(self, path: str) -> None: raise NotImplementedError(f'{self.log_name}: unlink("{path}") is not implemented') async def unlink(self, path: str) -> None: return await self._unlink(self._chroot(path)) async def _erase(self, path: str) -> None: raise NotImplementedError(f'{self.log_name}: erase("{path}") is not implemented') async def erase(self, path: str) -> None: return await self._erase(self._chroot(path)) async def _rename(self, src: str, dst: str) -> None: raise NotImplementedError(f'{self.log_name}: rename("{path}") is not implemented') async def rename(self, src: str, dst: str) -> None: return await self._rename(src, dst) async def _mkdir(self, path: str, mode: int) -> None: raise NotImplementedError(f'{self.log_path}: mkdir({path}) is not implemented') async def mkdir(self, path: str, mode: int=0o777) -> None: return await self._mkdir(path, mode) async def _mktemp(self, tmpl: str, directory: bool) -> None: raise NotImplementedError(f'{self.log_name}: mktemp("{path}") is not implemented') async def mktemp(self, tmpl: str, directory: bool=False) -> None: return await self._mktemp(self._chroot(tmpl), directory) async def _chown(self, path: str, owner: str|None, group: str|None) -> None: raise NotImplementedError(f'{self.log_name}: chown("{path}") is not implemented') async def chown(self, path: str, owner: str|None=None, group: str|None=None) -> None: if owner is None and group is None: raise ValueError(f'Tried to change ownership of {path} specifying neither owner nor group') return await self._chown(self._chroot(path), owner, group) async def _chmod(self, path: str, mode: int) -> None: raise NotImplementedError(f'{self.log_name}: chmod("{path}") is not implemented') async def chmod(self, path: str, mode: int) -> None: return await self._chmod(self._chroot(path), mode) async def _stat(self, path: str, follow_symlinks: bool) -> StatResult: raise NotImplementedError(f'{self.log_name}: lstat("{path}") is not implemented') async def stat(self, path: str, follow_symlinks: bool=True) -> StatResult: if not isinstance(path, str): raise TypeError(f"path must be str, got {type(path).__name__}") return await self._stat(self._chroot(path), follow_symlinks) async def _file_exists(self, path: str) -> bool: try: await self._stat(path, False) except FileNotFoundError as e: log(DEBUG, f'Could not stat file {path} ({str(e)}), ignored') return False except Exception as e: log(ERR, f'Could not stat file {path} ({str(e)}), ignored') raise return True async def file_exists(self, path: str) -> bool: return await self._file_exists(self._chroot(path)) async def _is_dir(self, path: str) -> bool: try: return S_ISDIR(await self._stat(path).st_mode) except NotImplementedError: log(DEBUG, f'{self.log_name} doesn\'t implement stat(), judging by trailing slash if {path} is a directory') return path[-1] == '/' except FileNotFoundError as e: log(DEBUG, f'{self.log_name}: Failed to stat({path}) ({str(e)})') return False except Exception as e: log(ERR, f'{self.log_name}: Failed to stat({path}) ({str(e)})') raise return False async def is_dir(self, path: str) -> bool: return self._is_dir(self._chroot(path)) @classmethod def create(cls, uri: str, *args, **kwargs) -> Self: match cls.schema_from_uri(uri): case 'local' | 'file': from .ec.Local import Local return Local(uri, *args, **kwargs) case 'ssh': from .ec.SSHClient import ssh_client return ssh_client(uri, *args, **kwargs) case 'http' | 'https': from .ec.Curl import Curl return Curl(uri, *args, **kwargs) case _: pass raise Exception(f'Can\'t create file transfer instance for {uri} with unknown schema "{cls.schema_from_uri(uri)}"')