import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
- Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type
+ Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Coroutine, Awaitable
import datetime
import os
serve = CephadmServe(self)
serve.serve()
+ def wait_async(self, coro: Awaitable[T]) -> T:
+ return self.event_loop.get_result(coro)
+
def set_container_image(self, entity: str, image: str) -> None:
self.check_mon_command({
'prefix': 'config set',
from contextlib import contextmanager
from io import StringIO
from shlex import quote
-from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Any, Iterator
+from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable
from orchestrator import OrchestratorError
try:
from cephadm.module import CephadmOrchestrator
from asyncssh.connection import SSHClientConnection
+T = TypeVar('T')
+
+
logger = logging.getLogger(__name__)
asyncssh_logger = logging.getLogger('asyncssh')
super().__init__(target=self._loop.run_forever)
self.start()
- def get_result(self, coro) -> Any: # type: ignore
+ def get_result(self, coro: Awaitable[T]) -> T:
return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
host: str,
addr: Optional[str] = None,
) -> "SSHClientConnection":
- return self.mgr.event_loop.get_result(self._remote_connection(host, addr))
+ return self.mgr.wait_async(self._remote_connection(host, addr))
async def _execute_command(self,
host: str,
stdin: Optional[str] = None,
addr: Optional[str] = None,
) -> Tuple[str, str, int]:
- return self.mgr.event_loop.get_result(self._execute_command(host, cmd, stdin, addr))
+ return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr))
async def _check_execute_command(self,
host: str,
stdin: Optional[str] = None,
addr: Optional[str] = None,
) -> str:
- return self.mgr.event_loop.get_result(self._check_execute_command(host, cmd, stdin, addr))
+ return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr))
async def _write_remote_file(self,
host: str,
gid: Optional[int] = None,
addr: Optional[str] = None,
) -> None:
- self.mgr.event_loop.get_result(self._write_remote_file(
+ self.mgr.wait_async(self._write_remote_file(
host, path, content, mode, uid, gid, addr))
async def _reset_con(self, host: str) -> None:
del self.cons[host]
def reset_con(self, host: str) -> None:
- self.mgr.event_loop.get_result(self._reset_con(host))
+ self.mgr.wait_async(self._reset_con(host))
def _reset_cons(self) -> None:
for host, conn in self.cons.items():