jw.pkg.lib.util.run_cmd(): Add output_encoding

Add a parameter "output_encoding" to run_cmd(). The parameter lets the
caller choose how the command's output is returned: passing None (the
default) keeps the existing behaviour and decodes the output using the
encodings of sys.stdout and sys.stderr; passing the special string
"bytes" returns the output as raw, undecoded bytes; passing any other
value treats it as the name of an encoding and decodes the output to
strings using that encoding.

Signed-off-by: Jan Lindemann <jan@janware.com>
This commit is contained in:
Jan Lindemann 2026-02-18 13:38:08 +01:00
commit e104fa2e46

View file

@ -27,13 +27,14 @@ def pretty_cmd(cmd: list[str], wd=None):
async def run_cmd(
*args: str,
wd: str | None = None,
wd: str|None = None,
throw: bool = True,
verbose: bool = False,
cmd_input: str|None = None,
env: dict[str, str] | None = None,
title: str=None
) -> tuple[str|None, str|None]:
env: dict[str, str]|None = None,
title: str=None,
output_encoding: str|None = None, # None => unchanged; "bytes" => return raw bytes
) -> tuple[str|bytes|None, str|bytes|None]:
"""
Run a command asynchronously and return its output
@ -47,11 +48,18 @@ async def run_cmd(
- "mode:interactive" -> Inherit terminal stdin
- "mode:auto" -> Inherit terminal stdin if it is a TTY
- otherwise -> String fed to stdin
output_encoding:
- None -> unchanged behavior (decode stdout via sys.stdout.encoding, stderr via sys.stderr.encoding)
- "bytes" -> return raw bytes instead of decoded strings
- otherwise -> decode stdout/stderr using this encoding
Returns:
(stdout, stderr), each as a string or None
(stdout, stderr), each as a string/bytes or None
In PTY mode stderr is always None because PTY merges stdout/stderr.
"""
want_bytes = (output_encoding == "bytes")
def __log(prio, *args):
if verbose:
log(prio, "|", *args)
@ -64,13 +72,14 @@ async def run_cmd(
if throw:
raise RuntimeError(msg)
def __make_pty_reader(collector: list[str], encoding: str, verbose: bool):
def __make_pty_reader(collector: list[bytes], enc_for_verbose: str):
def _read(fd):
data = os.read(fd, 1024)
if not data:
return data
text = data.decode(encoding, errors="replace")
collector.append(text)
collector.append(data)
if verbose:
__log(NOTICE, data.decode(enc_for_verbose, errors="replace").rstrip("\n"))
return data
return _read
@ -95,14 +104,16 @@ async def run_cmd(
import pty
stdout_chunks: list[str] = []
stdout_chunks_b: list[bytes] = []
reader = __make_pty_reader(
stdout_chunks,
sys.stdout.encoding or "utf-8",
verbose,
enc_for_verbose = (
(sys.stdout.encoding or "utf-8")
if output_encoding in (None, "bytes")
else output_encoding
)
reader = __make_pty_reader(stdout_chunks_b, enc_for_verbose)
def _spawn():
# Apply env in PTY mode by temporarily updating os.environ around spawn.
if env:
@ -118,7 +129,13 @@ async def run_cmd(
__check_exit_code(await asyncio.to_thread(_spawn))
# PTY merges stdout/stderr
return "".join(stdout_chunks), None
stdout_b = b"".join(stdout_chunks_b) if stdout_chunks_b else None
if want_bytes:
return stdout_b, None
stdout_dec_enc = (sys.stdout.encoding or "utf-8") if output_encoding is None else output_encoding
stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None
return stdout_s, None
# -- non-interactive mode
stdin = (
@ -135,24 +152,32 @@ async def run_cmd(
env=env,
)
stdout_chunks: list[str] = []
stderr_chunks: list[str] = []
stdout_parts_b: list[bytes] = []
stderr_parts_b: list[bytes] = []
async def read_stream(stream, prio, collector, encoding):
# -- decoding for verbose output in pipe mode
if output_encoding is None or want_bytes:
stdout_log_enc = sys.stdout.encoding or "utf-8"
stderr_log_enc = sys.stderr.encoding or "utf-8"
else:
stdout_log_enc = output_encoding
stderr_log_enc = output_encoding
async def read_stream(stream, prio, collector: list[bytes], log_enc: str):
while True:
line = await stream.readline()
if not line:
break
text = line.decode(encoding, errors="replace")
collector.append(text)
__log(prio, text.rstrip("\n"))
collector.append(line)
if verbose:
__log(prio, line.decode(log_enc, errors="replace").rstrip("\n"))
tasks = [
asyncio.create_task(
read_stream(proc.stdout, NOTICE, stdout_chunks, sys.stdout.encoding or "utf-8")
read_stream(proc.stdout, NOTICE, stdout_parts_b, stdout_log_enc)
),
asyncio.create_task(
read_stream(proc.stderr, ERR, stderr_chunks, sys.stderr.encoding or "utf-8")
read_stream(proc.stderr, ERR, stderr_parts_b, stderr_log_enc)
),
]
@ -165,10 +190,22 @@ async def run_cmd(
await asyncio.gather(*tasks)
__check_exit_code(exit_code)
return (
"".join(stdout_chunks) if stdout_chunks else None,
"".join(stderr_chunks) if stderr_chunks else None,
)
stdout_b = b"".join(stdout_parts_b) if stdout_parts_b else None
stderr_b = b"".join(stderr_parts_b) if stderr_parts_b else None
if want_bytes:
return stdout_b, stderr_b
if output_encoding is None:
stdout_dec_enc = sys.stdout.encoding or "utf-8"
stderr_dec_enc = sys.stderr.encoding or "utf-8"
else:
stdout_dec_enc = output_encoding
stderr_dec_enc = output_encoding
stdout_s = stdout_b.decode(stdout_dec_enc, errors="replace") if stdout_b is not None else None
stderr_s = stderr_b.decode(stderr_dec_enc, errors="replace") if stderr_b is not None else None
return stdout_s, stderr_s
finally:
if cwd is not None: