From: Melissa Date: Tue, 20 Jul 2021 23:02:57 +0000 (-0400) Subject: mgr/cephadm: create thread to start event loop for ssh.py, and return results of... X-Git-Tag: v17.1.0~1051^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=068ac34a5f44a2ffd447e6e22d899df47bf773e8;p=ceph.git mgr/cephadm: create thread to start event loop for ssh.py, and return results of the async functions with get_result The EventLoopThread class starts a thread and an event loop which runs forever. Coroutines are scheduled on the event loop by the `get_result` method which uses `run_coroutine_threadsafe` to return a concurrent.futures.Future, and ultimately the result with .result() Fixes: https://tracker.ceph.com/issues/44676 Signed-off-by: Melissa Li --- diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 13d93a23941f..09ec70af7889 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -506,6 +506,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, A command handler will typically change the declarative state of cephadm. This loop will then attempt to apply this new state. """ + # for ssh in serve + self.event_loop = ssh.EventLoopThread() + serve = CephadmServe(self) serve.serve() diff --git a/src/pybind/mgr/cephadm/ssh.py b/src/pybind/mgr/cephadm/ssh.py index caf837980119..3751ae1750b5 100644 --- a/src/pybind/mgr/cephadm/ssh.py +++ b/src/pybind/mgr/cephadm/ssh.py @@ -1,6 +1,8 @@ import logging import os +import asyncio from tempfile import NamedTemporaryFile +from threading import Thread from contextlib import contextmanager from io import StringIO from shlex import quote @@ -29,6 +31,22 @@ Host * ConnectTimeout=30 """ + +class EventLoopThread(Thread): + + def __init__(self) -> None: + + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + self._loop.set_debug(True) + + super().__init__(target=self._loop.run_forever) + self.start() + + def get_result(self, coro) -> Any: # type: ignore + return asyncio.run_coroutine_threadsafe(coro, self._loop).result() + + class SSHManager: def __init__(self, mgr: "CephadmOrchestrator"):