# -*- coding: utf-8 -*- from __future__ import annotations from typing import Self import abc, io import tarfile from tarfile import TarFile from .CopyContext import CopyContext from .FileContext import FileContext from .log import * class TarIo(CopyContext): def __init__(self, src_uri: str, dst_uri: str) -> None: super().__init__(src_uri=src_uri, dst_uri=dst_uri, chroot=False) def _match(self, path: str, path_filter: list[str]) -> bool: return path in path_filter def _filter_tar_file(self, blob: bytes, path_filter: list[str]|None=None) -> bytes: ret = io.BytesIO() with tarfile.open(fileobj=ret, mode='w') as tf_out: tf_in = TarFile(fileobj=io.BytesIO(blob)) for info in tf_in.getmembers(): if path_filter is not None and not self._match(info.name, path_filter): continue log(DEBUG, f'Adding {info.name}') buf = tf_in.extractfile(info) tf_out.addfile(info, buf) return ret.getvalue() async def _read_filtered(self, path, path_filter: list[str]|None=None) -> bytes: blob = (await self.src.get(path)).stdout return self._filter_tar_file(blob, path_filter) def _add(self, tf: TarFile, path: str, st: StatResult, contents: bytes) -> None: file_obj = io.BytesIO(contents) info = TarInfo() info.name = path info.mode = st.mode info.uname = st.owner info.gname = st.group info.size = st.size info.atime = st.atime info.mtime = st.mtime info.ctime = st.ctime tf.addfile(info, io.BytesIO(file_obj)) async def _add_from_path(self, src: FileContext, tf: TarFile, path: str) -> None: contents = await src.get(path) st = await self.stat(path) self._add(tf, path, st, contents) @abc.abstractmethod async def _extract(self, root: str|None=None, path_filter: list[str]|None=None) -> None: raise NotImplementedError() async def extract(self, root: str|None=None, path_filter: list[str]|None=None) -> None: return await self._extract(root=root, path_filter=path_filter) @classmethod def create(cls, *args, type: str=None, **kwargs): if type is not None: raise NotImplementedError #return TarIoTarFile(*args, **kwargs) return TarIoTarExec(*args, **kwargs) class TarIoTarFile(TarIo): async def _extract(self, root: str|None=None, path_filter: list[str]|None=None) -> None: tf = TarFile(fileobj=io.BytesIO(await self._read_filtered(self.src.root, path_filter))) for info in tf.getmembers(): log(DEBUG, f'Extracting {info.name}') path = root + '/' + info.name if root else info.name buf = tf.extractfile(info) if buf is None: if info.isdir(): await self.dst.mkdir(path, info.mode) await self.dst.chown(path, info.uname, info.gname) continue raise Exception(f'Can\'t extract unsupported file type of "{path}"') await self.dst.put( path, buf.read(), owner = info.uname, group = info.gname, mode = info.mode, ) class TarIoTarExec(TarIo): async def _extract(self, root: str|None=None, path_filter: list[str]|None=None) -> None: cmd = ['tar'] if root is not None: cmd += ['-C', root] cmd += ['-x', '-f', '-'] await self.dst.run(cmd, cmd_input=await self._read_filtered(self.src.root, path_filter))