if host not in self.mgr.offline_hosts:
try:
rcmd = ssh.RemoteCommand(ssh.Executables.TRUE)
- self.mgr.ssh.check_execute_command(host, rcmd, log_command=self.mgr.log_refresh_metadata)
+ self.mgr.ssh.check_execute_cephadm_exec(host, rcmd, log_command=self.mgr.log_refresh_metadata)
except Exception:
logger.debug(f'OfflineHostDetector: detected {host} to be offline')
# kick serve loop in case corrective action must be taken for offline host
continue
self.log.info(f'Removing {host}:{path}')
cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', path])
- self.mgr.ssh.check_execute_command(host, cmd)
+ self.mgr.ssh.check_execute_cephadm_exec(host, cmd)
updated_files = True
self.mgr.cache.removed_client_file(host, path)
if updated_files:
# Use tee (from coreutils) to create a copy of cephadm on the target machine
self.log.info(f"Deploying cephadm binary to {host}")
await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
- self.mgr._cephadm, addr=addr)
+ self.mgr._cephadm, addr=addr, bypass_cephadm_exec=True)
+ async def run_cephadm_exec(self,
+ host: str,
+ cmd: List[str],
+ addr: Optional[str] = None,
+ stdin: Optional[str] = None,
+ log_output: Optional[bool] = True,
+ timeout: Optional[int] = None,
+ ) -> Tuple[str, str, int]:
+ """
+ Execute a bash command on the remote host via 'cephadm exec --command <bash command>'
+ """
+ self.log.debug(f"run_cephadm_exec: Executing command on {host}: {cmd}")
+
+ exec_args = ['--command'] + cmd
+ try:
+ out, err, code = await self._run_cephadm(
+ host=host,
+ entity=cephadmNoImage,
+ command='exec',
+ args=exec_args,
+ addr=addr,
+ stdin=stdin,
+ no_fsid=True, # exec doesn't need fsid
+ error_ok=True, # We'll handle errors at a higher level
+ log_output=log_output,
+ timeout=timeout
+ )
+ stdout = out[0] if out else ''
+ stderr = err[0] if err else ''
+ if log_output:
+ self.log.debug(f"run_cephadm_exec result: code={code}, stdout={stdout}, stderr={stderr}")
+ return stdout, stderr, code
+
+ except Exception as e:
+ self.log.exception(f"Error executing command via cephadm exec on {host}: {e}")
+ return '', str(e), 1
def _host_selector(svc: Any) -> Optional[HostSelector]:
addr: Optional[str] = None,
log_command: Optional[bool] = True,
) -> Tuple[str, str, int]:
-
conn = await self._remote_connection(host, addr)
# For hosts being added, always use root (no sudo) even if cluster
with self.mgr.async_timeout_handler(host, " ".join(cmd)):
return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command))
+ async def _execute_cephadm_exec(self,
+ host: str,
+ cmd_components: RemoteCommand,
+ stdin: Optional[str] = None,
+ addr: Optional[str] = None,
+ log_command: Optional[bool] = True,
+ ) -> Tuple[str, str, int]:
+ """
+ Execute a command on the remote host via 'cephadm exec --command <bash command>'
+ This routes the command through CephadmServe.run_cephadm_exec
+ """
+ if log_command:
+ logger.debug(f'Executing command via cephadm exec: {cmd_components}')
+
+ cmd_list = list(cmd_components)
+
+ from cephadm.serve import CephadmServe
+
+ out, err, code = await CephadmServe(self.mgr).run_cephadm_exec(
+ host=host,
+ cmd=cmd_list,
+ addr=addr,
+ stdin=stdin,
+ log_output=log_command
+ )
+ return out, err, code
+
+ def execute_cephadm_exec(self,
+ host: str,
+ cmd: RemoteCommand,
+ stdin: Optional[str] = None,
+ addr: Optional[str] = None,
+ log_command: Optional[bool] = True
+ ) -> Tuple[str, str, int]:
+ with self.mgr.async_timeout_handler(host, " ".join(cmd)):
+ return self.mgr.wait_async(self._execute_cephadm_exec(host, cmd, stdin, addr, log_command))
+
+ async def _check_execute_cephadm_exec(self,
+ host: str,
+ cmd: RemoteCommand,
+ stdin: Optional[str] = None,
+ addr: Optional[str] = None,
+ log_command: Optional[bool] = True
+ ) -> str:
+ """Execute a command via cephadm exec and raise error if it fails"""
+ out, err, code = await self._execute_cephadm_exec(host, cmd, stdin, addr, log_command)
+ if code != 0:
+ msg = f'Command {cmd} failed. {err}'
+ logger.debug(msg)
+ raise OrchestratorError(msg)
+ return out
+
+ def check_execute_cephadm_exec(self,
+ host: str,
+ cmd: RemoteCommand,
+ stdin: Optional[str] = None,
+ addr: Optional[str] = None,
+ log_command: Optional[bool] = True,
+ ) -> str:
+ with self.mgr.async_timeout_handler(host, " ".join(cmd)):
+ return self.mgr.wait_async(self._check_execute_cephadm_exec(host, cmd, stdin, addr, log_command))
+
async def _write_remote_file(self,
host: str,
path: str,
uid: Optional[int] = None,
gid: Optional[int] = None,
addr: Optional[str] = None,
+ bypass_cephadm_exec: Optional[bool] = False,
) -> None:
+ """
+ Write a file to a remote host.
+ """
try:
cephadm_tmp_dir = f"/tmp/cephadm-{self.mgr._cluster_fsid}"
dirname = os.path.dirname(path)
+
+ # Choose execution method based on bypass flag
+ execute_method = self._check_execute_command if bypass_cephadm_exec else self._check_execute_cephadm_exec
mkdir = RemoteCommand(Executables.MKDIR, ['-p', dirname])
- await self._check_execute_command(host, mkdir, addr=addr)
+ await execute_method(host, mkdir, addr=addr)
mkdir2 = RemoteCommand(Executables.MKDIR, ['-p', cephadm_tmp_dir + dirname])
- await self._check_execute_command(host, mkdir2, addr=addr)
+ await execute_method(host, mkdir2, addr=addr)
tmp_path = cephadm_tmp_dir + path + '.new'
touch = RemoteCommand(Executables.TOUCH, [tmp_path])
- await self._check_execute_command(host, touch, addr=addr)
+ await execute_method(host, touch, addr=addr)
if self.mgr.ssh_user != 'root':
assert self.mgr.ssh_user
chown = RemoteCommand(
Executables.CHOWN,
['-R', self.mgr.ssh_user, cephadm_tmp_dir]
)
- await self._check_execute_command(host, chown, addr=addr)
+ await execute_method(host, chown, addr=addr)
chmod = RemoteCommand(Executables.CHMOD, [str(644), tmp_path])
- await self._check_execute_command(host, chmod, addr=addr)
+ await execute_method(host, chmod, addr=addr)
with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as f:
os.fchmod(f.fileno(), 0o600)
f.write(content)
Executables.CHOWN,
['-R', str(uid) + ':' + str(gid), tmp_path]
)
- await self._check_execute_command(host, chown, addr=addr)
+ await execute_method(host, chown, addr=addr)
chmod = RemoteCommand(Executables.CHMOD, [oct(mode)[2:], tmp_path])
- await self._check_execute_command(host, chmod, addr=addr)
+ await execute_method(host, chmod, addr=addr)
mv = RemoteCommand(Executables.MV, ['-Z', tmp_path, path])
- await self._check_execute_command(host, mv, addr=addr)
+ await execute_method(host, mv, addr=addr)
except Exception as e:
msg = f"Unable to write {host}:{path}: {e}"
logger.exception(msg)
uid: Optional[int] = None,
gid: Optional[int] = None,
addr: Optional[str] = None,
+ bypass_cephadm_exec: Optional[bool] = False,
) -> None:
with self.mgr.async_timeout_handler(host, f'writing file {path}'):
self.mgr.wait_async(self._write_remote_file(
- host, path, content, mode, uid, gid, addr))
+ host, path, content, mode, uid, gid, addr, bypass_cephadm_exec))
async def _reset_con(self, host: str) -> None:
conn = self.cons.get(host)
CephadmServe(cephadm_module)._write_all_client_files()
# Make sure both ceph conf locations (default and per fsid) are called
_write_file.assert_has_calls([mock.call('test', '/etc/ceph/ceph.conf', b'',
- 0o644, 0, 0, None),
+ 0o644, 0, 0, None, False),
mock.call('test', '/var/lib/ceph/fsid/config/ceph.conf', b'',
- 0o644, 0, 0, None)]
+ 0o644, 0, 0, None, False)]
)
ceph_conf_files = cephadm_module.cache.get_host_client_files('test')
assert len(ceph_conf_files) == 2
CephadmServe(cephadm_module)._write_all_client_files()
_write_file.assert_has_calls([mock.call('test',
'/etc/ceph/ceph.conf',
- b'[mon]\nk=v\n', 0o644, 0, 0, None),
+ b'[mon]\nk=v\n', 0o644, 0, 0, None, False),
mock.call('test',
'/var/lib/ceph/fsid/config/ceph.conf',
- b'[mon]\nk=v\n', 0o644, 0, 0, None)])
+ b'[mon]\nk=v\n', 0o644, 0, 0, None, False)])
# reload
cephadm_module.cache.last_client_files = {}
cephadm_module.cache.load()
{'y': 'y'}).to_json()})
return ''
+ def async_timeout_handler(self, host='', cmd='', timeout=None):
+ """Mock async_timeout_handler for tests"""
+ from contextlib import contextmanager
+
+ @contextmanager
+ def handler():
+ yield
+ return handler()
+
+ def wait_async(self, coro, timeout=None):
+ """Mock wait_async for tests - just run the coroutine"""
+ import asyncio
+ loop = asyncio.new_event_loop()
+ try:
+ return loop.run_until_complete(coro)
+ finally:
+ loop.close()
+
class TestTunedProfiles:
tspec1 = TunedProfileSpec('p1',
]
_write_profiles.assert_has_calls(calls, any_order=True)
- @mock.patch('cephadm.ssh.SSHManager.check_execute_command')
- def test_rm_stray_tuned_profiles(self, _check_execute_command):
+ @mock.patch('cephadm.ssh.SSHManager.check_execute_cephadm_exec')
+ def test_rm_stray_tuned_profiles(self, _check_execute_cephadm_exec):
profiles = {'p1': self.tspec1, 'p2': self.tspec2, 'p3': self.tspec3}
# for this test, going to use host "a" and put 4 cephadm generated
# profiles "p1" "p2", "p3" and "who" only two of which should be there ("p1", "p2")
# as well as a file not generated by cephadm. Only the "p3" and "who"
# profiles should be removed from the host. This should total to 4
- # calls to check_execute_command, 1 "ls", 2 "rm", and 1 "sysctl --system"
- _check_execute_command.return_value = '\n'.join(['p1-cephadm-tuned-profile.conf',
- 'p2-cephadm-tuned-profile.conf',
- 'p3-cephadm-tuned-profile.conf',
- 'who-cephadm-tuned-profile.conf',
- 'dont-touch-me'])
+ # calls to check_execute_cephadm_exec, 1 "ls", 2 "rm", and 1 "sysctl --system"
+ _check_execute_cephadm_exec.return_value = '\n'.join(['p1-cephadm-tuned-profile.conf',
+ 'p2-cephadm-tuned-profile.conf',
+ 'p3-cephadm-tuned-profile.conf',
+ 'who-cephadm-tuned-profile.conf',
+ 'dont-touch-me'])
mgr = FakeMgr(['a', 'b', 'c'],
['a', 'b', 'c'],
[],
'a', RemoteCommand(Executables.SYSCTL, ['--system'])
),
]
- _check_execute_command.assert_has_calls(calls, any_order=True)
+ _check_execute_cephadm_exec.assert_has_calls(calls, any_order=True)
- @mock.patch('cephadm.ssh.SSHManager.check_execute_command')
+ @mock.patch('cephadm.ssh.SSHManager.check_execute_cephadm_exec')
@mock.patch('cephadm.ssh.SSHManager.write_remote_file')
- def test_write_tuned_profiles(self, _write_remote_file, _check_execute_command):
+ def test_write_tuned_profiles(self, _write_remote_file, _check_execute_cephadm_exec):
profiles = {'p1': self.tspec1, 'p2': self.tspec2, 'p3': self.tspec3}
# for this test we will use host "a" and have it so host_needs_tuned_profile_update
# returns True for p2 and False for p1 (see FakeCache class). So we should see
# 2 ssh calls, one to write p2, one to run sysctl --system
- _check_execute_command.return_value = 'success'
+ _check_execute_cephadm_exec.return_value = 'success'
_write_remote_file.return_value = 'success'
mgr = FakeMgr(['a', 'b', 'c'],
['a', 'b', 'c'],
profiles)
tp = TunedProfileUtils(mgr)
tp._write_tuned_profiles('a', self.profiles_to_calls(tp, [self.tspec1, self.tspec2]))
- _check_execute_command.assert_called_with(
+ _check_execute_cephadm_exec.assert_called_with(
'a', RemoteCommand(Executables.SYSCTL, ['--system'])
)
_write_remote_file.assert_called_with(
if self.mgr.cache.is_host_unreachable(host):
return
cmd = ssh.RemoteCommand(ssh.Executables.LS, [SYSCTL_DIR])
- found_files = self.mgr.ssh.check_execute_command(host, cmd, log_command=self.mgr.log_refresh_metadata).split('\n')
+ found_files = self.mgr.ssh.check_execute_cephadm_exec(host, cmd, log_command=self.mgr.log_refresh_metadata).split('\n')
found_files = [s.strip() for s in found_files]
profile_names: List[str] = sum([[*p] for p in profiles], []) # extract all profiles names
profile_names = list(set(profile_names)) # remove duplicates
if file not in expected_files:
logger.info(f'Removing stray tuned profile file {file}')
cmd = ssh.RemoteCommand(ssh.Executables.RM, ['-f', f'{SYSCTL_DIR}/{file}'])
- self.mgr.ssh.check_execute_command(host, cmd)
+ self.mgr.ssh.check_execute_cephadm_exec(host, cmd)
updated = True
if updated:
- self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD)
+ self.mgr.ssh.check_execute_cephadm_exec(host, SYSCTL_SYSTEM_CMD)
def _write_tuned_profiles(self, host: str, profiles: List[Dict[str, str]]) -> None:
if self.mgr.cache.is_host_unreachable(host):
self.mgr.ssh.write_remote_file(host, profile_filename, content.encode('utf-8'))
updated = True
if updated:
- self.mgr.ssh.check_execute_command(host, SYSCTL_SYSTEM_CMD)
+ self.mgr.ssh.check_execute_cephadm_exec(host, SYSCTL_SYSTEM_CMD)
self.mgr.cache.last_tuned_profile_update[host] = datetime_now()