git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: use _remote_connection (ssh.py), _execute_command, _check_execute_command...
author Melissa <li.melissa.kun@gmail.com>
Wed, 21 Jul 2021 03:12:21 +0000 (23:12 -0400)
committer Melissa Li <li.melissa.kun@gmail.com>
Fri, 20 Aug 2021 18:27:45 +0000 (14:27 -0400)
Remove _get_connection from module.py and _remote_connection from serve.py, replacing both with _remote_connection in ssh.py.
Also replace remoto.process.check with _execute_command and _check_execute_command in ssh.py.

Fixes: https://tracker.ceph.com/issues/44676
Signed-off-by: Melissa Li <li.melissa.kun@gmail.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py

index 87a76debfabde76f70649916f0e28889a25e3a2b..b278cc54b87f98e79624df526ab7f9f68ab34da7 100644 (file)
@@ -1244,35 +1244,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self._kick_serve_loop()
         return HandleCommandResult()
 
-    def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
-                                                  'remoto.backends.LegacyModuleExecute']:
-        """
-        Setup a connection for running commands on remote host.
-        """
-        conn, r = self._cons.get(host, (None, None))
-        if conn:
-            if conn.has_connection():
-                self.log.debug('Have connection to %s' % host)
-                return conn, r
-            else:
-                self._reset_con(host)
-        assert self.ssh_user
-        n = self.ssh_user + '@' + host
-        self.log.debug("Opening connection to {} with ssh options '{}'".format(
-            n, self._ssh_options))
-        child_logger = self.log.getChild(n)
-        child_logger.setLevel('WARNING')
-        conn = remoto.Connection(
-            n,
-            logger=child_logger,
-            ssh_options=self._ssh_options,
-            sudo=True if self.ssh_user != 'root' else False)
-
-        r = conn.import_module(remotes)
-        self._cons[host] = conn, r
-
-        return conn, r
-
     def _get_container_image(self, daemon_name: str) -> Optional[str]:
         daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
         image: Optional[str] = None
index f09e8a033a6837d755aea1118c0ed8379dc3002a..8cc13076c7bd72021304935d01583c8bb720bb62 100644 (file)
@@ -3,7 +3,6 @@ import json
 import logging
 import uuid
 from collections import defaultdict
-from contextlib import contextmanager
 from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator
 try:
     import remoto
@@ -1161,97 +1160,88 @@ class CephadmServe:
 
         :env_vars: in format -> [KEY=VALUE, ..]
         """
+
+        self.mgr.ssh.remote_connection(host, addr)
+
         self.log.debug(f"_run_cephadm : command = {command}")
         self.log.debug(f"_run_cephadm : args = {args}")
 
         bypass_image = ('cephadm-exporter',)
 
-        with self._remote_connection(host, addr) as tpl:
-            conn, connr = tpl
-            assert image or entity
-            # Skip the image check for daemons deployed that are not ceph containers
-            if not str(entity).startswith(bypass_image):
-                if not image and entity is not cephadmNoImage:
-                    image = self.mgr._get_container_image(entity)
+        assert image or entity
+        # Skip the image check for daemons deployed that are not ceph containers
+        if not str(entity).startswith(bypass_image):
+            if not image and entity is not cephadmNoImage:
+                image = self.mgr._get_container_image(entity)
 
-            final_args = []
+        final_args = []
 
-            # global args
-            if env_vars:
-                for env_var_pair in env_vars:
-                    final_args.extend(['--env', env_var_pair])
+        # global args
+        if env_vars:
+            for env_var_pair in env_vars:
+                final_args.extend(['--env', env_var_pair])
 
-            if image:
-                final_args.extend(['--image', image])
+        if image:
+            final_args.extend(['--image', image])
 
-            if not self.mgr.container_init:
-                final_args += ['--no-container-init']
+        if not self.mgr.container_init:
+            final_args += ['--no-container-init']
 
-            # subcommand
-            final_args.append(command)
+        # subcommand
+        final_args.append(command)
 
-            # subcommand args
-            if not no_fsid:
-                final_args += ['--fsid', self.mgr._cluster_fsid]
+        # subcommand args
+        if not no_fsid:
+            final_args += ['--fsid', self.mgr._cluster_fsid]
 
-            final_args += args
+        final_args += args
 
-            # exec
-            self.log.debug('args: %s' % (' '.join(final_args)))
-            if self.mgr.mode == 'root':
-                if stdin:
-                    self.log.debug('stdin: %s' % stdin)
+        # exec
+        self.log.debug('args: %s' % (' '.join(final_args)))
+        if self.mgr.mode == 'root':
+            if stdin:
+                self.log.debug('stdin: %s' % stdin)
 
-                python = connr.choose_python()
-                if not python:
-                    raise RuntimeError(
-                        'unable to find python on %s (tried %s in %s)' % (
-                            host, remotes.PYTHONS, remotes.PATH))
-                try:
-                    out, err, code = remoto.process.check(
-                        conn,
-                        [python, self.mgr.cephadm_binary_path] + final_args,
-                        stdin=stdin.encode('utf-8') if stdin is not None else None)
-                    if code == 2:
-                        out_ls, err_ls, code_ls = remoto.process.check(
-                            conn, ['ls', self.mgr.cephadm_binary_path])
-                        if code_ls == 2:
-                            self._deploy_cephadm_binary_conn(conn, host)
-                            out, err, code = remoto.process.check(
-                                conn,
-                                [python, self.mgr.cephadm_binary_path] + final_args,
-                                stdin=stdin.encode('utf-8') if stdin is not None else None)
-
-                except RuntimeError as e:
-                    self.mgr._reset_con(host)
-                    if error_ok:
-                        return [], [str(e)], 1
-                    raise
+            cmd = ['which', 'python3']
+            python = self.mgr.ssh.check_execute_command(host, cmd, addr=addr)
+            cmd = [python, self.mgr.cephadm_binary_path] + final_args
 
-            elif self.mgr.mode == 'cephadm-package':
-                try:
-                    out, err, code = remoto.process.check(
-                        conn,
-                        ['sudo', '/usr/bin/cephadm'] + final_args,
-                        stdin=stdin)
-                except RuntimeError as e:
-                    self.mgr._reset_con(host)
-                    if error_ok:
-                        return [], [str(e)], 1
-                    raise
-            else:
-                assert False, 'unsupported mode'
-
-            self.log.debug('code: %d' % code)
-            if out:
-                self.log.debug('out: %s' % '\n'.join(out))
-            if err:
-                self.log.debug('err: %s' % '\n'.join(err))
-            if code and not error_ok:
-                raise OrchestratorError(
-                    'cephadm exited with an error code: %d, stderr:%s' % (
-                        code, '\n'.join(err)))
-            return out, err, code
+            try:
+                out, err, code = self.mgr.ssh.execute_command(host, cmd, stdin=stdin.encode('utf-8') if stdin else None, addr=addr)
+                if code == 2:
+                    ls_cmd = ['ls', self.mgr.cephadm_binary_path]
+                    out_ls, err_ls, code_ls = self.mgr.ssh.execute_command(host, ls_cmd, addr=addr)
+                    if code_ls == 2:
+                        self._deploy_cephadm_binary(host, addr)
+                        out, err, code = self.mgr.ssh.execute_command(host, cmd, stdin=stdin.encode('utf-8') if stdin else None, addr=addr)
+
+            except Exception as e:
+                self.mgr.ssh._reset_con(host)
+                if error_ok:
+                    return [], [str(e)], 1
+                raise
+
+        elif self.mgr.mode == 'cephadm-package':
+            try:
+                cmd = ['/usr/bin/cephadm'] + final_args
+                out, err, code = self.mgr.ssh.execute_command(host, cmd, stdin=stdin.encode('utf-8') if stdin else None, addr=addr)
+            except Exception as e:
+                self.mgr.ssh._reset_con(host)
+                if error_ok:
+                    return [], [str(e)], 1
+                raise
+        else:
+            assert False, 'unsupported mode'
+
+        self.log.debug(f'code: {code}')
+        if out:
+            self.log.debug(f'out: {out}')
+        if err:
+            self.log.debug(f'err: {err}')
+        if code and not error_ok:
+            raise OrchestratorError(
+                f'cephadm exited with an error code: {code}, stderr: {err}')
+        return [out], [err], code
 
     def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
         # pick a random host...
@@ -1314,55 +1304,3 @@ class CephadmServe:
             msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
             self.log.warning(msg)
             raise OrchestratorError(msg)
-    @contextmanager
-    def _remote_connection(self,
-                           host: str,
-                           addr: Optional[str] = None,
-                           ) -> Iterator[Tuple["BaseConnection", Any]]:
-        if not addr and host in self.mgr.inventory:
-            addr = self.mgr.inventory.get_addr(host)
-
-        self.mgr.offline_hosts_remove(host)
-
-        try:
-            try:
-                if not addr:
-                    raise OrchestratorError("host address is empty")
-                conn, connr = self.mgr._get_connection(addr)
-            except OSError as e:
-                self.mgr._reset_con(host)
-                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
-                raise execnet.gateway_bootstrap.HostNotFound(msg)
-
-            yield (conn, connr)
-
-        except execnet.gateway_bootstrap.HostNotFound as e:
-            # this is a misleading exception as it seems to be thrown for
-            # any sort of connection failure, even those having nothing to
-            # do with "host not found" (e.g., ssh key permission denied).
-            self.mgr.offline_hosts.add(host)
-            self.mgr._reset_con(host)
-
-            user = self.mgr.ssh_user if self.mgr.mode == 'root' else 'cephadm'
-            if str(e).startswith("Can't communicate"):
-                msg = str(e)
-            else:
-                msg = f'''Failed to connect to {host} ({addr}).
-Please make sure that the host is reachable and accepts connections using the cephadm SSH key
-
-To add the cephadm SSH key to the host:
-> ceph cephadm get-pub-key > ~/ceph.pub
-> ssh-copy-id -f -i ~/ceph.pub {user}@{addr}
-
-To check that the host is reachable open a new shell with the --no-hosts flag:
-> cephadm shell --no-hosts
-
-Then run the following:
-> ceph cephadm get-ssh-config > ssh_config
-> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
-> chmod 0600 ~/cephadm_private_key
-> ssh -F ssh_config -i ~/cephadm_private_key {user}@{addr}'''
-            raise OrchestratorError(msg) from e
-        except Exception as ex:
-            self.log.exception(ex)
-            raise