From: Melissa
Date: Tue, 20 Jul 2021 23:02:57 +0000 (-0400)
Subject: mgr/cephadm: create thread to start event loop for ssh.py, and return results of...
X-Git-Tag: v17.1.0~1051^2~9
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=068ac34a5f44a2ffd447e6e22d899df47bf773e8;p=ceph.git
mgr/cephadm: create thread to start event loop for ssh.py, and return results of the async functions with get_result
The EventLoopThread class starts a thread and an event loop which runs forever. Coroutines are scheduled on the event loop by the `get_result` method which uses `run_coroutine_threadsafe` to return a concurrent.futures.Future, and ultimately the result with .result()
Fixes: https://tracker.ceph.com/issues/44676
Signed-off-by: Melissa Li
---
diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 13d93a23941f..09ec70af7889 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -506,6 +506,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
A command handler will typically change the declarative state
of cephadm. This loop will then attempt to apply this new state.
"""
+ # for ssh in serve
+ self.event_loop = ssh.EventLoopThread()
+
serve = CephadmServe(self)
serve.serve()
diff --git a/src/pybind/mgr/cephadm/ssh.py b/src/pybind/mgr/cephadm/ssh.py
index caf837980119..3751ae1750b5 100644
--- a/src/pybind/mgr/cephadm/ssh.py
+++ b/src/pybind/mgr/cephadm/ssh.py
@@ -1,6 +1,8 @@
import logging
import os
+import asyncio
from tempfile import NamedTemporaryFile
+from threading import Thread
from contextlib import contextmanager
from io import StringIO
from shlex import quote
@@ -29,6 +31,22 @@ Host *
ConnectTimeout=30
"""
+
+class EventLoopThread(Thread):
+
+ def __init__(self) -> None:
+
+ self._loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self._loop)
+ self._loop.set_debug(True)
+
+ super().__init__(target=self._loop.run_forever)
+ self.start()
+
+ def get_result(self, coro) -> Any: # type: ignore
+ return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
+
+
class SSHManager:
def __init__(self, mgr: "CephadmOrchestrator"):