git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: execute commands run over ssh via asyncssh
author: Melissa <li.melissa.kun@gmail.com>
Tue, 20 Jul 2021 21:02:40 +0000 (17:02 -0400)
committer: Melissa Li <li.melissa.kun@gmail.com>
Fri, 20 Aug 2021 18:27:45 +0000 (14:27 -0400)
_execute_command will run commands over ssh using the asyncssh `run` method: https://asyncssh.readthedocs.io/en/latest/api.html#asyncssh.SSHClientConnection.run
_check_execute_command will check the output of _execute_command and raise an OrchestratorError if the command fails on the remote host.
All commands run over ssh are prepended with sudo in `_execute_command` and shell-escaped with shlex quote.
If the cached ssh connection is closed or broken, the connection object will be removed from the cache, the host will be added to `offline_hosts`, and an OrchestratorError will be raised. On the next call, an attempt will be made to recreate the connection.
Exceptions raised by asyncssh methods should be handled; otherwise, errors like "TypeError: __init__() missing 1 required positional argument: 'reason'" could occur due to the asyncssh error interacting with `raise_if_exception`.

Fixes: https://tracker.ceph.com/issues/44676
Signed-off-by: Melissa Li <li.melissa.kun@gmail.com>
src/pybind/mgr/cephadm/ssh.py

index 8eeeadf2993c5fdbe0629ce267e8896612bdd466..9ccfb3064a5c8d4728988cf015c20401ad2d5704 100644 (file)
@@ -1,6 +1,7 @@
 import logging
 from contextlib import contextmanager
 from io import StringIO
+from shlex import quote
 from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Any, Iterator
 from orchestrator import OrchestratorError
 
@@ -88,3 +89,40 @@ class SSHManager:
             log_string.flush()
             asyncssh_logger.removeHandler(ch)
 
+    async def _execute_command(self,
+                               host: str,
+                               cmd: List[str],
+                               stdin: Optional[bytes] = b"",
+                               addr: Optional[str] = None,
+                               **kwargs: Any,
+                               ) -> Tuple[str, str, int]:
+        """Run ``cmd`` with sudo on ``host`` over its ssh connection.
+
+        :param host: host to run the command on; also used to reset the
+            connection and to mark the host offline if the run fails.
+        :param cmd: command and arguments; every element is shell-quoted,
+            the elements are joined with spaces and prefixed with ``sudo``.
+        :param stdin: optional bytes, decoded and passed as the command's
+            input; an empty value or ``None`` passes ``input=None``.
+        :param addr: optional address handed to ``_remote_connection``.
+        :param kwargs: accepted for call compatibility; not used here.
+        :return: ``(stdout, stderr, returncode)``, with trailing newlines
+            stripped from stdout and stderr.
+        :raises OrchestratorError: if running the command raises; before
+            raising, the connection is reset and the host is added to
+            ``self.mgr.offline_hosts``.
+        """
+        conn = await self._remote_connection(host, addr)
+        # Keep ``cmd`` as the argument list; build the shell string separately.
+        full_cmd = "sudo " + " ".join(quote(x) for x in cmd)
+        logger.debug(f'Running command: {full_cmd}')
+        try:
+            r = await conn.run(full_cmd, input=stdin.decode() if stdin else None)
+        # Catch everything on purpose: an asyncssh error that escapes can show up
+        # later as "TypeError: __init__() missing 1 required positional argument:
+        # 'reason'" when it interacts with raise_if_exception.
+        except Exception as e:
+            # SSH connection closed or broken, will create new connection next call
+            logger.debug(f'Connection to {host} failed. {str(e)}')
+            self._reset_con(host)
+            self.mgr.offline_hosts.add(host)
+            raise OrchestratorError(f'Unable to reach remote host {host}. {str(e)}') from e
+        out = r.stdout.rstrip('\n')
+        err = r.stderr.rstrip('\n')
+        return out, err, r.returncode
+
+    async def _check_execute_command(self,
+                                     host: str,
+                                     cmd: List[str],
+                                     stdin: Optional[bytes] = b"",
+                                     addr: Optional[str] = None,
+                                     **kwargs: Any,
+                                     ) -> str:
+        """Run ``cmd`` on ``host`` via ``_execute_command`` and require success.
+
+        :param host: host to run the command on.
+        :param cmd: command and arguments, passed through unchanged.
+        :param stdin: optional bytes passed through as the command's input.
+        :param addr: optional address passed through.
+        :param kwargs: extra keyword arguments, forwarded to
+            ``_execute_command``.
+        :return: the stdout value returned by ``_execute_command``.
+        :raises OrchestratorError: if the return code is non-zero; the
+            message contains ``cmd`` and the command's stderr.
+        """
+        # Forward kwargs instead of silently dropping them.
+        out, err, code = await self._execute_command(host, cmd, stdin, addr, **kwargs)
+        if code != 0:
+            msg = f'Command {cmd} failed. {err}'
+            logger.debug(msg)
+            raise OrchestratorError(msg)
+        return out
+