cmds.secrets.lib.tar: Add module
Add a module cmds.secrets.lib.tar. Secrets handling demands treating tar archive members more individually, jw.pkg.lib.TarIo is not a good fit for that, so try with a different module. To be merged eventually.
Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
parent
5bb7fe96db
commit
530efa1427
1 changed files with 61 additions and 0 deletions
61
src/python/jw/pkg/cmds/secrets/lib/tar.py
Normal file
61
src/python/jw/pkg/cmds/secrets/lib/tar.py
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Callable
|
||||
|
||||
import tarfile, io
|
||||
from tarfile import TarFile
|
||||
|
||||
from ....lib.log import *
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ....lib.ExecContext import ExecContext
|
||||
|
||||
def filter(blob: bytes, path_filter: Callable[[str], bool]|None, matched: list[str]|None=None) -> bytes:
|
||||
ret = io.BytesIO()
|
||||
with tarfile.open(fileobj=ret, mode='w') as tf_out:
|
||||
tf_in = TarFile(fileobj=io.BytesIO(blob))
|
||||
for info in tf_in.getmembers():
|
||||
if path_filter is not None and not path_filter(info.name):
|
||||
continue
|
||||
log(DEBUG, f'Adding {info.name}')
|
||||
if matched is not None:
|
||||
matched.append(info.name)
|
||||
buf = tf_in.extractfile(info)
|
||||
tf_out.addfile(info, buf)
|
||||
return ret.getvalue()
|
||||
|
||||
def rewrite(blob: bytes, rewrite_filter: Callable[[str], str]) -> bytes:
|
||||
ret = io.BytesIO()
|
||||
with tarfile.open(fileobj=ret, mode='w') as tf_out:
|
||||
tf_in = TarFile(fileobj=io.BytesIO(blob))
|
||||
for info in tf_in.getmembers():
|
||||
new_name = rewrite_filter(info.name)
|
||||
log(DEBUG, f'Rewriting {info.name} -> {new_name}')
|
||||
info.name = new_name
|
||||
buf = tf_in.extractfile(info)
|
||||
tf_out.addfile(info, buf)
|
||||
return ret.getvalue()
|
||||
|
||||
def merge(blobs: Iterable[bytes], overwrite: bool=False) -> bytes:
|
||||
ret = io.BytesIO()
|
||||
with tarfile.open(fileobj=ret, mode='w') as tf_out:
|
||||
for blob in blobs:
|
||||
tf_in = TarFile(fileobj=io.BytesIO(blob))
|
||||
existing_names = tf_out.getnames()
|
||||
for info in tf_in.getmembers():
|
||||
if not overwrite and info.name in existing_names:
|
||||
continue
|
||||
buf = tf_in.extractfile(info)
|
||||
tf_out.addfile(info, buf)
|
||||
return ret.getvalue()
|
||||
|
||||
async def extract(dst: ExecContext, blob: bytes, root: str|None=None, verbose: bool=False) -> None:
|
||||
cmd = ['tar']
|
||||
if root is not None:
|
||||
cmd += ['-C', root]
|
||||
if verbose:
|
||||
cmd += '-v'
|
||||
cmd += ['-x', '-f', '-']
|
||||
await dst.run(cmd, verbose=verbose, cmd_input=blob)
|
||||
Loading…
Add table
Add a link
Reference in a new issue