From: Shweta Bhosale Date: Tue, 23 Dec 2025 09:33:02 +0000 (+0530) Subject: mgr/cephadm: Use 'cephadm exec' to execute bash commands, just commands requires... X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a026eb34a26b71f53084c288be388a27c3cb7051;p=ceph.git mgr/cephadm: Use 'cephadm exec' to execute bash commands, just commands requires to deploy cephadm binary and which command should be executed directly Fixes: https://tracker.ceph.com/issues/74045 Signed-off-by: Shweta Bhosale --- diff --git a/src/pybind/mgr/cephadm/offline_watcher.py b/src/pybind/mgr/cephadm/offline_watcher.py index 4aa07e2f584..37c9e3bee58 100644 --- a/src/pybind/mgr/cephadm/offline_watcher.py +++ b/src/pybind/mgr/cephadm/offline_watcher.py @@ -41,7 +41,7 @@ class OfflineHostWatcher(threading.Thread): if host not in self.mgr.offline_hosts: try: rcmd = ssh.RemoteCommand(ssh.Executables.TRUE) - self.mgr.ssh.check_execute_command(host, rcmd, log_command=self.mgr.log_refresh_metadata) + self.mgr.ssh.check_execute_cephadm_exec(host, rcmd, log_command=self.mgr.log_refresh_metadata) except Exception: logger.debug(f'OfflineHostDetector: detected {host} to be offline') # kick serve loop in case corrective action must be taken for offline host diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index 1f1b5c2fd6f..54985d2641f 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -1401,7 +1401,7 @@ class CephadmServe: continue self.log.info(f'Removing {host}:{path}') cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', path]) - self.mgr.ssh.check_execute_command(host, cmd) + self.mgr.ssh.check_execute_cephadm_exec(host, cmd) updated_files = True self.mgr.cache.removed_client_file(host, path) if updated_files: @@ -1901,7 +1901,43 @@ class CephadmServe: # Use tee (from coreutils) to create a copy of cephadm on the target machine self.log.info(f"Deploying cephadm binary to {host}") await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path, - self.mgr._cephadm, addr=addr) + self.mgr._cephadm, addr=addr, bypass_cephadm_exec=True) + async def run_cephadm_exec(self, + host: str, + cmd: List[str], + addr: Optional[str] = None, + stdin: Optional[str] = None, + log_output: Optional[bool] = True, + timeout: Optional[int] = None, + ) -> Tuple[str, str, int]: + """ + Execute a bash command on the remote host via 'cephadm exec --command ' + """ + self.log.debug(f"run_cephadm_exec: Executing command on {host}: {cmd}") + + exec_args = ['--command'] + cmd + try: + out, err, code = await self._run_cephadm( + host=host, + entity=cephadmNoImage, + command='exec', + args=exec_args, + addr=addr, + stdin=stdin, + no_fsid=True, # exec doesn't need fsid + error_ok=True, # We'll handle errors at a higher level + log_output=log_output, + timeout=timeout + ) + stdout = out[0] if out else '' + stderr = err[0] if err else '' + if log_output: + self.log.debug(f"run_cephadm_exec result: code={code}, stdout={stdout}, stderr={stderr}") + return stdout, stderr, code + + except Exception as e: + self.log.exception(f"Error executing command via cephadm exec on {host}: {e}") + return '', str(e), 1 def _retry_failed_operations(self) -> None: self.log.debug('_retry_failed_operations') diff --git a/src/pybind/mgr/cephadm/ssh.py b/src/pybind/mgr/cephadm/ssh.py index 1e3cce55b30..ab70b6d902a 100644 --- a/src/pybind/mgr/cephadm/ssh.py +++ b/src/pybind/mgr/cephadm/ssh.py @@ -270,7 +270,6 @@ class SSHManager: addr: Optional[str] = None, log_command: Optional[bool] = True, ) -> Tuple[str, str, int]: - conn = await self._remote_connection(host, addr) # For hosts being added, always use root (no sudo) even if cluster @@ -415,6 +414,68 @@ class SSHManager: with self.mgr.async_timeout_handler(host, " ".join(cmd)): return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command)) + async def _execute_cephadm_exec(self, + host: str, + cmd_components: RemoteCommand, + stdin: Optional[str] = None, + addr: Optional[str] = None, + log_command: Optional[bool] = True, + ) -> Tuple[str, str, int]: + """ + Execute a command on the remote host via 'cephadm exec --command ' + This routes the command through CephadmServe.run_cephadm_exec + """ + if log_command: + logger.debug(f'Executing command via cephadm exec: {cmd_components}') + + cmd_list = list(cmd_components) + + from cephadm.serve import CephadmServe + + out, err, code = await CephadmServe(self.mgr).run_cephadm_exec( + host=host, + cmd=cmd_list, + addr=addr, + stdin=stdin, + log_output=log_command + ) + return out, err, code + + def execute_cephadm_exec(self, + host: str, + cmd: RemoteCommand, + stdin: Optional[str] = None, + addr: Optional[str] = None, + log_command: Optional[bool] = True + ) -> Tuple[str, str, int]: + with self.mgr.async_timeout_handler(host, " ".join(cmd)): + return self.mgr.wait_async(self._execute_cephadm_exec(host, cmd, stdin, addr, log_command)) + + async def _check_execute_cephadm_exec(self, + host: str, + cmd: RemoteCommand, + stdin: Optional[str] = None, + addr: Optional[str] = None, + log_command: Optional[bool] = True + ) -> str: + """Execute a command via cephadm exec and raise error if it fails""" + out, err, code = await self._execute_cephadm_exec(host, cmd, stdin, addr, log_command) + if code != 0: + msg = f'Command {cmd} failed. {err}' + logger.debug(msg) + raise OrchestratorError(msg) + return out + + def check_execute_cephadm_exec(self, + host: str, + cmd: RemoteCommand, + stdin: Optional[str] = None, + addr: Optional[str] = None, + log_command: Optional[bool] = True, + ) -> str: + with self.mgr.async_timeout_handler(host, " ".join(cmd)): + return self.mgr.wait_async(self._check_execute_cephadm_exec(host, cmd, stdin, addr, log_command)) + async def _write_remote_file(self, host: str, path: str, @@ -423,26 +484,33 @@ class SSHManager: uid: Optional[int] = None, gid: Optional[int] = None, addr: Optional[str] = None, + bypass_cephadm_exec: Optional[bool] = False, ) -> None: + """ + Write a file to a remote host. + """ try: cephadm_tmp_dir = f"/tmp/cephadm-{self.mgr._cluster_fsid}" dirname = os.path.dirname(path) + + # Choose execution method based on bypass flag + execute_method = self._check_execute_command if bypass_cephadm_exec else self._check_execute_cephadm_exec mkdir = RemoteCommand(Executables.MKDIR, ['-p', dirname]) - await self._check_execute_command(host, mkdir, addr=addr) + await execute_method(host, mkdir, addr=addr) mkdir2 = RemoteCommand(Executables.MKDIR, ['-p', cephadm_tmp_dir + dirname]) - await self._check_execute_command(host, mkdir2, addr=addr) + await execute_method(host, mkdir2, addr=addr) tmp_path = cephadm_tmp_dir + path + '.new' touch = RemoteCommand(Executables.TOUCH, [tmp_path]) - await self._check_execute_command(host, touch, addr=addr) + await execute_method(host, touch, addr=addr) if self.mgr.ssh_user != 'root': assert self.mgr.ssh_user chown = RemoteCommand( Executables.CHOWN, ['-R', self.mgr.ssh_user, cephadm_tmp_dir] ) - await self._check_execute_command(host, chown, addr=addr) + await execute_method(host, chown, addr=addr) chmod = RemoteCommand(Executables.CHMOD, [str(644), tmp_path]) - await self._check_execute_command(host, chmod, addr=addr) + await execute_method(host, chmod, addr=addr) with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as f: os.fchmod(f.fileno(), 0o600) f.write(content) @@ -456,11 +524,11 @@ class SSHManager: Executables.CHOWN, ['-R', str(uid) + ':' + str(gid), tmp_path] ) - await self._check_execute_command(host, chown, addr=addr) + await execute_method(host, chown, addr=addr) chmod = RemoteCommand(Executables.CHMOD, [oct(mode)[2:], tmp_path]) - await self._check_execute_command(host, chmod, addr=addr) + await execute_method(host, chmod, addr=addr) mv = RemoteCommand(Executables.MV, ['-Z', tmp_path, path]) - await self._check_execute_command(host, mv, addr=addr) + await execute_method(host, mv, addr=addr) except Exception as e: msg = f"Unable to write {host}:{path}: {e}" logger.exception(msg) @@ -474,10 +542,11 @@ class SSHManager: uid: Optional[int] = None, gid: Optional[int] = None, addr: Optional[str] = None, + bypass_cephadm_exec: Optional[bool] = False, ) -> None: with self.mgr.async_timeout_handler(host, f'writing file {path}'): self.mgr.wait_async(self._write_remote_file( - host, path, content, mode, uid, gid, addr)) + host, path, content, mode, uid, gid, addr, bypass_cephadm_exec)) async def _reset_con(self, host: str) -> None: conn = self.cons.get(host) diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py index b97b7f175f2..9bc1148941a 100644 --- a/src/pybind/mgr/cephadm/tests/test_cephadm.py +++ b/src/pybind/mgr/cephadm/tests/test_cephadm.py @@ -2277,9 +2277,9 @@ class TestCephadm(object): CephadmServe(cephadm_module)._write_all_client_files() # Make sure both ceph conf locations (default and per fsid) are called _write_file.assert_has_calls([mock.call('test', '/etc/ceph/ceph.conf', b'', - 0o644, 0, 0, None), + 0o644, 0, 0, None, False), mock.call('test', '/var/lib/ceph/fsid/config/ceph.conf', b'', - 0o644, 0, 0, None)] + 0o644, 0, 0, None, False)] ) ceph_conf_files = cephadm_module.cache.get_host_client_files('test') assert len(ceph_conf_files) == 2 @@ -2291,10 +2291,10 @@ class TestCephadm(object): CephadmServe(cephadm_module)._write_all_client_files() _write_file.assert_has_calls([mock.call('test', '/etc/ceph/ceph.conf', - b'[mon]\nk=v\n', 0o644, 0, 0, None), + b'[mon]\nk=v\n', 0o644, 0, 0, None, False), mock.call('test', '/var/lib/ceph/fsid/config/ceph.conf', - b'[mon]\nk=v\n', 0o644, 0, 0, None)]) + b'[mon]\nk=v\n', 0o644, 0, 0, None, False)]) # reload cephadm_module.cache.last_client_files = {} cephadm_module.cache.load() diff --git a/src/pybind/mgr/cephadm/tests/test_tuned_profiles.py b/src/pybind/mgr/cephadm/tests/test_tuned_profiles.py index 9db971f6f21..2eba05dfa57 100644 --- a/src/pybind/mgr/cephadm/tests/test_tuned_profiles.py +++ b/src/pybind/mgr/cephadm/tests/test_tuned_profiles.py @@ -80,6 +80,24 @@ class FakeMgr: {'y': 'y'}).to_json()}) return '' + def async_timeout_handler(self, host='', cmd='', timeout=None): + """Mock async_timeout_handler for tests""" + from contextlib import contextmanager + + @contextmanager + def handler(): + yield + return handler() + + def wait_async(self, coro, timeout=None): + """Mock wait_async for tests - just run the coroutine""" + import asyncio + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + class TestTunedProfiles: tspec1 = TunedProfileSpec('p1', @@ -128,19 +146,19 @@ class TestTunedProfiles: ] _write_profiles.assert_has_calls(calls, any_order=True) - @mock.patch('cephadm.ssh.SSHManager.check_execute_command') - def test_rm_stray_tuned_profiles(self, _check_execute_command): + @mock.patch('cephadm.ssh.SSHManager.check_execute_cephadm_exec') + def test_rm_stray_tuned_profiles(self, _check_execute_cephadm_exec): profiles = {'p1': self.tspec1, 'p2': self.tspec2, 'p3': self.tspec3} # for this test, going to use host "a" and put 4 cephadm generated # profiles "p1" "p2", "p3" and "who" only two of which should be there ("p1", "p2") # as well as a file not generated by cephadm. Only the "p3" and "who" # profiles should be removed from the host. This should total to 4 - # calls to check_execute_command, 1 "ls", 2 "rm", and 1 "sysctl --system" - _check_execute_command.return_value = '\n'.join(['p1-cephadm-tuned-profile.conf', - 'p2-cephadm-tuned-profile.conf', - 'p3-cephadm-tuned-profile.conf', - 'who-cephadm-tuned-profile.conf', - 'dont-touch-me']) + # calls to check_execute_cephadm_exec, 1 "ls", 2 "rm", and 1 "sysctl --system" + _check_execute_cephadm_exec.return_value = '\n'.join(['p1-cephadm-tuned-profile.conf', + 'p2-cephadm-tuned-profile.conf', + 'p3-cephadm-tuned-profile.conf', + 'who-cephadm-tuned-profile.conf', + 'dont-touch-me']) mgr = FakeMgr(['a', 'b', 'c'], ['a', 'b', 'c'], [], @@ -169,16 +187,16 @@ class TestTunedProfiles: 'a', RemoteCommand(Executables.SYSCTL, ['--system']) ), ] - _check_execute_command.assert_has_calls(calls, any_order=True) + _check_execute_cephadm_exec.assert_has_calls(calls, any_order=True) - @mock.patch('cephadm.ssh.SSHManager.check_execute_command') + @mock.patch('cephadm.ssh.SSHManager.check_execute_cephadm_exec') @mock.patch('cephadm.ssh.SSHManager.write_remote_file') - def test_write_tuned_profiles(self, _write_remote_file, _check_execute_command): + def test_write_tuned_profiles(self, _write_remote_file, _check_execute_cephadm_exec): profiles = {'p1': self.tspec1, 'p2': self.tspec2, 'p3': self.tspec3} # for this test we will use host "a" and have it so host_needs_tuned_profile_update # returns True for p2 and False for p1 (see FakeCache class). So we should see # 2 ssh calls, one to write p2, one to run sysctl --system - _check_execute_command.return_value = 'success' + _check_execute_cephadm_exec.return_value = 'success' _write_remote_file.return_value = 'success' mgr = FakeMgr(['a', 'b', 'c'], ['a', 'b', 'c'], @@ -186,7 +204,7 @@ class TestTunedProfiles: profiles) tp = TunedProfileUtils(mgr) tp._write_tuned_profiles('a', self.profiles_to_calls(tp, [self.tspec1, self.tspec2])) - _check_execute_command.assert_called_with( + _check_execute_cephadm_exec.assert_called_with( 'a', RemoteCommand(Executables.SYSCTL, ['--system']) ) _write_remote_file.assert_called_with( diff --git a/src/pybind/mgr/cephadm/tuned_profiles.py b/src/pybind/mgr/cephadm/tuned_profiles.py index 7a37d937904..5dbc2c102d2 100644 --- a/src/pybind/mgr/cephadm/tuned_profiles.py +++ b/src/pybind/mgr/cephadm/tuned_profiles.py @@ -73,7 +73,7 @@ class TunedProfileUtils(): if self.mgr.cache.is_host_unreachable(host): return cmd = ssh.RemoteCommand(ssh.Executables.LS, [SYSCTL_DIR]) - found_files = self.mgr.ssh.check_execute_command(host, cmd, log_command=self.mgr.log_refresh_metadata).split('\n') + found_files = self.mgr.ssh.check_execute_cephadm_exec(host, cmd, log_command=self.mgr.log_refresh_metadata).split('\n') found_files = [s.strip() for s in found_files] profile_names: List[str] = sum([[*p] for p in profiles], []) # extract all profiles names profile_names = list(set(profile_names)) # remove duplicates @@ -85,10 +85,10 @@ class TunedProfileUtils(): if file not in expected_files: logger.info(f'Removing stray tuned profile file {file}') cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', f'{SYSCTL_DIR}/{file}']) - self.mgr.ssh.check_execute_command(host, cmd) + self.mgr.ssh.check_execute_cephadm_exec(host, cmd) updated = True if updated: - self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD) + self.mgr.ssh.check_execute_cephadm_exec(host, SYSCTL_SYSTEM_CMD) def _write_tuned_profiles(self, host: str, profiles: List[Dict[str, str]]) -> None: if self.mgr.cache.is_host_unreachable(host): @@ -102,5 +102,5 @@ class TunedProfileUtils(): self.mgr.ssh.write_remote_file(host, profile_filename, content.encode('utf-8')) updated = True if updated: - self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD) + self.mgr.ssh.check_execute_cephadm_exec(host, SYSCTL_SYSTEM_CMD) self.mgr.cache.last_tuned_profile_update[host] = datetime_now()