git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: move nfs and osd to services/
author     Sebastian Wagner <sebastian.wagner@suse.com>
           Mon, 4 May 2020 10:47:38 +0000 (12:47 +0200)
committer  Sebastian Wagner <sebastian.wagner@suse.com>
           Thu, 7 May 2020 10:58:59 +0000 (12:58 +0200)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/nfs.py [deleted file]
src/pybind/mgr/cephadm/osd.py [deleted file]
src/pybind/mgr/cephadm/services/__init__.py [new file with mode: 0644]
src/pybind/mgr/cephadm/services/nfs.py [new file with mode: 0644]
src/pybind/mgr/cephadm/services/osd.py [new file with mode: 0644]
src/pybind/mgr/cephadm/tests/test_cephadm.py

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index ca6ad58e2085ddfcd4f3d40c27928f989880c344..541737bd5e906f7df28e5472744881e804625f24 100644 (file)
@@ -39,8 +39,8 @@ from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpe
 
 from . import remotes
 from . import utils
-from .nfs import NFSGanesha
-from .osd import RemoveUtil, OSDRemoval
+from .services.nfs import NFSGanesha
+from .services.osd import RemoveUtil, OSDRemoval
 from .inventory import Inventory, SpecStore, HostCache
 
 try:
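
For reference, a minimal sketch (not part of this commit) of what the relocation means for code that imports these helpers with absolute paths, as the tests below do; the old top-level modules are gone:

# before this commit:
#   from cephadm.nfs import NFSGanesha
#   from cephadm.osd import RemoveUtil, OSDRemoval

# after this commit, the helpers live in the services subpackage:
from cephadm.services.nfs import NFSGanesha
from cephadm.services.osd import RemoveUtil, OSDRemoval

Relative imports inside the cephadm package change in the same way, as the module.py hunk above shows.
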
diff --git a/src/pybind/mgr/cephadm/nfs.py b/src/pybind/mgr/cephadm/nfs.py
deleted file mode 100644 (file)
index 1c0a452..0000000
+++ /dev/null
@@ -1,155 +0,0 @@
-import logging
-import rados
-
-from typing import Dict, Optional
-
-from ceph.deployment.service_spec import NFSServiceSpec
-
-import cephadm
-from orchestrator import OrchestratorError
-
-from . import utils
-
-logger = logging.getLogger(__name__)
-
-class NFSGanesha(object):
-    def __init__(self,
-                 mgr,
-                 daemon_id,
-                 spec):
-        # type: (cephadm.CephadmOrchestrator, str, NFSServiceSpec) -> None
-        assert spec.service_id and daemon_id.startswith(spec.service_id)
-        self.mgr = mgr
-        self.daemon_id = daemon_id
-        self.spec = spec
-
-    def get_daemon_name(self):
-        # type: () -> str
-        return '%s.%s' % (self.spec.service_type, self.daemon_id)
-
-    def get_rados_user(self):
-        # type: () -> str
-        return '%s.%s' % (self.spec.service_type, self.daemon_id)
-
-    def get_keyring_entity(self):
-        # type: () -> str
-        return utils.name_to_config_section(self.get_rados_user())
-
-    def get_or_create_keyring(self, entity=None):
-        # type: (Optional[str]) -> str
-        if not entity:
-            entity = self.get_keyring_entity()
-
-        logger.info('Create keyring: %s' % entity)
-        ret, keyring, err = self.mgr.mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': entity,
-        })
-
-        if ret != 0:
-            raise OrchestratorError(
-                    'Unable to create keyring %s: %s %s' \
-                            % (entity, ret, err))
-        return keyring
-
-    def update_keyring_caps(self, entity=None):
-        # type: (Optional[str]) -> None
-        if not entity:
-            entity = self.get_keyring_entity()
-
-        osd_caps='allow rw pool=%s' % (self.spec.pool)
-        if self.spec.namespace:
-            osd_caps='%s namespace=%s' % (osd_caps, self.spec.namespace)
-
-        logger.info('Updating keyring caps: %s' % entity)
-        ret, out, err = self.mgr.mon_command({
-            'prefix': 'auth caps',
-            'entity': entity,
-            'caps': ['mon', 'allow r',
-                     'osd', osd_caps,
-                     'mds', 'allow rw'],
-        })
-
-        if ret != 0:
-            raise OrchestratorError(
-                    'Unable to update keyring caps %s: %s %s' \
-                            % (entity, ret, err))
-
-    def create_rados_config_obj(self, clobber=False):
-        # type: (Optional[bool]) -> None
-        obj = self.spec.rados_config_name()
-
-        with self.mgr.rados.open_ioctx(self.spec.pool) as ioctx:
-            if self.spec.namespace:
-                ioctx.set_namespace(self.spec.namespace)
-
-            exists = True
-            try:
-                ioctx.stat(obj)
-            except rados.ObjectNotFound as e:
-                exists = False
-
-            if exists and not clobber:
-                # Assume an existing config
-                logger.info('Rados config object exists: %s' % obj)
-            else:
-                # Create an empty config object
-                logger.info('Creating rados config object: %s' % obj)
-                ioctx.write_full(obj, ''.encode('utf-8'))
-
-    def get_ganesha_conf(self):
-        # type: () -> str
-        return '''# generated by cephadm
-NFS_CORE_PARAM {{
-        Enable_NLM = false;
-        Enable_RQUOTA = false;
-        Protocols = 4;
-}}
-
-CACHEINODE {{
-        Dir_Chunk = 0;
-        NParts = 1;
-        Cache_Size = 1;
-}}
-
-EXPORT_DEFAULTS {{
-        Attr_Expiration_Time = 0;
-}}
-
-NFSv4 {{
-        Delegations = false;
-        RecoveryBackend = 'rados_cluster';
-        Minor_Versions = 1, 2;
-}}
-
-RADOS_KV {{
-        UserId = "{user}";
-        nodeid = "{nodeid}";
-        pool = "{pool}";
-        namespace = "{namespace}";
-}}
-
-RADOS_URLS {{
-        UserId = "{user}";
-        watch_url = "{url}";
-}}
-
-%url    {url}
-'''.format(user=self.get_rados_user(),
-           nodeid=self.get_daemon_name(),
-           pool=self.spec.pool,
-           namespace=self.spec.namespace if self.spec.namespace else '',
-           url=self.spec.rados_config_location())
-
-    def get_cephadm_config(self):
-        # type: () -> Dict
-        config = {'pool' : self.spec.pool} # type: Dict
-        if self.spec.namespace:
-            config['namespace'] = self.spec.namespace
-        config['userid'] = self.get_rados_user()
-        config['extra_args'] = ['-N', 'NIV_EVENT']
-        config['files'] = {
-            'ganesha.conf' : self.get_ganesha_conf(),
-        }
-        logger.debug('Generated cephadm config-json: %s' % config)
-        return config
diff --git a/src/pybind/mgr/cephadm/osd.py b/src/pybind/mgr/cephadm/osd.py
deleted file mode 100644 (file)
index 2ead0cb..0000000
+++ /dev/null
@@ -1,191 +0,0 @@
-import datetime
-import json
-import logging
-import time
-
-from typing import List, Dict, Any, Set, Union
-
-import orchestrator
-from orchestrator import OrchestratorError
-
-logger = logging.getLogger(__name__)
-
-
-class OSDRemoval(object):
-    def __init__(self,
-                 osd_id: str,
-                 replace: bool,
-                 force: bool,
-                 nodename: str,
-                 fullname: str,
-                 start_at: datetime.datetime,
-                 pg_count: int):
-        self.osd_id = osd_id
-        self.replace = replace
-        self.force = force
-        self.nodename = nodename
-        self.fullname = fullname
-        self.started_at = start_at
-        self.pg_count = pg_count
-
-    # needed due to changing 'started_at' attr
-    def __eq__(self, other):
-        return self.osd_id == other.osd_id
-
-    def __hash__(self):
-        return hash(self.osd_id)
-
-    def __repr__(self):
-        return ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
-                ', fullname={}, started_at={}, pg_count={})').format(
-            self.osd_id, self.replace, self.force, self.nodename,
-            self.fullname, self.started_at, self.pg_count)
-
-    @property
-    def pg_count_str(self) -> str:
-        return 'n/a' if self.pg_count < 0 else str(self.pg_count)
-
-
-class RemoveUtil(object):
-    def __init__(self, mgr):
-        self.mgr = mgr
-        self.to_remove_osds: Set[OSDRemoval] = set()
-        self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict()
-
-    @property
-    def report(self) -> Set[OSDRemoval]:
-        return self.to_remove_osds.copy()
-
-    def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
-        self.to_remove_osds.update(osds)
-
-    def _remove_osds_bg(self) -> None:
-        """
-        Performs actions in the _serve() loop to remove an OSD
-        when criteria is met.
-        """
-        logger.debug(
-            f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
-        self._update_osd_removal_status()
-        remove_osds: set = self.to_remove_osds.copy()
-        for osd in remove_osds:
-            if not osd.force:
-                self.drain_osd(osd.osd_id)
-                # skip criteria
-                if not self.is_empty(osd.osd_id):
-                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
-                    continue
-
-            if not self.ok_to_destroy([osd.osd_id]):
-                logger.info(
-                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
-                continue
-
-            # abort criteria
-            if not self.down_osd([osd.osd_id]):
-                # also remove it from the remove_osd list and set a health_check warning?
-                raise orchestrator.OrchestratorError(
-                    f"Could not set OSD <{osd.osd_id}> to 'down'")
-
-            if osd.replace:
-                if not self.destroy_osd(osd.osd_id):
-                    # also remove it from the remove_osd list and set a health_check warning?
-                    raise orchestrator.OrchestratorError(
-                        f"Could not destroy OSD <{osd.osd_id}>")
-            else:
-                if not self.purge_osd(osd.osd_id):
-                    # also remove it from the remove_osd list and set a health_check warning?
-                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
-            self.mgr._remove_daemon(osd.fullname, osd.nodename)
-            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
-            logger.debug(f"Removing {osd.osd_id} from the queue.")
-            self.to_remove_osds.remove(osd)
-
-    def _update_osd_removal_status(self):
-        """
-        Generate an OSD report that can be printed to the CLI
-        """
-        logger.debug("Update OSD removal status")
-        for osd in self.to_remove_osds:
-            osd.pg_count = self.get_pg_count(str(osd.osd_id))
-        logger.debug(f"OSD removal status: {self.to_remove_osds}")
-
-    def drain_osd(self, osd_id: str) -> bool:
-        """
-        Uses `osd_support` module to schedule a drain operation of an OSD
-        """
-        cmd_args = {
-            'prefix': 'osd drain',
-            'osd_ids': [int(osd_id)]
-        }
-        return self._run_mon_cmd(cmd_args)
-
-    def get_pg_count(self, osd_id: str) -> int:
-        """ Queries for PG count of an OSD """
-        self.mgr.log.debug("Querying for drain status")
-        ret, out, err = self.mgr.mon_command({
-            'prefix': 'osd drain status',
-        })
-        if ret != 0:
-            self.mgr.log.error(f"Calling osd drain status failed with {err}")
-            raise OrchestratorError("Could not query `osd drain status`")
-        out = json.loads(out)
-        for o in out:
-            if str(o.get('osd_id', '')) == str(osd_id):
-                return int(o.get('pgs', -1))
-        return -1
-
-    def is_empty(self, osd_id: str) -> bool:
-        """ Checks if an OSD is empty """
-        return self.get_pg_count(osd_id) == 0
-
-    def ok_to_destroy(self, osd_ids: List[int]) -> bool:
-        """ Queries the safe-to-destroy flag for OSDs """
-        cmd_args = {'prefix': 'osd safe-to-destroy',
-                    'ids': osd_ids}
-        return self._run_mon_cmd(cmd_args)
-
-    def destroy_osd(self, osd_id: int) -> bool:
-        """ Destroys an OSD (forcefully) """
-        cmd_args = {'prefix': 'osd destroy-actual',
-                    'id': int(osd_id),
-                    'yes_i_really_mean_it': True}
-        return self._run_mon_cmd(cmd_args)
-
-    def down_osd(self, osd_ids: List[int]) -> bool:
-        """ Sets `out` flag to OSDs """
-        cmd_args = {
-            'prefix': 'osd down',
-            'ids': osd_ids,
-        }
-        return self._run_mon_cmd(cmd_args)
-
-    def purge_osd(self, osd_id: int) -> bool:
-        """ Purges an OSD from the cluster (forcefully) """
-        cmd_args = {
-            'prefix': 'osd purge-actual',
-            'id': int(osd_id),
-            'yes_i_really_mean_it': True
-        }
-        return self._run_mon_cmd(cmd_args)
-
-    def out_osd(self, osd_ids: List[int]) -> bool:
-        """ Sets `down` flag to OSDs """
-        cmd_args = {
-            'prefix': 'osd out',
-            'ids': osd_ids,
-        }
-        return self._run_mon_cmd(cmd_args)
-
-    def _run_mon_cmd(self, cmd_args: dict) -> bool:
-        """
-        Generic command to run mon_command and evaluate/log the results
-        """
-        ret, out, err = self.mgr.mon_command(cmd_args)
-        if ret != 0:
-            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
-            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
-            return False
-        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
-        return True
diff --git a/src/pybind/mgr/cephadm/services/__init__.py b/src/pybind/mgr/cephadm/services/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/cephadm/services/nfs.py b/src/pybind/mgr/cephadm/services/nfs.py
new file mode 100644 (file)
index 0000000..2eaa934
--- /dev/null
@@ -0,0 +1,155 @@
+import logging
+import rados
+
+from typing import Dict, Optional
+
+from ceph.deployment.service_spec import NFSServiceSpec
+
+import cephadm
+from orchestrator import OrchestratorError
+
+from .. import utils
+
+logger = logging.getLogger(__name__)
+
+class NFSGanesha(object):
+    def __init__(self,
+                 mgr,
+                 daemon_id,
+                 spec):
+        # type: (cephadm.CephadmOrchestrator, str, NFSServiceSpec) -> None
+        assert spec.service_id and daemon_id.startswith(spec.service_id)
+        self.mgr = mgr
+        self.daemon_id = daemon_id
+        self.spec = spec
+
+    def get_daemon_name(self):
+        # type: () -> str
+        return '%s.%s' % (self.spec.service_type, self.daemon_id)
+
+    def get_rados_user(self):
+        # type: () -> str
+        return '%s.%s' % (self.spec.service_type, self.daemon_id)
+
+    def get_keyring_entity(self):
+        # type: () -> str
+        return utils.name_to_config_section(self.get_rados_user())
+
+    def get_or_create_keyring(self, entity=None):
+        # type: (Optional[str]) -> str
+        if not entity:
+            entity = self.get_keyring_entity()
+
+        logger.info('Create keyring: %s' % entity)
+        ret, keyring, err = self.mgr.mon_command({
+            'prefix': 'auth get-or-create',
+            'entity': entity,
+        })
+
+        if ret != 0:
+            raise OrchestratorError(
+                    'Unable to create keyring %s: %s %s' \
+                            % (entity, ret, err))
+        return keyring
+
+    def update_keyring_caps(self, entity=None):
+        # type: (Optional[str]) -> None
+        if not entity:
+            entity = self.get_keyring_entity()
+
+        osd_caps='allow rw pool=%s' % (self.spec.pool)
+        if self.spec.namespace:
+            osd_caps='%s namespace=%s' % (osd_caps, self.spec.namespace)
+
+        logger.info('Updating keyring caps: %s' % entity)
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'auth caps',
+            'entity': entity,
+            'caps': ['mon', 'allow r',
+                     'osd', osd_caps,
+                     'mds', 'allow rw'],
+        })
+
+        if ret != 0:
+            raise OrchestratorError(
+                    'Unable to update keyring caps %s: %s %s' \
+                            % (entity, ret, err))
+
+    def create_rados_config_obj(self, clobber=False):
+        # type: (Optional[bool]) -> None
+        obj = self.spec.rados_config_name()
+
+        with self.mgr.rados.open_ioctx(self.spec.pool) as ioctx:
+            if self.spec.namespace:
+                ioctx.set_namespace(self.spec.namespace)
+
+            exists = True
+            try:
+                ioctx.stat(obj)
+            except rados.ObjectNotFound as e:
+                exists = False
+
+            if exists and not clobber:
+                # Assume an existing config
+                logger.info('Rados config object exists: %s' % obj)
+            else:
+                # Create an empty config object
+                logger.info('Creating rados config object: %s' % obj)
+                ioctx.write_full(obj, ''.encode('utf-8'))
+
+    def get_ganesha_conf(self):
+        # type: () -> str
+        return '''# generated by cephadm
+NFS_CORE_PARAM {{
+        Enable_NLM = false;
+        Enable_RQUOTA = false;
+        Protocols = 4;
+}}
+
+CACHEINODE {{
+        Dir_Chunk = 0;
+        NParts = 1;
+        Cache_Size = 1;
+}}
+
+EXPORT_DEFAULTS {{
+        Attr_Expiration_Time = 0;
+}}
+
+NFSv4 {{
+        Delegations = false;
+        RecoveryBackend = 'rados_cluster';
+        Minor_Versions = 1, 2;
+}}
+
+RADOS_KV {{
+        UserId = "{user}";
+        nodeid = "{nodeid}";
+        pool = "{pool}";
+        namespace = "{namespace}";
+}}
+
+RADOS_URLS {{
+        UserId = "{user}";
+        watch_url = "{url}";
+}}
+
+%url    {url}
+'''.format(user=self.get_rados_user(),
+           nodeid=self.get_daemon_name(),
+           pool=self.spec.pool,
+           namespace=self.spec.namespace if self.spec.namespace else '',
+           url=self.spec.rados_config_location())
+
+    def get_cephadm_config(self):
+        # type: () -> Dict
+        config = {'pool' : self.spec.pool} # type: Dict
+        if self.spec.namespace:
+            config['namespace'] = self.spec.namespace
+        config['userid'] = self.get_rados_user()
+        config['extra_args'] = ['-N', 'NIV_EVENT']
+        config['files'] = {
+            'ganesha.conf' : self.get_ganesha_conf(),
+        }
+        logger.debug('Generated cephadm config-json: %s' % config)
+        return config
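
For illustration only (not part of this commit), a minimal sketch of driving the relocated NFSGanesha helper to generate a daemon config. It assumes NFSServiceSpec accepts service_id, pool and namespace keyword arguments; the mgr handle is only needed for the keyring and RADOS calls, so None stands in for it on the config-generation path exercised here:

from ceph.deployment.service_spec import NFSServiceSpec
from cephadm.services.nfs import NFSGanesha

# hypothetical spec values; daemon_id must start with the service_id
spec = NFSServiceSpec(service_id='mynfs', pool='nfs-ganesha', namespace='nfs-ns')
nfs = NFSGanesha(mgr=None, daemon_id='mynfs.host1', spec=spec)

# config-json that cephadm would pass when deploying the ganesha container
config = nfs.get_cephadm_config()
print(config['files']['ganesha.conf'])   # rendered ganesha.conf template
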
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
new file mode 100644 (file)
index 0000000..2ead0cb
--- /dev/null
@@ -0,0 +1,191 @@
+import datetime
+import json
+import logging
+import time
+
+from typing import List, Dict, Any, Set, Union
+
+import orchestrator
+from orchestrator import OrchestratorError
+
+logger = logging.getLogger(__name__)
+
+
+class OSDRemoval(object):
+    def __init__(self,
+                 osd_id: str,
+                 replace: bool,
+                 force: bool,
+                 nodename: str,
+                 fullname: str,
+                 start_at: datetime.datetime,
+                 pg_count: int):
+        self.osd_id = osd_id
+        self.replace = replace
+        self.force = force
+        self.nodename = nodename
+        self.fullname = fullname
+        self.started_at = start_at
+        self.pg_count = pg_count
+
+    # needed due to changing 'started_at' attr
+    def __eq__(self, other):
+        return self.osd_id == other.osd_id
+
+    def __hash__(self):
+        return hash(self.osd_id)
+
+    def __repr__(self):
+        return ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
+                ', fullname={}, started_at={}, pg_count={})').format(
+            self.osd_id, self.replace, self.force, self.nodename,
+            self.fullname, self.started_at, self.pg_count)
+
+    @property
+    def pg_count_str(self) -> str:
+        return 'n/a' if self.pg_count < 0 else str(self.pg_count)
+
+
+class RemoveUtil(object):
+    def __init__(self, mgr):
+        self.mgr = mgr
+        self.to_remove_osds: Set[OSDRemoval] = set()
+        self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict()
+
+    @property
+    def report(self) -> Set[OSDRemoval]:
+        return self.to_remove_osds.copy()
+
+    def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
+        self.to_remove_osds.update(osds)
+
+    def _remove_osds_bg(self) -> None:
+        """
+        Performs actions in the _serve() loop to remove an OSD
+        when criteria is met.
+        """
+        logger.debug(
+            f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
+        self._update_osd_removal_status()
+        remove_osds: set = self.to_remove_osds.copy()
+        for osd in remove_osds:
+            if not osd.force:
+                self.drain_osd(osd.osd_id)
+                # skip criteria
+                if not self.is_empty(osd.osd_id):
+                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    continue
+
+            if not self.ok_to_destroy([osd.osd_id]):
+                logger.info(
+                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                continue
+
+            # abort criteria
+            if not self.down_osd([osd.osd_id]):
+                # also remove it from the remove_osd list and set a health_check warning?
+                raise orchestrator.OrchestratorError(
+                    f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+            if osd.replace:
+                if not self.destroy_osd(osd.osd_id):
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(
+                        f"Could not destroy OSD <{osd.osd_id}>")
+            else:
+                if not self.purge_osd(osd.osd_id):
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+            self.mgr._remove_daemon(osd.fullname, osd.nodename)
+            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
+            logger.debug(f"Removing {osd.osd_id} from the queue.")
+            self.to_remove_osds.remove(osd)
+
+    def _update_osd_removal_status(self):
+        """
+        Generate an OSD report that can be printed to the CLI
+        """
+        logger.debug("Update OSD removal status")
+        for osd in self.to_remove_osds:
+            osd.pg_count = self.get_pg_count(str(osd.osd_id))
+        logger.debug(f"OSD removal status: {self.to_remove_osds}")
+
+    def drain_osd(self, osd_id: str) -> bool:
+        """
+        Uses `osd_support` module to schedule a drain operation of an OSD
+        """
+        cmd_args = {
+            'prefix': 'osd drain',
+            'osd_ids': [int(osd_id)]
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def get_pg_count(self, osd_id: str) -> int:
+        """ Queries for PG count of an OSD """
+        self.mgr.log.debug("Querying for drain status")
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'osd drain status',
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Calling osd drain status failed with {err}")
+            raise OrchestratorError("Could not query `osd drain status`")
+        out = json.loads(out)
+        for o in out:
+            if str(o.get('osd_id', '')) == str(osd_id):
+                return int(o.get('pgs', -1))
+        return -1
+
+    def is_empty(self, osd_id: str) -> bool:
+        """ Checks if an OSD is empty """
+        return self.get_pg_count(osd_id) == 0
+
+    def ok_to_destroy(self, osd_ids: List[int]) -> bool:
+        """ Queries the safe-to-destroy flag for OSDs """
+        cmd_args = {'prefix': 'osd safe-to-destroy',
+                    'ids': osd_ids}
+        return self._run_mon_cmd(cmd_args)
+
+    def destroy_osd(self, osd_id: int) -> bool:
+        """ Destroys an OSD (forcefully) """
+        cmd_args = {'prefix': 'osd destroy-actual',
+                    'id': int(osd_id),
+                    'yes_i_really_mean_it': True}
+        return self._run_mon_cmd(cmd_args)
+
+    def down_osd(self, osd_ids: List[int]) -> bool:
+        """ Sets `out` flag to OSDs """
+        cmd_args = {
+            'prefix': 'osd down',
+            'ids': osd_ids,
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def purge_osd(self, osd_id: int) -> bool:
+        """ Purges an OSD from the cluster (forcefully) """
+        cmd_args = {
+            'prefix': 'osd purge-actual',
+            'id': int(osd_id),
+            'yes_i_really_mean_it': True
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def out_osd(self, osd_ids: List[int]) -> bool:
+        """ Sets `down` flag to OSDs """
+        cmd_args = {
+            'prefix': 'osd out',
+            'ids': osd_ids,
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def _run_mon_cmd(self, cmd_args: dict) -> bool:
+        """
+        Generic command to run mon_command and evaluate/log the results
+        """
+        ret, out, err = self.mgr.mon_command(cmd_args)
+        if ret != 0:
+            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
+            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+            return False
+        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
+        return True
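
Likewise, a minimal sketch (outside this commit) of queueing an OSD for removal with the relocated helpers. The mgr handle would normally be the CephadmOrchestrator instance; the queueing and report paths shown here never touch it, so None stands in. The actual drain/destroy/purge work still happens in _remove_osds_bg() from the serve loop and does require a live mgr:

import datetime

from cephadm.services.osd import OSDRemoval, RemoveUtil

rm_util = RemoveUtil(mgr=None)

# hypothetical removal request for osd.3 on host1
removal = OSDRemoval(osd_id='3', replace=False, force=False,
                     nodename='host1', fullname='osd.3',
                     start_at=datetime.datetime.utcnow(),
                     pg_count=-1)
rm_util.queue_osds_for_removal({removal})

# pending removals; pg_count_str renders the unknown count (-1) as 'n/a'
for osd in rm_util.report:
    print(osd.osd_id, osd.nodename, osd.pg_count_str)
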
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 2d9ba86ee1d1b10750fed143a29892bbea4e6a34..8226bdb2ed74f018749d44e85d12e29ec0a499db 100644 (file)
@@ -5,7 +5,7 @@ from contextlib import contextmanager
 import pytest
 
 from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
-from cephadm.osd import OSDRemoval
+from cephadm.services.osd import OSDRemoval
 
 try:
     from typing import Any, List
@@ -341,7 +341,7 @@ class TestCephadm(object):
             )
         ])
     ))
-    @mock.patch("cephadm.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
+    @mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0)
     def test_remove_osds(self, cephadm_module):
         with self._with_host(cephadm_module, 'test'):
             c = cephadm_module.list_daemons(refresh=True)
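
The mock target in the test above follows the same relocation; patch paths must name the module where the attribute is now looked up. A minimal sketch (not part of this commit) of the pattern:

from unittest import mock

# after the move, RemoveUtil is resolved via cephadm.services.osd
with mock.patch("cephadm.services.osd.RemoveUtil.get_pg_count", lambda _, __: 0):
    pass  # exercise removal code without querying `osd drain status`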