From 4eb13ae3f0acace660a690b5dea88b927ab90ac8 Mon Sep 17 00:00:00 2001 From: John Mulligan Date: Wed, 16 Aug 2023 15:23:42 -0400 Subject: [PATCH] cephadm: move command call functions to call_wrappers.py Signed-off-by: John Mulligan Pair-programmed-with: Adam King Co-authored-by: Adam King Signed-off-by: John Mulligan --- src/cephadm/cephadm.py | 297 +---------------------- src/cephadm/cephadmlib/call_wrappers.py | 299 ++++++++++++++++++++++++ src/cephadm/tests/fixtures.py | 2 + src/cephadm/tests/test_agent.py | 3 + src/cephadm/tests/test_cephadm.py | 10 +- 5 files changed, 320 insertions(+), 291 deletions(-) create mode 100644 src/cephadm/cephadmlib/call_wrappers.py diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index fbb48a80ab7e7..9a89bac6e7cf0 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -1,7 +1,5 @@ #!/usr/bin/python3 -import asyncio -import asyncio.subprocess import argparse import datetime import fcntl @@ -26,7 +24,7 @@ import errno import struct import ssl from enum import Enum -from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable, TextIO, Generator +from typing import Dict, List, Tuple, Optional, Union, Any, Callable, IO, Sequence, TypeVar, cast, Set, Iterable, TextIO, Generator import re import uuid @@ -95,10 +93,17 @@ from cephadmlib.exceptions import ( ClusterAlreadyExists, Error, PortOccupiedError, - TimeoutExpired, UnauthorizedRegistryError, ) from cephadmlib.exe_utils import find_executable, find_program +from cephadmlib.call_wrappers import ( + CallVerbosity, + async_run, + call, + call_throws, + call_timeout, + concurrent_tasks, +) FuncT = TypeVar('FuncT', bound=Callable) @@ -131,22 +136,6 @@ cached_stdin = None ################################## -async def run_func(func: Callable, cmd: str) -> subprocess.CompletedProcess: - logger.debug(f'running function {func.__name__}, with parms: {cmd}') - response = func(cmd) - return response - - -async def concurrent_tasks(func: Callable, cmd_list: List[str]) -> List[Any]: - tasks = [] - for cmd in cmd_list: - tasks.append(run_func(func, cmd)) - - data = await asyncio.gather(*tasks) - - return data - - class EndPoint: """EndPoint representing an ip:port format""" @@ -1764,274 +1753,6 @@ class FileLock(object): return None -################################## -# Popen wrappers, lifted from ceph-volume - -class CallVerbosity(Enum): - ##### - # Format: - # Normal Operation: , Errors: - # - # NOTE: QUIET log level is custom level only used when --verbose is passed - ##### - - # Normal Operation: None, Errors: None - SILENT = 0 - # Normal Operation: QUIET, Error: QUIET - QUIET = 1 - # Normal Operation: DEBUG, Error: DEBUG - DEBUG = 2 - # Normal Operation: QUIET, Error: INFO - QUIET_UNLESS_ERROR = 3 - # Normal Operation: DEBUG, Error: INFO - VERBOSE_ON_FAILURE = 4 - # Normal Operation: INFO, Error: INFO - VERBOSE = 5 - - def success_log_level(self) -> int: - _verbosity_level_to_log_level = { - self.SILENT: 0, - self.QUIET: QUIET_LOG_LEVEL, - self.DEBUG: logging.DEBUG, - self.QUIET_UNLESS_ERROR: QUIET_LOG_LEVEL, - self.VERBOSE_ON_FAILURE: logging.DEBUG, - self.VERBOSE: logging.INFO - } - return _verbosity_level_to_log_level[self] # type: ignore - - def error_log_level(self) -> int: - _verbosity_level_to_log_level = { - self.SILENT: 0, - self.QUIET: QUIET_LOG_LEVEL, - self.DEBUG: logging.DEBUG, - self.QUIET_UNLESS_ERROR: logging.INFO, - self.VERBOSE_ON_FAILURE: logging.INFO, - self.VERBOSE: logging.INFO - } - return _verbosity_level_to_log_level[self] # type: ignore - - -# disable coverage for the next block. this is copy-n-paste -# from other code for compatibilty on older python versions -if sys.version_info < (3, 8): # pragma: no cover - import itertools - import threading - import warnings - from asyncio import events - - class ThreadedChildWatcher(asyncio.AbstractChildWatcher): - """Threaded child watcher implementation. - The watcher uses a thread per process - for waiting for the process finish. - It doesn't require subscription on POSIX signal - but a thread creation is not free. - The watcher has O(1) complexity, its performance doesn't depend - on amount of spawn processes. - """ - - def __init__(self) -> None: - self._pid_counter = itertools.count(0) - self._threads: Dict[Any, Any] = {} - - def is_active(self) -> bool: - return True - - def close(self) -> None: - self._join_threads() - - def _join_threads(self) -> None: - """Internal: Join all non-daemon threads""" - threads = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] - for thread in threads: - thread.join() - - def __enter__(self) -> Any: - return self - - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - pass - - def __del__(self, _warn: Any = warnings.warn) -> None: - threads = [thread for thread in list(self._threads.values()) - if thread.is_alive()] - if threads: - _warn(f'{self.__class__} has registered but not finished child processes', - ResourceWarning, - source=self) - - def add_child_handler(self, pid: Any, callback: Any, *args: Any) -> None: - loop = events.get_event_loop() - thread = threading.Thread(target=self._do_waitpid, - name=f'waitpid-{next(self._pid_counter)}', - args=(loop, pid, callback, args), - daemon=True) - self._threads[pid] = thread - thread.start() - - def remove_child_handler(self, pid: Any) -> bool: - # asyncio never calls remove_child_handler() !!! - # The method is no-op but is implemented because - # abstract base classe requires it - return True - - def attach_loop(self, loop: Any) -> None: - pass - - def _do_waitpid(self, loop: Any, expected_pid: Any, callback: Any, args: Any) -> None: - assert expected_pid > 0 - - try: - pid, status = os.waitpid(expected_pid, 0) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - pid = expected_pid - returncode = 255 - logger.warning( - 'Unknown child process pid %d, will report returncode 255', - pid) - else: - if os.WIFEXITED(status): - returncode = os.WEXITSTATUS(status) - elif os.WIFSIGNALED(status): - returncode = -os.WTERMSIG(status) - else: - raise ValueError(f'unknown wait status {status}') - if loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - - if loop.is_closed(): - logger.warning('Loop %r that handles pid %r is closed', loop, pid) - else: - loop.call_soon_threadsafe(callback, pid, returncode, *args) - - self._threads.pop(expected_pid) - - # unlike SafeChildWatcher which handles SIGCHLD in the main thread, - # ThreadedChildWatcher runs in a separated thread, hence allows us to - # run create_subprocess_exec() in non-main thread, see - # https://bugs.python.org/issue35621 - asyncio.set_child_watcher(ThreadedChildWatcher()) - - -try: - from asyncio import run as async_run # type: ignore[attr-defined] -except ImportError: # pragma: no cover - # disable coverage for this block. it should be a copy-n-paste from - # from newer libs for compatibilty on older python versions - def async_run(coro): # type: ignore - loop = asyncio.new_event_loop() - try: - asyncio.set_event_loop(loop) - return loop.run_until_complete(coro) - finally: - try: - loop.run_until_complete(loop.shutdown_asyncgens()) - finally: - asyncio.set_event_loop(None) - loop.close() - - -def call(ctx: CephadmContext, - command: List[str], - desc: Optional[str] = None, - verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, - timeout: Optional[int] = DEFAULT_TIMEOUT, - **kwargs: Any) -> Tuple[str, str, int]: - """ - Wrap subprocess.Popen to - - - log stdout/stderr to a logger, - - decode utf-8 - - cleanly return out, err, returncode - - :param timeout: timeout in seconds - """ - - prefix = command[0] if desc is None else desc - if prefix: - prefix += ': ' - timeout = timeout or ctx.timeout - - async def run_with_timeout() -> Tuple[str, str, int]: - process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=os.environ.copy()) - assert process.stdout - assert process.stderr - try: - stdout, stderr = await asyncio.wait_for( - process.communicate(), - timeout, - ) - except asyncio.TimeoutError: - # try to terminate the process assuming it is still running. It's - # possible that even after killing the process it will not - # complete, particularly if it is D-state. If that happens the - # process.wait call will block, but we're no worse off than before - # when the timeout did not work. Additionally, there are other - # corner-cases we could try and handle here but we decided to start - # simple. - process.kill() - await process.wait() - logger.info(prefix + f'timeout after {timeout} seconds') - return '', '', 124 - else: - assert process.returncode is not None - return ( - stdout.decode('utf-8'), - stderr.decode('utf-8'), - process.returncode, - ) - - stdout, stderr, returncode = async_run(run_with_timeout()) - log_level = verbosity.success_log_level() - if returncode != 0: - log_level = verbosity.error_log_level() - logger.log(log_level, f'Non-zero exit code {returncode} from {" ".join(command)}') - for line in stdout.splitlines(): - logger.log(log_level, prefix + 'stdout ' + line) - for line in stderr.splitlines(): - logger.log(log_level, prefix + 'stderr ' + line) - return stdout, stderr, returncode - - -def call_throws( - ctx: CephadmContext, - command: List[str], - desc: Optional[str] = None, - verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, - timeout: Optional[int] = DEFAULT_TIMEOUT, - **kwargs: Any) -> Tuple[str, str, int]: - out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs) - if ret: - for s in (out, err): - if s.strip() and len(s.splitlines()) <= 2: # readable message? - raise RuntimeError(f'Failed command: {" ".join(command)}: {s}') - raise RuntimeError('Failed command: %s' % ' '.join(command)) - return out, err, ret - - -def call_timeout(ctx, command, timeout): - # type: (CephadmContext, List[str], int) -> int - logger.debug('Running command (timeout=%s): %s' - % (timeout, ' '.join(command))) - - def raise_timeout(command, timeout): - # type: (List[str], int) -> NoReturn - msg = 'Command `%s` timed out after %s seconds' % (command, timeout) - logger.debug(msg) - raise TimeoutExpired(msg) - - try: - return subprocess.call(command, timeout=timeout, env=os.environ.copy()) - except subprocess.TimeoutExpired: - raise_timeout(command, timeout) - ################################## diff --git a/src/cephadm/cephadmlib/call_wrappers.py b/src/cephadm/cephadmlib/call_wrappers.py new file mode 100644 index 0000000000000..d2881027bae98 --- /dev/null +++ b/src/cephadm/cephadmlib/call_wrappers.py @@ -0,0 +1,299 @@ +# call_wrappers.py - functions to wrap calling external commands + + +import asyncio +import logging +import os +import subprocess +import sys + +from enum import Enum +from typing import Callable, List, Dict, Optional, Any, Tuple, NoReturn + +from .constants import QUIET_LOG_LEVEL, DEFAULT_TIMEOUT +from .context import CephadmContext +from .exceptions import TimeoutExpired + +logger = logging.getLogger() + + +async def run_func(func: Callable, cmd: str) -> subprocess.CompletedProcess: + logger.debug(f'running function {func.__name__}, with parms: {cmd}') + response = func(cmd) + return response + + +async def concurrent_tasks(func: Callable, cmd_list: List[str]) -> List[Any]: + tasks = [] + for cmd in cmd_list: + tasks.append(run_func(func, cmd)) + + data = await asyncio.gather(*tasks) + + return data + + +class CallVerbosity(Enum): + ##### + # Format: + # Normal Operation: , Errors: + # + # NOTE: QUIET log level is custom level only used when --verbose is passed + ##### + + # Normal Operation: None, Errors: None + SILENT = 0 + # Normal Operation: QUIET, Error: QUIET + QUIET = 1 + # Normal Operation: DEBUG, Error: DEBUG + DEBUG = 2 + # Normal Operation: QUIET, Error: INFO + QUIET_UNLESS_ERROR = 3 + # Normal Operation: DEBUG, Error: INFO + VERBOSE_ON_FAILURE = 4 + # Normal Operation: INFO, Error: INFO + VERBOSE = 5 + + def success_log_level(self) -> int: + _verbosity_level_to_log_level = { + self.SILENT: 0, + self.QUIET: QUIET_LOG_LEVEL, + self.DEBUG: logging.DEBUG, + self.QUIET_UNLESS_ERROR: QUIET_LOG_LEVEL, + self.VERBOSE_ON_FAILURE: logging.DEBUG, + self.VERBOSE: logging.INFO + } + return _verbosity_level_to_log_level[self] # type: ignore + + def error_log_level(self) -> int: + _verbosity_level_to_log_level = { + self.SILENT: 0, + self.QUIET: QUIET_LOG_LEVEL, + self.DEBUG: logging.DEBUG, + self.QUIET_UNLESS_ERROR: logging.INFO, + self.VERBOSE_ON_FAILURE: logging.INFO, + self.VERBOSE: logging.INFO + } + return _verbosity_level_to_log_level[self] # type: ignore + + +# disable coverage for the next block. this is copy-n-paste +# from other code for compatibilty on older python versions +if sys.version_info < (3, 8): # pragma: no cover + import itertools + import threading + import warnings + from asyncio import events + + class ThreadedChildWatcher(asyncio.AbstractChildWatcher): + """Threaded child watcher implementation. + The watcher uses a thread per process + for waiting for the process finish. + It doesn't require subscription on POSIX signal + but a thread creation is not free. + The watcher has O(1) complexity, its performance doesn't depend + on amount of spawn processes. + """ + + def __init__(self) -> None: + self._pid_counter = itertools.count(0) + self._threads: Dict[Any, Any] = {} + + def is_active(self) -> bool: + return True + + def close(self) -> None: + self._join_threads() + + def _join_threads(self) -> None: + """Internal: Join all non-daemon threads""" + threads = [thread for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon] + for thread in threads: + thread.join() + + def __enter__(self) -> Any: + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + pass + + def __del__(self, _warn: Any = warnings.warn) -> None: + threads = [thread for thread in list(self._threads.values()) + if thread.is_alive()] + if threads: + _warn(f'{self.__class__} has registered but not finished child processes', + ResourceWarning, + source=self) + + def add_child_handler(self, pid: Any, callback: Any, *args: Any) -> None: + loop = events.get_event_loop() + thread = threading.Thread(target=self._do_waitpid, + name=f'waitpid-{next(self._pid_counter)}', + args=(loop, pid, callback, args), + daemon=True) + self._threads[pid] = thread + thread.start() + + def remove_child_handler(self, pid: Any) -> bool: + # asyncio never calls remove_child_handler() !!! + # The method is no-op but is implemented because + # abstract base classe requires it + return True + + def attach_loop(self, loop: Any) -> None: + pass + + def _do_waitpid(self, loop: Any, expected_pid: Any, callback: Any, args: Any) -> None: + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, 0) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + 'Unknown child process pid %d, will report returncode 255', + pid) + else: + if os.WIFEXITED(status): + returncode = os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + returncode = -os.WTERMSIG(status) + else: + raise ValueError(f'unknown wait status {status}') + if loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) + + if loop.is_closed(): + logger.warning('Loop %r that handles pid %r is closed', loop, pid) + else: + loop.call_soon_threadsafe(callback, pid, returncode, *args) + + self._threads.pop(expected_pid) + + # unlike SafeChildWatcher which handles SIGCHLD in the main thread, + # ThreadedChildWatcher runs in a separated thread, hence allows us to + # run create_subprocess_exec() in non-main thread, see + # https://bugs.python.org/issue35621 + asyncio.set_child_watcher(ThreadedChildWatcher()) + + +try: + from asyncio import run as async_run # type: ignore[attr-defined] +except ImportError: # pragma: no cover + # disable coverage for this block. it should be a copy-n-paste from + # from newer libs for compatibilty on older python versions + def async_run(coro): # type: ignore + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + return loop.run_until_complete(coro) + finally: + try: + loop.run_until_complete(loop.shutdown_asyncgens()) + finally: + asyncio.set_event_loop(None) + loop.close() + + +def call(ctx: CephadmContext, + command: List[str], + desc: Optional[str] = None, + verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, + timeout: Optional[int] = DEFAULT_TIMEOUT, + **kwargs: Any) -> Tuple[str, str, int]: + """ + Wrap subprocess.Popen to + + - log stdout/stderr to a logger, + - decode utf-8 + - cleanly return out, err, returncode + + :param timeout: timeout in seconds + """ + + prefix = command[0] if desc is None else desc + if prefix: + prefix += ': ' + timeout = timeout or ctx.timeout + + async def run_with_timeout() -> Tuple[str, str, int]: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=os.environ.copy()) + assert process.stdout + assert process.stderr + try: + stdout, stderr = await asyncio.wait_for( + process.communicate(), + timeout, + ) + except asyncio.TimeoutError: + # try to terminate the process assuming it is still running. It's + # possible that even after killing the process it will not + # complete, particularly if it is D-state. If that happens the + # process.wait call will block, but we're no worse off than before + # when the timeout did not work. Additionally, there are other + # corner-cases we could try and handle here but we decided to start + # simple. + process.kill() + await process.wait() + logger.info(prefix + f'timeout after {timeout} seconds') + return '', '', 124 + else: + assert process.returncode is not None + return ( + stdout.decode('utf-8'), + stderr.decode('utf-8'), + process.returncode, + ) + + stdout, stderr, returncode = async_run(run_with_timeout()) + log_level = verbosity.success_log_level() + if returncode != 0: + log_level = verbosity.error_log_level() + logger.log(log_level, f'Non-zero exit code {returncode} from {" ".join(command)}') + for line in stdout.splitlines(): + logger.log(log_level, prefix + 'stdout ' + line) + for line in stderr.splitlines(): + logger.log(log_level, prefix + 'stderr ' + line) + return stdout, stderr, returncode + + +def call_throws( + ctx: CephadmContext, + command: List[str], + desc: Optional[str] = None, + verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, + timeout: Optional[int] = DEFAULT_TIMEOUT, + **kwargs: Any) -> Tuple[str, str, int]: + out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs) + if ret: + for s in (out, err): + if s.strip() and len(s.splitlines()) <= 2: # readable message? + raise RuntimeError(f'Failed command: {" ".join(command)}: {s}') + raise RuntimeError('Failed command: %s' % ' '.join(command)) + return out, err, ret + + +def call_timeout(ctx, command, timeout): + # type: (CephadmContext, List[str], int) -> int + logger.debug('Running command (timeout=%s): %s' + % (timeout, ' '.join(command))) + + def raise_timeout(command, timeout): + # type: (List[str], int) -> NoReturn + msg = 'Command `%s` timed out after %s seconds' % (command, timeout) + logger.debug(msg) + raise TimeoutExpired(msg) + + try: + return subprocess.call(command, timeout=timeout, env=os.environ.copy()) + except subprocess.TimeoutExpired: + raise_timeout(command, timeout) diff --git a/src/cephadm/tests/fixtures.py b/src/cephadm/tests/fixtures.py index 266b7e0894d56..83cddf331d8ac 100644 --- a/src/cephadm/tests/fixtures.py +++ b/src/cephadm/tests/fixtures.py @@ -146,6 +146,8 @@ def with_cephadm_ctx( _cephadm = import_cephadm() with mock.patch('cephadm.attempt_bind'), \ + mock.patch('cephadmlib.call_wrappers.call', return_value=('', '', 0)), \ + mock.patch('cephadmlib.call_wrappers.call_timeout', return_value=0), \ mock.patch('cephadm.call', return_value=('', '', 0)), \ mock.patch('cephadm.call_timeout', return_value=0), \ mock.patch('cephadmlib.exe_utils.find_executable', return_value='foo'), \ diff --git a/src/cephadm/tests/test_agent.py b/src/cephadm/tests/test_agent.py index f9cf201e27527..38c35e3558307 100644 --- a/src/cephadm/tests/test_agent.py +++ b/src/cephadm/tests/test_agent.py @@ -34,6 +34,9 @@ def _check_file(path, content): assert fcontent == content +# FIXME(refactor): call is handled by with_cephadm_ctx but not call_throws +# this leaves the test somewhat inconsistent and slightly confusing but we +# are not going to change this while we break cephadm up into multiple files. @mock.patch('cephadm.call_throws') def test_agent_deploy_daemon_unit(_call_throws, cephadm_fs): _call_throws.return_value = ('', '', 0) diff --git a/src/cephadm/tests/test_cephadm.py b/src/cephadm/tests/test_cephadm.py index 9c59958c9184b..8722bceda521a 100644 --- a/src/cephadm/tests/test_cephadm.py +++ b/src/cephadm/tests/test_cephadm.py @@ -1039,7 +1039,7 @@ class TestCephAdm(object): infer_config(ctx) assert ctx.config == result - @mock.patch('cephadm.call') + @mock.patch('cephadmlib.call_wrappers.call') def test_extract_uid_gid_fail(self, _call): err = """Error: container_linux.go:370: starting container process caused: process_linux.go:459: container init caused: process_linux.go:422: setting cgroup config for procHooks process caused: Unit libpod-056038e1126191fba41d8a037275136f2d7aeec9710b9ee ff792c06d8544b983.scope not found.: OCI runtime error""" @@ -2088,10 +2088,14 @@ class TestValidateRepo: class TestPull: @mock.patch('time.sleep') - @mock.patch('cephadm.call', return_value=('', '', 0)) @mock.patch('cephadm.get_image_info_from_inspect', return_value={}) @mock.patch('cephadm.logger') - def test_error(self, _logger, _get_image_info_from_inspect, _call, _sleep): + def test_error(self, _logger, _get_image_info_from_inspect, _sleep, monkeypatch): + # manually create a mock and use pytest's monkeypatch fixture to set + # multiple targets to the *same* mock + _call = mock.MagicMock() + monkeypatch.setattr('cephadm.call', _call) + monkeypatch.setattr('cephadmlib.call_wrappers.call', _call) ctx = _cephadm.CephadmContext() ctx.container_engine = mock_podman() ctx.insecure = False -- 2.39.5