"""
def __init__(self, controller, ctx=None, config=None, logger=None,
- cluster='ceph', cephadm=False):
+ cluster='ceph', cephadm=False) -> None:
self.lock = threading.RLock()
self.ctx = ctx
self.config = config
kwargs['args'] = prefix + list(kwargs['args'])
return self.controller.run(**kwargs)
- def raw_cluster_cmd(self, *args, **kwargs):
+ def raw_cluster_cmd(self, *args, **kwargs) -> str:
"""
Start ceph on a raw cluster. Return count
"""
+from typing import Optional, TYPE_CHECKING
import unittest
import time
import logging
from teuthology.orchestra.run import CommandFailedError
+if TYPE_CHECKING:
+ from tasks.mgr.mgr_test_case import MgrCluster
+
log = logging.getLogger(__name__)
class TestTimeoutError(RuntimeError):
backup_fs = None
ceph_cluster = None
mds_cluster = None
- mgr_cluster = None
+ mgr_cluster: Optional['MgrCluster'] = None
ctx = None
mon_manager = None
class TestCephadmCLI(MgrTestCase):
- def _cmd(self, *args):
+ def _cmd(self, *args) -> str:
+ assert self.mgr_cluster is not None
return self.mgr_cluster.mon_manager.raw_cluster_cmd(*args)
- def _orch_cmd(self, *args):
+ def _orch_cmd(self, *args) -> str:
return self._cmd("orch", *args)
def setUp(self):
(result,) = self._ctx.cluster.only(first_mon).remotes.keys()
return result
- def __init__(self, ctx):
+ def __init__(self, ctx) -> None:
self._ctx = ctx
self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
kwargs['args'] = [CEPH_CMD] + list(kwargs['args'])
return self.controller.run(**kwargs)
- def raw_cluster_cmd(self, *args, **kwargs):
+ def raw_cluster_cmd(self, *args, **kwargs) -> str:
"""
args like ["osd", "dump"}
return stdout string