]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: Use 'cephadm exec' to execute bash commands, just commands requires...
authorShweta Bhosale <Shweta.Bhosale1@ibm.com>
Tue, 23 Dec 2025 09:33:02 +0000 (15:03 +0530)
committerShweta Bhosale <Shweta.Bhosale1@ibm.com>
Tue, 10 Feb 2026 05:00:41 +0000 (10:30 +0530)
Fixes: https://tracker.ceph.com/issues/74045
Signed-off-by: Shweta Bhosale <Shweta.Bhosale1@ibm.com>
src/pybind/mgr/cephadm/offline_watcher.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/ssh.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/cephadm/tests/test_tuned_profiles.py
src/pybind/mgr/cephadm/tuned_profiles.py

index 4aa07e2f584ae8590ab636bcc3f8bd5d339806b1..37c9e3bee58d6f90a9983e2ac7500ece671aaec0 100644 (file)
@@ -41,7 +41,7 @@ class OfflineHostWatcher(threading.Thread):
         if host not in self.mgr.offline_hosts:
             try:
                 rcmd = ssh.RemoteCommand(ssh.Executables.TRUE)
-                self.mgr.ssh.check_execute_command(host, rcmd, log_command=self.mgr.log_refresh_metadata)
+                self.mgr.ssh.check_execute_cephadm_exec(host, rcmd, log_command=self.mgr.log_refresh_metadata)
             except Exception:
                 logger.debug(f'OfflineHostDetector: detected {host} to be offline')
                 # kick serve loop in case corrective action must be taken for offline host
index 3f1c63bcda114821d6aecda836a2ebfbd1dcb017..c2de2c6a755cdbabdd1a05ff87ebef649872e635 100644 (file)
@@ -1382,7 +1382,7 @@ class CephadmServe:
                 continue
             self.log.info(f'Removing {host}:{path}')
             cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', path])
-            self.mgr.ssh.check_execute_command(host, cmd)
+            self.mgr.ssh.check_execute_cephadm_exec(host, cmd)
             updated_files = True
             self.mgr.cache.removed_client_file(host, path)
         if updated_files:
@@ -1882,7 +1882,43 @@ class CephadmServe:
         # Use tee (from coreutils) to create a copy of cephadm on the target machine
         self.log.info(f"Deploying cephadm binary to {host}")
         await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
-                                              self.mgr._cephadm, addr=addr)
+                                              self.mgr._cephadm, addr=addr, bypass_cephadm_exec=True)
+    async def run_cephadm_exec(self,
+                               host: str,
+                               cmd: List[str],
+                               addr: Optional[str] = None,
+                               stdin: Optional[str] = None,
+                               log_output: Optional[bool] = True,
+                               timeout: Optional[int] = None,
+                               ) -> Tuple[str, str, int]:
+        """
+        Execute a bash command on the remote host via 'cephadm exec --command <bash command>'
+        """
+        self.log.debug(f"run_cephadm_exec: Executing command on {host}: {cmd}")
+
+        exec_args = ['--command'] + cmd
+        try:
+            out, err, code = await self._run_cephadm(
+                host=host,
+                entity=cephadmNoImage,
+                command='exec',
+                args=exec_args,
+                addr=addr,
+                stdin=stdin,
+                no_fsid=True,  # exec doesn't need fsid
+                error_ok=True,  # We'll handle errors at a higher level
+                log_output=log_output,
+                timeout=timeout
+            )
+            stdout = out[0] if out else ''
+            stderr = err[0] if err else ''
+            if log_output:
+                self.log.debug(f"run_cephadm_exec result: code={code}, stdout={stdout}, stderr={stderr}")
+            return stdout, stderr, code
+
+        except Exception as e:
+            self.log.exception(f"Error executing command via cephadm exec on {host}: {e}")
+            return '', str(e), 1
 
 
 def _host_selector(svc: Any) -> Optional[HostSelector]:
index 7cf0c4de3950a1e0aabf9bd4eb4115937584c596..63f26b511b8c46f34dca32f33817e1bbc7c3b6c4 100644 (file)
@@ -236,7 +236,6 @@ class SSHManager:
                                addr: Optional[str] = None,
                                log_command: Optional[bool] = True,
                                ) -> Tuple[str, str, int]:
-
         conn = await self._remote_connection(host, addr)
 
         # For hosts being added, always use root (no sudo) even if cluster
@@ -326,6 +325,68 @@ class SSHManager:
         with self.mgr.async_timeout_handler(host, " ".join(cmd)):
             return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command))
 
+    async def _execute_cephadm_exec(self,
+                                    host: str,
+                                    cmd_components: RemoteCommand,
+                                    stdin: Optional[str] = None,
+                                    addr: Optional[str] = None,
+                                    log_command: Optional[bool] = True,
+                                    ) -> Tuple[str, str, int]:
+        """
+        Execute a command on the remote host via 'cephadm exec --command <bash command>'
+        This routes the command through CephadmServe.run_cephadm_exec
+        """
+        if log_command:
+            logger.debug(f'Executing command via cephadm exec: {cmd_components}')
+
+        cmd_list = list(cmd_components)
+
+        from cephadm.serve import CephadmServe
+
+        out, err, code = await CephadmServe(self.mgr).run_cephadm_exec(
+            host=host,
+            cmd=cmd_list,
+            addr=addr,
+            stdin=stdin,
+            log_output=log_command
+        )
+        return out, err, code
+
+    def execute_cephadm_exec(self,
+                             host: str,
+                             cmd: RemoteCommand,
+                             stdin: Optional[str] = None,
+                             addr: Optional[str] = None,
+                             log_command: Optional[bool] = True
+                             ) -> Tuple[str, str, int]:
+        with self.mgr.async_timeout_handler(host, " ".join(cmd)):
+            return self.mgr.wait_async(self._execute_cephadm_exec(host, cmd, stdin, addr, log_command))
+
+    async def _check_execute_cephadm_exec(self,
+                                          host: str,
+                                          cmd: RemoteCommand,
+                                          stdin: Optional[str] = None,
+                                          addr: Optional[str] = None,
+                                          log_command: Optional[bool] = True
+                                          ) -> str:
+        """Execute a command via cephadm exec and raise error if it fails"""
+        out, err, code = await self._execute_cephadm_exec(host, cmd, stdin, addr, log_command)
+        if code != 0:
+            msg = f'Command {cmd} failed. {err}'
+            logger.debug(msg)
+            raise OrchestratorError(msg)
+        return out
+
+    def check_execute_cephadm_exec(self,
+                                   host: str,
+                                   cmd: RemoteCommand,
+                                   stdin: Optional[str] = None,
+                                   addr: Optional[str] = None,
+                                   log_command: Optional[bool] = True,
+                                   ) -> str:
+        with self.mgr.async_timeout_handler(host, " ".join(cmd)):
+            return self.mgr.wait_async(self._check_execute_cephadm_exec(host, cmd, stdin, addr, log_command))
+
     async def _write_remote_file(self,
                                  host: str,
                                  path: str,
@@ -334,26 +395,33 @@ class SSHManager:
                                  uid: Optional[int] = None,
                                  gid: Optional[int] = None,
                                  addr: Optional[str] = None,
+                                 bypass_cephadm_exec: Optional[bool] = False,
                                  ) -> None:
+        """
+        Write a file to a remote host.
+        """
         try:
             cephadm_tmp_dir = f"/tmp/cephadm-{self.mgr._cluster_fsid}"
             dirname = os.path.dirname(path)
+
+            # Choose execution method based on bypass flag
+            execute_method = self._check_execute_command if bypass_cephadm_exec else self._check_execute_cephadm_exec
             mkdir = RemoteCommand(Executables.MKDIR, ['-p', dirname])
-            await self._check_execute_command(host, mkdir, addr=addr)
+            await execute_method(host, mkdir, addr=addr)
             mkdir2 = RemoteCommand(Executables.MKDIR, ['-p', cephadm_tmp_dir + dirname])
-            await self._check_execute_command(host, mkdir2, addr=addr)
+            await execute_method(host, mkdir2, addr=addr)
             tmp_path = cephadm_tmp_dir + path + '.new'
             touch = RemoteCommand(Executables.TOUCH, [tmp_path])
-            await self._check_execute_command(host, touch, addr=addr)
+            await execute_method(host, touch, addr=addr)
             if self.mgr.ssh_user != 'root':
                 assert self.mgr.ssh_user
                 chown = RemoteCommand(
                     Executables.CHOWN,
                     ['-R', self.mgr.ssh_user, cephadm_tmp_dir]
                 )
-                await self._check_execute_command(host, chown, addr=addr)
+                await execute_method(host, chown, addr=addr)
                 chmod = RemoteCommand(Executables.CHMOD, [str(644), tmp_path])
-                await self._check_execute_command(host, chmod, addr=addr)
+                await execute_method(host, chmod, addr=addr)
             with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as f:
                 os.fchmod(f.fileno(), 0o600)
                 f.write(content)
@@ -367,11 +435,11 @@ class SSHManager:
                     Executables.CHOWN,
                     ['-R', str(uid) + ':' + str(gid), tmp_path]
                 )
-                await self._check_execute_command(host, chown, addr=addr)
+                await execute_method(host, chown, addr=addr)
                 chmod = RemoteCommand(Executables.CHMOD, [oct(mode)[2:], tmp_path])
-                await self._check_execute_command(host, chmod, addr=addr)
+                await execute_method(host, chmod, addr=addr)
             mv = RemoteCommand(Executables.MV, ['-Z', tmp_path, path])
-            await self._check_execute_command(host, mv, addr=addr)
+            await execute_method(host, mv, addr=addr)
         except Exception as e:
             msg = f"Unable to write {host}:{path}: {e}"
             logger.exception(msg)
@@ -385,10 +453,11 @@ class SSHManager:
                           uid: Optional[int] = None,
                           gid: Optional[int] = None,
                           addr: Optional[str] = None,
+                          bypass_cephadm_exec: Optional[bool] = False,
                           ) -> None:
         with self.mgr.async_timeout_handler(host, f'writing file {path}'):
             self.mgr.wait_async(self._write_remote_file(
-                host, path, content, mode, uid, gid, addr))
+                host, path, content, mode, uid, gid, addr, bypass_cephadm_exec))
 
     async def _reset_con(self, host: str) -> None:
         conn = self.cons.get(host)
index a9d40ec7f07d5020f23226bc3d6b141cf62a02ab..4b17f82b10aa9914da1040fad79c39505529e921 100644 (file)
@@ -2080,9 +2080,9 @@ class TestCephadm(object):
             CephadmServe(cephadm_module)._write_all_client_files()
             # Make sure both ceph conf locations (default and per fsid) are called
             _write_file.assert_has_calls([mock.call('test', '/etc/ceph/ceph.conf', b'',
-                                          0o644, 0, 0, None),
+                                          0o644, 0, 0, None, False),
                                          mock.call('test', '/var/lib/ceph/fsid/config/ceph.conf', b'',
-                                          0o644, 0, 0, None)]
+                                          0o644, 0, 0, None, False)]
                                          )
             ceph_conf_files = cephadm_module.cache.get_host_client_files('test')
             assert len(ceph_conf_files) == 2
@@ -2094,10 +2094,10 @@ class TestCephadm(object):
             CephadmServe(cephadm_module)._write_all_client_files()
             _write_file.assert_has_calls([mock.call('test',
                                                     '/etc/ceph/ceph.conf',
-                                                    b'[mon]\nk=v\n', 0o644, 0, 0, None),
+                                                    b'[mon]\nk=v\n', 0o644, 0, 0, None, False),
                                           mock.call('test',
                                                     '/var/lib/ceph/fsid/config/ceph.conf',
-                                                    b'[mon]\nk=v\n', 0o644, 0, 0, None)])
+                                                    b'[mon]\nk=v\n', 0o644, 0, 0, None, False)])
             # reload
             cephadm_module.cache.last_client_files = {}
             cephadm_module.cache.load()
index 9db971f6f216e4a851eb26204e574ef5a5aeef26..2eba05dfa57289bab0b7e6963bb3f59c22628c2e 100644 (file)
@@ -80,6 +80,24 @@ class FakeMgr:
                                                      {'y': 'y'}).to_json()})
         return ''
 
+    def async_timeout_handler(self, host='', cmd='', timeout=None):
+        """Mock async_timeout_handler for tests"""
+        from contextlib import contextmanager
+
+        @contextmanager
+        def handler():
+            yield
+        return handler()
+
+    def wait_async(self, coro, timeout=None):
+        """Mock wait_async for tests - just run the coroutine"""
+        import asyncio
+        loop = asyncio.new_event_loop()
+        try:
+            return loop.run_until_complete(coro)
+        finally:
+            loop.close()
+
 
 class TestTunedProfiles:
     tspec1 = TunedProfileSpec('p1',
@@ -128,19 +146,19 @@ class TestTunedProfiles:
         ]
         _write_profiles.assert_has_calls(calls, any_order=True)
 
-    @mock.patch('cephadm.ssh.SSHManager.check_execute_command')
-    def test_rm_stray_tuned_profiles(self, _check_execute_command):
+    @mock.patch('cephadm.ssh.SSHManager.check_execute_cephadm_exec')
+    def test_rm_stray_tuned_profiles(self, _check_execute_cephadm_exec):
         profiles = {'p1': self.tspec1, 'p2': self.tspec2, 'p3': self.tspec3}
         # for this test, going to use host "a" and put 4 cephadm generated
         # profiles "p1" "p2", "p3" and "who" only two of which should be there ("p1", "p2")
         # as well as a file not generated by cephadm. Only the "p3" and "who"
         # profiles should be removed from the host. This should total to 4
-        # calls to check_execute_command, 1 "ls", 2 "rm", and 1 "sysctl --system"
-        _check_execute_command.return_value = '\n'.join(['p1-cephadm-tuned-profile.conf',
-                                                         'p2-cephadm-tuned-profile.conf',
-                                                         'p3-cephadm-tuned-profile.conf',
-                                                         'who-cephadm-tuned-profile.conf',
-                                                         'dont-touch-me'])
+        # calls to check_execute_cephadm_exec, 1 "ls", 2 "rm", and 1 "sysctl --system"
+        _check_execute_cephadm_exec.return_value = '\n'.join(['p1-cephadm-tuned-profile.conf',
+                                                              'p2-cephadm-tuned-profile.conf',
+                                                              'p3-cephadm-tuned-profile.conf',
+                                                              'who-cephadm-tuned-profile.conf',
+                                                              'dont-touch-me'])
         mgr = FakeMgr(['a', 'b', 'c'],
                       ['a', 'b', 'c'],
                       [],
@@ -169,16 +187,16 @@ class TestTunedProfiles:
                 'a', RemoteCommand(Executables.SYSCTL, ['--system'])
             ),
         ]
-        _check_execute_command.assert_has_calls(calls, any_order=True)
+        _check_execute_cephadm_exec.assert_has_calls(calls, any_order=True)
 
-    @mock.patch('cephadm.ssh.SSHManager.check_execute_command')
+    @mock.patch('cephadm.ssh.SSHManager.check_execute_cephadm_exec')
     @mock.patch('cephadm.ssh.SSHManager.write_remote_file')
-    def test_write_tuned_profiles(self, _write_remote_file, _check_execute_command):
+    def test_write_tuned_profiles(self, _write_remote_file, _check_execute_cephadm_exec):
         profiles = {'p1': self.tspec1, 'p2': self.tspec2, 'p3': self.tspec3}
         # for this test we will use host "a" and have it so host_needs_tuned_profile_update
         # returns True for p2 and False for p1 (see FakeCache class). So we should see
         # 2 ssh calls, one to write p2, one to run sysctl --system
-        _check_execute_command.return_value = 'success'
+        _check_execute_cephadm_exec.return_value = 'success'
         _write_remote_file.return_value = 'success'
         mgr = FakeMgr(['a', 'b', 'c'],
                       ['a', 'b', 'c'],
@@ -186,7 +204,7 @@ class TestTunedProfiles:
                       profiles)
         tp = TunedProfileUtils(mgr)
         tp._write_tuned_profiles('a', self.profiles_to_calls(tp, [self.tspec1, self.tspec2]))
-        _check_execute_command.assert_called_with(
+        _check_execute_cephadm_exec.assert_called_with(
             'a', RemoteCommand(Executables.SYSCTL, ['--system'])
         )
         _write_remote_file.assert_called_with(
index 7a37d9379044b6a73db7965e40274225b2950002..5dbc2c102d21a90106efc49835c4e4d56bf1df72 100644 (file)
@@ -73,7 +73,7 @@ class TunedProfileUtils():
         if self.mgr.cache.is_host_unreachable(host):
             return
         cmd = ssh.RemoteCommand(ssh.Executables.LS, [SYSCTL_DIR])
-        found_files = self.mgr.ssh.check_execute_command(host, cmd, log_command=self.mgr.log_refresh_metadata).split('\n')
+        found_files = self.mgr.ssh.check_execute_cephadm_exec(host, cmd, log_command=self.mgr.log_refresh_metadata).split('\n')
         found_files = [s.strip() for s in found_files]
         profile_names: List[str] = sum([[*p] for p in profiles], [])  # extract all profiles names
         profile_names = list(set(profile_names))  # remove duplicates
@@ -85,10 +85,10 @@ class TunedProfileUtils():
             if file not in expected_files:
                 logger.info(f'Removing stray tuned profile file {file}')
                 cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', f'{SYSCTL_DIR}/{file}'])
-                self.mgr.ssh.check_execute_command(host, cmd)
+                self.mgr.ssh.check_execute_cephadm_exec(host, cmd)
                 updated = True
         if updated:
-            self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD)
+            self.mgr.ssh.check_execute_cephadm_exec(host, SYSCTL_SYSTEM_CMD)
 
     def _write_tuned_profiles(self, host: str, profiles: List[Dict[str, str]]) -> None:
         if self.mgr.cache.is_host_unreachable(host):
@@ -102,5 +102,5 @@ class TunedProfileUtils():
                     self.mgr.ssh.write_remote_file(host, profile_filename, content.encode('utf-8'))
                     updated = True
         if updated:
-            self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD)
+            self.mgr.ssh.check_execute_cephadm_exec(host, SYSCTL_SYSTEM_CMD)
         self.mgr.cache.last_tuned_profile_update[host] = datetime_now()