from contextlib import contextmanager
from io import StringIO
from shlex import quote
-from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable
+from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable, Union
from orchestrator import OrchestratorError
try:
import asyncssh
except ImportError:
- asyncssh = None
+ asyncssh = None # type: ignore
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
self.cons[host] = conn
self.mgr.offline_hosts_remove(host)
- conn = self.cons.get(host)
- return conn
+
+ return self.cons[host]
@contextmanager
def redirect_log(self, host: str, addr: str) -> Iterator[None]:
await self._reset_con(host)
self.mgr.offline_hosts.add(host)
raise OrchestratorError(f'Unable to reach remote host {host}. {str(e)}')
- out = r.stdout.rstrip('\n')
- err = r.stderr.rstrip('\n')
- return out, err, r.returncode
+
+ def _rstrip(v: Union[bytes, str, None]) -> str:
+ if not v:
+ return ''
+ if isinstance(v, str):
+ return v.rstrip('\n')
+ if isinstance(v, bytes):
+ return v.decode().rstrip('\n')
+ raise OrchestratorError(
+ f'Unable to parse ssh output with type {type(v)} from remote host {host}')
+
+ out = _rstrip(r.stdout)
+ err = _rstrip(r.stderr)
+ rc = r.returncode if r.returncode else 0
+
+ return out, err, rc
def execute_command(self,
host: str,