mgr/cephadm: Move remove osd code to osd.py
author    Sebastian Wagner <sebastian.wagner@suse.com>
          Fri, 28 Feb 2020 09:11:04 +0000 (10:11 +0100)
committer Sage Weil <sage@redhat.com>
          Fri, 6 Mar 2020 18:29:15 +0000 (12:29 -0600)
Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/cephadm/_utils.py [deleted file]
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/osd.py [new file with mode: 0644]
src/pybind/mgr/tox.ini
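
For orientation, here is a minimal sketch of the ownership move this commit performs. The wiring is inferred from the hunks below and heavily trimmed (method bodies elided), so treat it as an illustration rather than the full module:

    # Before: CephadmOrchestrator (module.py) owned the removal queue, the
    # report and the background loop step; RemoveUtil (_utils.py) only wrapped
    # mon commands. After: RemoveUtil (osd.py) owns all of it.
    from .osd import RemoveUtil, OSDRemoval   # new import in module.py

    class CephadmOrchestrator:                # trimmed sketch
        def __init__(self):
            self.rm_util = RemoveUtil(self)   # queue + report now live here

        def remove_osds(self, osd_ids):
            # scheduling now targets the helper's queue:
            #   self.rm_util.to_remove_osds.add(osd)
            ...

        def remove_osds_status(self):
            return self.rm_util.osd_removal_report   # report moved too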

diff --git a/src/pybind/mgr/cephadm/_utils.py b/src/pybind/mgr/cephadm/_utils.py
deleted file mode 100644 (file)
index 22c8ed4..0000000
+++ /dev/null
@@ -1,91 +0,0 @@
-import json
-try:
-    from typing import List
-except ImportError:
-    pass
-
-from orchestrator import OrchestratorError
-
-
-class RemoveUtil(object):
-    def __init__(self, mgr):
-        self.mgr = mgr
-
-    def drain_osd(self, osd_id: str) -> bool:
-        """
-        Uses `osd_support` module to schedule a drain operation of an OSD
-        """
-        cmd_args = {
-            'prefix': 'osd drain',
-            'osd_ids': [int(osd_id)]
-        }
-        return self._run_mon_cmd(cmd_args)
-
-    def get_pg_count(self, osd_id: str) -> int:
-        """ Queries for PG count of an OSD """
-        self.mgr.log.debug("Querying for drain status")
-        ret, out, err = self.mgr.mon_command({
-            'prefix': 'osd drain status',
-        })
-        if ret != 0:
-            self.mgr.log.error(f"Calling osd drain status failed with {err}")
-            raise OrchestratorError("Could not query `osd drain status`")
-        out = json.loads(out)
-        for o in out:
-            if str(o.get('osd_id', '')) == str(osd_id):
-                return int(o.get('pgs', -1))
-        return -1
-
-    def is_empty(self, osd_id: str) -> bool:
-        """ Checks if an OSD is empty """
-        return self.get_pg_count(osd_id) == 0
-
-    def ok_to_destroy(self, osd_ids: List[int]) -> bool:
-        """ Queries the safe-to-destroy flag for OSDs """
-        cmd_args = {'prefix': 'osd safe-to-destroy',
-                    'ids': osd_ids}
-        return self._run_mon_cmd(cmd_args)
-
-    def destroy_osd(self, osd_id: int) -> bool:
-        """ Destroys an OSD (forcefully) """
-        cmd_args = {'prefix': 'osd destroy-actual',
-                    'id': int(osd_id),
-                    'yes_i_really_mean_it': True}
-        return self._run_mon_cmd(cmd_args)
-
-    def down_osd(self, osd_ids: List[int]) -> bool:
-        """ Sets `out` flag to OSDs """
-        cmd_args = {
-            'prefix': 'osd down',
-            'ids': osd_ids,
-        }
-        return self._run_mon_cmd(cmd_args)
-
-    def purge_osd(self, osd_id: int) -> bool:
-        """ Purges an OSD from the cluster (forcefully) """
-        cmd_args = {
-            'prefix': 'osd purge-actual',
-            'id': int(osd_id),
-            'yes_i_really_mean_it': True
-        }
-        return self._run_mon_cmd(cmd_args)
-
-    def out_osd(self, osd_ids: List[int]) -> bool:
-        """ Sets `down` flag to OSDs """
-        cmd_args = {
-            'prefix': 'osd out',
-            'ids': osd_ids,
-        }
-        return self._run_mon_cmd(cmd_args)
-
-    def _run_mon_cmd(self, cmd_args: dict) -> bool:
-        """
-        Generic command to run mon_command and evaluate/log the results
-        """
-        ret, out, err = self.mgr.mon_command(cmd_args)
-        if ret != 0:
-            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
-            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
-            return False
-        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
-        return True
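
The get_pg_count() helper above (and its copy in the new osd.py below) parses `osd drain status` output. A small standalone sketch of that parsing; the JSON shape is assumed from the code itself, not from mon documentation:

    import json

    # Assumed shape, inferred from get_pg_count(): a JSON list of objects
    # carrying 'osd_id' and 'pgs'.
    drain_status = json.loads('[{"osd_id": 2, "pgs": 17}, {"osd_id": 5, "pgs": 0}]')

    def pgs_for(osd_id, drain_status):
        for o in drain_status:
            if str(o.get('osd_id', '')) == str(osd_id):
                return int(o.get('pgs', -1))
        return -1   # sentinel: OSD not found in the drain report

    assert pgs_for('5', drain_status) == 0   # 0 PGs -> is_empty() is True
    assert pgs_for('9', drain_status) == -1  # unknown OSD -> 'n/a' in the report
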
diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 8c43daff2afedadfc294736a0f2340ae743e41d6..5db8f2d54429bb80ed0f7793aec138f8d3afc742 100644 (file)
@@ -10,7 +10,7 @@ from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException
 
 import string
 try:
-    from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple, Iterator
+    from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple, Iterator, Set
     from typing import TYPE_CHECKING
 except ImportError:
     TYPE_CHECKING = False  # just for type checking
@@ -37,7 +37,7 @@ from orchestrator import OrchestratorError, HostPlacementSpec, OrchestratorValid
     CLICommandMeta, ServiceSpec
 
 from . import remotes
-from ._utils import RemoveUtil
+from .osd import RemoveUtil, OSDRemoval
 
 
 try:
@@ -85,20 +85,6 @@ except ImportError:
         def __exit__(self, exc_type, exc_value, traceback):
             self.cleanup()
 
-class OSDRemoval(NamedTuple):
-    osd_id: int
-    replace: bool
-    force: bool
-    nodename: str
-    fullname: str
-    started_at: datetime.datetime
-
-    # needed due to changing 'started_at' attr
-    def __eq__(self, other):
-        return self.osd_id == other.osd_id
-
-    def __hash__(self):
-        return hash(self.osd_id)
 
 # high-level TODO:
 #  - bring over some of the protections from ceph-deploy that guard against
@@ -668,8 +654,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         self.cache = HostCache(self)
         self.cache.load()
-        self.to_remove_osds: set = set()
-        self.osd_removal_report: dict = dict()
         self.rm_util = RemoveUtil(self)
 
         self.spec_store = SpecStore(self)
@@ -1068,8 +1052,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
             self._check_for_strays()
 
-            self._remove_osds_bg()
-
             if self._apply_all_services():
                 continue  # did something, refresh
 
@@ -2856,7 +2838,7 @@ receivers:
         """
 
         daemons = self.cache.get_daemons_by_service('osd')
-        found = set()
+        found: Set[OSDRemoval] = set()
         for daemon in daemons:
             if daemon.daemon_id not in osd_ids:
                 continue
@@ -2864,86 +2846,21 @@ receivers:
                                  daemon.hostname, daemon.name(),
                                  datetime.datetime.utcnow()))
 
-        not_found: set = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
+        not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
         if not_found:
             raise OrchestratorError('Unable to find OSD: %s' % not_found)
 
         for osd in found:
-            self.to_remove_osds.add(osd)
+            self.rm_util.to_remove_osds.add(osd)
             # trigger the serve loop to initiate the removal
         self._kick_serve_loop()
         return trivial_result(f"Scheduled OSD(s) for removal")
 
-    def _remove_osds_bg(self) -> None:
-        """
-        Performs actions in the _serve() loop to remove an OSD
-        when criteria is met.
-        """
-        self.log.debug(f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
-        self.osd_removal_report = self._generate_osd_removal_status()
-        remove_osds: set = self.to_remove_osds.copy()
-        for osd in remove_osds:
-            if not osd.force:
-                self.rm_util.drain_osd(osd.osd_id)
-                # skip criteria
-                if not self.rm_util.is_empty(osd.osd_id):
-                    self.log.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
-                    continue
-
-            if not self.rm_util.ok_to_destroy([osd.osd_id]):
-                self.log.info(f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
-                continue
-
-            # abort criteria
-            if not self.rm_util.down_osd([osd.osd_id]):
-                # also remove it from the remove_osd list and set a health_check warning?
-                raise orchestrator.OrchestratorError(f"Could not set OSD <{osd.osd_id}> to 'down'")
-
-            if osd.replace:
-                if not self.rm_util.destroy_osd(osd.osd_id):
-                    # also remove it from the remove_osd list and set a health_check warning?
-                    raise orchestrator.OrchestratorError(f"Could not destroy OSD <{osd.osd_id}>")
-            else:
-                if not self.rm_util.purge_osd(osd.osd_id):
-                    # also remove it from the remove_osd list and set a health_check warning?
-                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
-            completion = self._remove_daemons([(osd.fullname, osd.nodename, True)])
-            completion.add_progress('Removing OSDs', self)
-            completion.update_progress = True
-            if completion:
-                while not completion.has_result:
-                    self.process([completion])
-                    if completion.needs_result:
-                        time.sleep(1)
-                    else:
-                        break
-                if completion.exception is not None:
-                    self.log.error(str(completion.exception))
-            else:
-                raise orchestrator.OrchestratorError("Did not receive a completion from _remove_daemon")
-
-            self.log.info(f"Successfully removed removed OSD <{osd.osd_id}> on {osd.nodename}")
-            self.log.debug(f"Removing {osd.osd_id} from the queue.")
-            self.to_remove_osds.remove(osd)
-
-    def _generate_osd_removal_status(self) -> Dict[Any, object]:
-        """
-        Generate a OSD report that can be printed to the CLI
-        """
-        self.log.debug("Assembling report for osd rm status")
-        report = {}
-        for osd in self.to_remove_osds:
-            pg_count = self.rm_util.get_pg_count(osd.osd_id)
-            report[osd] = pg_count if pg_count != -1 else 'n/a'
-        self.log.debug(f"Reporting: {report}")
-        return report
-
     def remove_osds_status(self) -> orchestrator.Completion:
         """
         The CLI call to retrieve an osd removal report
         """
-        return trivial_result(self.osd_removal_report)
+        return trivial_result(self.rm_util.osd_removal_report)
 
     def list_specs(self) -> orchestrator.Completion:
         """
diff --git a/src/pybind/mgr/cephadm/osd.py b/src/pybind/mgr/cephadm/osd.py
new file mode 100644 (file)
index 0000000..0a8d9c4
--- /dev/null
@@ -0,0 +1,186 @@
+import datetime
+import json
+import logging
+import time
+
+from typing import List, NamedTuple, Dict, Any, Set
+
+import orchestrator
+from orchestrator import OrchestratorError
+
+logger = logging.getLogger(__name__)
+
+
+class OSDRemoval(NamedTuple):
+    osd_id: int
+    replace: bool
+    force: bool
+    nodename: str
+    fullname: str
+    started_at: datetime.datetime
+
+    # needed due to changing 'started_at' attr
+    def __eq__(self, other):
+        return self.osd_id == other.osd_id
+
+    def __hash__(self):
+        return hash(self.osd_id)
+
+
+class RemoveUtil(object):
+    def __init__(self, mgr):
+        self.mgr = mgr
+        self.to_remove_osds: Set[OSDRemoval] = set()
+        self.osd_removal_report: dict = dict()
+        self.log = logger
+        self.rm_util = self  # moved code still calls self.rm_util.*; point it back at this object
+
+
+    def _remove_osds_bg(self) -> None:
+        """
+        Performs actions in the _serve() loop to remove an OSD
+        once the removal criteria are met.
+        """
+        self.log.debug(
+            f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
+        self.osd_removal_report = self._generate_osd_removal_status()
+        remove_osds: set = self.to_remove_osds.copy()
+        for osd in remove_osds:
+            if not osd.force:
+                self.rm_util.drain_osd(osd.osd_id)
+                # skip criteria
+                if not self.rm_util.is_empty(osd.osd_id):
+                    self.log.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    continue
+
+            if not self.rm_util.ok_to_destroy([osd.osd_id]):
+                self.log.info(
+                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                continue
+
+            # abort criteria
+            if not self.rm_util.down_osd([osd.osd_id]):
+                # also remove it from the remove_osd list and set a health_check warning?
+                raise orchestrator.OrchestratorError(
+                    f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+            if osd.replace:
+                if not self.rm_util.destroy_osd(osd.osd_id):
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(
+                        f"Could not destroy OSD <{osd.osd_id}>")
+            else:
+                if not self.rm_util.purge_osd(osd.osd_id):
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+            completion = self.mgr._remove_daemon([(osd.fullname, osd.nodename, True)])
+            completion.add_progress('Removing OSDs', self)
+            completion.update_progress = True
+            if completion:
+                while not completion.has_result:
+                    self.mgr.process([completion])
+                    if completion.needs_result:
+                        time.sleep(1)
+                    else:
+                        break
+                if completion.exception is not None:
+                    self.log.error(str(completion.exception))
+            else:
+                raise orchestrator.OrchestratorError(
+                    "Did not receive a completion from _remove_daemon")
+
+            self.log.info(f"Successfully removed removed OSD <{osd.osd_id}> on {osd.nodename}")
+            self.log.debug(f"Removing {osd.osd_id} from the queue.")
+            self.to_remove_osds.remove(osd)
+
+    def _generate_osd_removal_status(self) -> Dict[Any, object]:
+        """
+        Generate an OSD removal report that can be printed to the CLI
+        """
+        self.log.debug("Assembling report for osd rm status")
+        report = {}
+        for osd in self.to_remove_osds:
+            pg_count = self.get_pg_count(str(osd.osd_id))
+            report[osd] = pg_count if pg_count != -1 else 'n/a'
+        self.log.debug(f"Reporting: {report}")
+        return report
+
+    def drain_osd(self, osd_id: str) -> bool:
+        """
+        Uses the `osd_support` mgr module to schedule a drain operation on an OSD
+        """
+        cmd_args = {
+            'prefix': 'osd drain',
+            'osd_ids': [int(osd_id)]
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def get_pg_count(self, osd_id: str) -> int:
+        """ Queries for PG count of an OSD """
+        self.mgr.log.debug("Querying for drain status")
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'osd drain status',
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Calling osd drain status failed with {err}")
+            raise OrchestratorError("Could not query `osd drain status`")
+        out = json.loads(out)
+        for o in out:
+            if str(o.get('osd_id', '')) == str(osd_id):
+                return int(o.get('pgs', -1))
+        return -1
+
+    def is_empty(self, osd_id: str) -> bool:
+        """ Checks if an OSD is empty """
+        return self.get_pg_count(osd_id) == 0
+
+    def ok_to_destroy(self, osd_ids: List[int]) -> bool:
+        """ Queries the safe-to-destroy flag for OSDs """
+        cmd_args = {'prefix': 'osd safe-to-destroy',
+                    'ids': osd_ids}
+        return self._run_mon_cmd(cmd_args)
+
+    def destroy_osd(self, osd_id: int) -> bool:
+        """ Destroys an OSD (forcefully) """
+        cmd_args = {'prefix': 'osd destroy-actual',
+                    'id': int(osd_id),
+                    'yes_i_really_mean_it': True}
+        return self._run_mon_cmd(cmd_args)
+
+    def down_osd(self, osd_ids: List[int]) -> bool:
+        """ Sets `out` flag to OSDs """
+        cmd_args = {
+            'prefix': 'osd down',
+            'ids': osd_ids,
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def purge_osd(self, osd_id: int) -> bool:
+        """ Purges an OSD from the cluster (forcefully) """
+        cmd_args = {
+            'prefix': 'osd purge-actual',
+            'id': int(osd_id),
+            'yes_i_really_mean_it': True
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def out_osd(self, osd_ids: List[int]) -> bool:
+        """ Sets `down` flag to OSDs """
+        cmd_args = {
+            'prefix': 'osd out',
+            'ids': osd_ids,
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def _run_mon_cmd(self, cmd_args: dict) -> bool:
+        """
+        Generic helper to run mon_command and evaluate/log the results
+        """
+        ret, out, err = self.mgr.mon_command(cmd_args)
+        if ret != 0:
+            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
+            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+            return False
+        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
+        return True
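
All the single-purpose helpers above funnel through _run_mon_cmd(), which maps mon_command's (retcode, stdout, stderr) triple to a bool. A sketch with a stub manager; StubMgr is hypothetical, the real object being the MgrModule instance passed to RemoveUtil:

    import logging
    from cephadm.osd import RemoveUtil   # assumed import path

    class StubMgr:
        """Hypothetical stand-in for the MgrModule instance."""
        log = logging.getLogger('stub')

        def mon_command(self, cmd_args):
            # mon_command returns (retcode, stdout, stderr); retcode 0 is success
            if cmd_args.get('prefix') == 'osd out':
                return 0, '', ''
            return -22, '', 'EINVAL: stub rejects everything else'

    util = RemoveUtil(StubMgr())
    assert util.out_osd([1, 2]) is True    # retcode 0 -> True
    assert util.drain_osd('3') is False    # nonzero   -> error logged, False
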
diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini
index 22803ab3ebadd89ec42fdeea509aba47067ebf3b..ee20bbf51ce6ac57ba527d5877dbffdf6ddcabdf 100644 (file)
@@ -14,7 +14,6 @@ deps =
     mypy
 commands = mypy --config-file=../../mypy.ini \
            cephadm/module.py \
-           cephadm/_utils.py \
            mgr_module.py \
            dashboard/module.py \
            mgr_util.py \