"""Utility functions for shell execution, logging, and data manipulation."""
import datetime
import logging
import os
import shlex
import subprocess
import sys
from collections.abc import Iterable, Iterator
from typing import Any
import chorelib
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
[docs]
def flatten(seq: Any, ignore_none: bool = True) -> Iterator[Any]:
    """Recursively flatten nested iterables into a single sequence.

    Strings are treated as atomic values and are not iterated over. Only
    ``str`` is exempt: every other ``Iterable`` is descended into, so for
    example ``bytes`` yields its integer values and ``dict`` yields its keys.

    Args:
        seq: A value or nested iterable to flatten. A string or a
            non-iterable value is yielded unchanged as the single result
            (this includes a bare None, regardless of ``ignore_none``).
        ignore_none: If True, None elements found inside an iterable are
            skipped at every nesting depth. If False, they are yielded.

    Yields:
        Individual atomic elements from the input, in depth-first order.
    """
    if isinstance(seq, str) or not isinstance(seq, Iterable):
        yield seq
        return
    for item in seq:
        if isinstance(item, str) or not isinstance(item, Iterable):
            if ignore_none and item is None:
                continue
            yield item
        else:
            # Forward the flag so nested levels honour the caller's choice
            # instead of silently falling back to the default.
            yield from flatten(item, ignore_none)
def unique_list(lst: Iterable[Any]) -> list[Any]:
    """Drop repeated elements while keeping first-seen order.

    Args:
        lst: An iterable of hashable elements.

    Returns:
        A new list holding each distinct element once, positioned where it
        first appeared in the input.
    """
    # dict keys are unique and remember insertion order.
    first_seen = dict.fromkeys(lst)
    return list(first_seen)
def to_timestamp(ts: Any) -> Any:
    """Turn a datetime into a numeric timestamp; pass anything else through.

    Args:
        ts: A datetime object or an already-numeric timestamp. A naive
            datetime (no tzinfo) is interpreted in the local time zone.

    Returns:
        The float POSIX timestamp for a datetime input, otherwise ``ts``
        itself, unchanged.
    """
    if not isinstance(ts, datetime.datetime):
        return ts
    # Naive values are first attached to the local zone, then converted.
    aware = ts if ts.tzinfo else ts.astimezone()
    return aware.timestamp()
[docs]
def shell(
    *cmd: Any,
    cwd: str | None = None,
    text: bool = True,
    check: bool = True,
    capture: bool = False,
    env: dict[str, str] | None = None,
    echo: bool = True,
) -> str | None:
    """Execute a command string via the system shell.

    Multiple arguments are joined with spaces into a single shell command
    line. No quoting or escaping is applied, so the shell interprets every
    metacharacter in the arguments; use ``command()`` when the arguments
    must be passed verbatim.

    Args:
        *cmd: Command string(s) or objects to execute. Nested sequences
            are recursively flattened (None elements are dropped) and each
            element is converted to a string, then joined with spaces
            into a single command line.
        cwd: Working directory for the command.
        text: If True, decode stdout/stderr as text.
        check: If True, raise CalledProcessError on non-zero exit.
        capture: If True, capture and return stdout.
        env: Environment variables for the subprocess.
        echo: If True, pass the command line to ``message()`` at level 0,
            which prints it to stderr.

    Returns:
        The captured stdout if capture is true (bytes rather than str when
        ``text`` is False), otherwise None.

    Raises:
        subprocess.CalledProcessError: If ``check`` is True and the command
            exits with a non-zero status.
    """
    # ``cmd`` is a varargs tuple and therefore always iterable, so it can be
    # flattened directly; no scalar fallback is needed.
    cmdstr = " ".join(str(part) for part in flatten(cmd))
    if echo:
        message(cmdstr, 0)
    ret = subprocess.run(
        cmdstr,
        cwd=cwd,
        shell=True,
        stdout=subprocess.PIPE if capture else None,
        text=text,
        check=check,
        env=env,
    )
    if capture:
        return ret.stdout  # type: ignore[no-any-return]
    return None
[docs]
def command(
    *cmd: Any,
    cwd: str | None = None,
    text: bool = True,
    check: bool = True,
    capture: bool = False,
    env: dict[str, str] | None = None,
    echo: bool = True,
) -> str | None:
    """Run a program directly, bypassing the shell.

    Every argument becomes its own entry in the argument vector, so nothing
    is interpreted by a shell and shell injection is avoided.

    Args:
        *cmd: Program and arguments. Nested sequences are recursively
            flattened and each element is converted to a string before
            being handed to the subprocess.
        cwd: Working directory for the command.
        text: If True, decode stdout/stderr as text.
        check: If True, raise CalledProcessError on non-zero exit.
        capture: If True, capture and return stdout.
        env: Environment variables for the subprocess.
        echo: If True, print the shell-quoted command to stderr.

    Returns:
        The captured stdout string if capture=True, otherwise None.
    """
    argv: list[str] = [str(arg) for arg in flatten(cmd)]
    if echo:
        # shlex.join quotes each argument so the echoed line is copy-pasteable.
        message(shlex.join(argv))
    pipe: int | None = subprocess.PIPE if capture else None
    result = subprocess.run(
        argv,
        cwd=cwd,
        shell=False,
        stdout=pipe,
        text=text,
        check=check,
        env=env,
    )
    if pipe:
        return result.stdout  # type: ignore[no-any-return]
    return None
[docs]
def message(msg: str, level: int = 0) -> None:
    """Write a message to stderr when verbosity is high enough.

    Args:
        msg: The message string to display.
        level: Minimum verbosity level required to show the message.
            Output happens only when ``chorelib.verbose >= level``.
    """
    if chorelib.verbose < level:
        # Current verbosity is below the threshold: stay silent.
        return
    print(msg, file=sys.stderr)
class chdir:
"""Non thread-safe context manager to change the current working directory."""
def __init__(self, path: str | os.PathLike[str]) -> None:
self.path = path
self._old_cwd: list[str] = []
def __enter__(self) -> None:
self._old_cwd.append(os.getcwd())
os.chdir(self.path)
def __exit__(self, *excinfo: object) -> None:
os.chdir(self._old_cwd.pop())