git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: move process_removal_queue into OSDRemovalQueue
author     Sebastian Wagner <sebastian.wagner@suse.com>
           Fri, 8 Jan 2021 11:34:40 +0000 (12:34 +0100)
committer  Sebastian Wagner <sebastian.wagner@suse.com>
           Fri, 22 Jan 2021 12:05:22 +0000 (13:05 +0100)
`process_removal_queue` belongs to `OSDRemovalQueue` instead of `RemoveUtil`.

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
(cherry picked from commit af52ba47a16703aa1aadbd59d84e8876bdad2f17)
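
In short: `RemoveUtil` keeps only the stateless helpers (mon commands, ok-to-stop checks), while the queue itself now owns the removal state machine and, to do so, takes the orchestrator in its constructor. A minimal sketch of the resulting shape, with bodies elided and the type annotations simplified (not the literal cephadm code; see the osd.py hunks below for that):

    from typing import Set


    class OSD:
        """Stand-in for cephadm's OSD class (one scheduled removal)."""


    class RemoveUtil:
        """Stateless helpers only; no longer touches the removal queue."""
        def __init__(self, mgr) -> None:
            self.mgr = mgr


    class OSDRemovalQueue:
        """Owns the OSDs scheduled for removal and the logic driving them."""
        def __init__(self, mgr) -> None:
            self.mgr = mgr                    # needed for rm_util, set_store(), ...
            self.osds: Set[OSD] = set()

        def process_removal_queue(self) -> None:
            """Moved here from RemoveUtil; called from the serve loop."""

        def save_to_store(self) -> None:
            """Moved here from RemoveUtil; persists self.osds as JSON."""

        def load_from_store(self) -> None:
            """Moved here from RemoveUtil; called once at module startup."""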

src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/cephadm/tests/test_cephadm.py
src/pybind/mgr/cephadm/tests/test_osd_removal.py

diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index 94367cb7bf589b3e9d85b37d966915388aa8dc69..d289e92735d78cab6a123d9ff086ae3eb8728dc3 100644
@@ -354,8 +354,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.cache.load()
 
         self.rm_util = RemoveUtil(self)
-        self.to_remove_osds = OSDRemovalQueue()
-        self.rm_util.load_from_store()
+        self.to_remove_osds = OSDRemovalQueue(self)
+        self.to_remove_osds.load_from_store()
 
         self.spec_store = SpecStore(self)
         self.spec_store.load()
@@ -503,7 +503,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                     self.log.debug(f"Found empty osd. Starting removal process")
                     # if the osd that is now empty is also part of the removal queue
                     # start the process
-                    self.rm_util.process_removal_queue()
+                    self._kick_serve_loop()
 
     def pause(self) -> None:
         if not self.paused:
diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py
index e1a90ee4314bf1b97bd390dcc80030a76f156d27..7808dc5baded94bac0433aa411476901e7e1d059 100644
@@ -63,7 +63,7 @@ class CephadmServe:
                 self._update_paused_health()
 
                 if not self.mgr.paused:
-                    self.mgr.rm_util.process_removal_queue()
+                    self.mgr.to_remove_osds.process_removal_queue()
 
                     self.mgr.migration.migrate()
                     if self.mgr.migration.is_migration_ongoing():
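
Taken together with the module.py hunk above, all queue processing is now funnelled through the serve loop: when the orchestrator notices an OSD has become empty it merely wakes the loop, and the loop is the only caller of `process_removal_queue()`. A rough sketch of that control flow (free functions here are purely illustrative; the real methods are `CephadmOrchestrator._kick_serve_loop()` and `CephadmServe._serve()`):

    # illustrative only; 'mgr' stands in for the cephadm orchestrator module
    def on_osd_became_empty(mgr) -> None:
        mgr._kick_serve_loop()                            # just wake the serve loop

    def serve_iteration(mgr) -> None:
        if not mgr.paused:
            mgr.to_remove_osds.process_removal_queue()    # the single processing site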
diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py
index 15f3bd04cb7681abfdb5eefe40f6360234f5f24d..8f6d7ff511c6b36a88501ebfef65e1a04445e01c 100644
@@ -304,76 +304,6 @@ class RemoveUtil(object):
     def __init__(self, mgr: "CephadmOrchestrator") -> None:
         self.mgr: "CephadmOrchestrator" = mgr
 
-    def process_removal_queue(self) -> None:
-        """
-        Performs actions in the _serve() loop to remove an OSD
-        when criteria is met.
-        """
-
-        # make sure that we don't run on OSDs that are not in the cluster anymore.
-        self.cleanup()
-
-        logger.debug(
-            f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
-            f"for removal: {self.mgr.to_remove_osds.all_osds()}")
-
-        # find osds that are ok-to-stop and not yet draining
-        ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
-        if ok_to_stop_osds:
-            # start draining those
-            _ = [osd.start_draining() for osd in ok_to_stop_osds]
-
-        # Check all osds for their state and take action (remove, purge etc)
-        to_remove_osds = self.mgr.to_remove_osds.all_osds()
-        new_queue: Set[OSD] = set()
-        for osd in to_remove_osds:  # type: OSD
-            if not osd.force:
-                # skip criteria
-                if not osd.is_empty:
-                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
-                    new_queue.add(osd)
-                    continue
-
-            if not osd.safe_to_destroy():
-                logger.info(
-                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
-                new_queue.add(osd)
-                continue
-
-            # abort criteria
-            if not osd.down():
-                # also remove it from the remove_osd list and set a health_check warning?
-                raise orchestrator.OrchestratorError(
-                    f"Could not set OSD <{osd.osd_id}> to 'down'")
-
-            if osd.replace:
-                if not osd.destroy():
-                    raise orchestrator.OrchestratorError(
-                        f"Could not destroy OSD <{osd.osd_id}>")
-            else:
-                if not osd.purge():
-                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
-            if not osd.exists:
-                continue
-            assert osd.fullname is not None
-            assert osd.hostname is not None
-            self.mgr._remove_daemon(osd.fullname, osd.hostname)
-            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
-            logger.debug(f"Removing {osd.osd_id} from the queue.")
-
-        # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
-        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
-        # osds that were added while this method was executed'
-        self.mgr.to_remove_osds.intersection_update(new_queue)
-        self.save_to_store()
-
-    def cleanup(self) -> None:
-        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
-        not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
-        for osd in not_in_cluster_osds:
-            self.mgr.to_remove_osds.remove(osd)
-
     def get_osds_in_cluster(self) -> List[str]:
         osd_map = self.mgr.get_osdmap()
         return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
@@ -481,19 +411,6 @@ class RemoveUtil(object):
         self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
         return True
 
-    def save_to_store(self) -> None:
-        osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
-        logger.debug(f"Saving {osd_queue} to store")
-        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
-
-    def load_from_store(self) -> None:
-        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
-            for osd in json.loads(v):
-                logger.debug(f"Loading osd ->{osd} from store")
-                osd_obj = OSD.from_json(osd, ctx=self)
-                if osd_obj is not None:
-                    self.mgr.to_remove_osds.add(osd_obj)
-
 
 class NotFoundError(Exception):
     pass
@@ -652,13 +569,13 @@ class OSD:
         return out
 
     @classmethod
-    def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
+    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
         if not inp:
             return None
         for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
             if inp.get(date_field):
                 inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
-        inp.update({'remove_util': ctx})
+        inp.update({'remove_util': rm_util})
         if 'nodename' in inp:
             hostname = inp.pop('nodename')
             inp['hostname'] = hostname
@@ -678,9 +595,91 @@ class OSD:
 
 class OSDRemovalQueue(object):
 
-    def __init__(self) -> None:
+    def __init__(self, mgr: "CephadmOrchestrator") -> None:
+        self.mgr: "CephadmOrchestrator" = mgr
         self.osds: Set[OSD] = set()
 
+    def process_removal_queue(self) -> None:
+        """
+        Performs actions in the _serve() loop to remove an OSD
+        when criteria is met.
+        """
+
+        # make sure that we don't run on OSDs that are not in the cluster anymore.
+        self.cleanup()
+
+        logger.debug(
+            f"{self.queue_size()} OSDs are scheduled "
+            f"for removal: {self.all_osds()}")
+
+        # find osds that are ok-to-stop and not yet draining
+        ok_to_stop_osds = self.mgr.rm_util.find_osd_stop_threshold(self.idling_osds())
+        if ok_to_stop_osds:
+            # start draining those
+            _ = [osd.start_draining() for osd in ok_to_stop_osds]
+
+        # Check all osds for their state and take action (remove, purge etc)
+        new_queue: Set[OSD] = set()
+        for osd in self.all_osds():  # type: OSD
+            if not osd.force:
+                # skip criteria
+                if not osd.is_empty:
+                    logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    new_queue.add(osd)
+                    continue
+
+            if not osd.safe_to_destroy():
+                logger.info(
+                    f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                new_queue.add(osd)
+                continue
+
+            # abort criteria
+            if not osd.down():
+                # also remove it from the remove_osd list and set a health_check warning?
+                raise orchestrator.OrchestratorError(
+                    f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+            if osd.replace:
+                if not osd.destroy():
+                    raise orchestrator.OrchestratorError(
+                        f"Could not destroy OSD <{osd.osd_id}>")
+            else:
+                if not osd.purge():
+                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+            if not osd.exists:
+                continue
+            assert osd.fullname is not None
+            assert osd.hostname is not None
+            self.mgr._remove_daemon(osd.fullname, osd.hostname)
+            logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.hostname}")
+            logger.debug(f"Removing {osd.osd_id} from the queue.")
+
+        # self could change while this is processing (osds get added from the CLI)
+        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
+        # osds that were added while this method was executed'
+        self.intersection_update(new_queue)
+        self.save_to_store()
+
+    def cleanup(self) -> None:
+        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
+        for osd in self.not_in_cluster():
+            self.remove(osd)
+
+    def save_to_store(self) -> None:
+        osd_queue = [osd.to_json() for osd in self.all_osds()]
+        logger.debug(f"Saving {osd_queue} to store")
+        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+    def load_from_store(self) -> None:
+        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+            for osd in json.loads(v):
+                logger.debug(f"Loading osd ->{osd} from store")
+                osd_obj = OSD.from_json(osd, rm_util=self.mgr.rm_util)
+                if osd_obj is not None:
+                    self.osds.add(osd_obj)
+
     def as_osd_ids(self) -> List[int]:
         return [osd.osd_id for osd in self.osds]
 
diff --git a/src/pybind/mgr/cephadm/tests/test_cephadm.py b/src/pybind/mgr/cephadm/tests/test_cephadm.py
index 2a26aa47c6dfa28b6967fd82951b5ada9d20bae3..0b1600c2ad0c1512183b0b6d3cfafdd68abb617b 100644
@@ -507,8 +507,8 @@ class TestCephadm(object):
                                                       process_started_at=datetime.datetime.utcnow(),
                                                       remove_util=cephadm_module.rm_util
                                                       ))
-            cephadm_module.rm_util.process_removal_queue()
-            assert cephadm_module.to_remove_osds == OSDRemovalQueue()
+            cephadm_module.to_remove_osds.process_removal_queue()
+            assert cephadm_module.to_remove_osds == OSDRemovalQueue(cephadm_module)
 
             c = cephadm_module.remove_osds_status()
             out = wait(cephadm_module, c)
diff --git a/src/pybind/mgr/cephadm/tests/test_osd_removal.py b/src/pybind/mgr/cephadm/tests/test_osd_removal.py
index 76f067bb55a43ce1a3168c9ebbb1c0ea9900616e..9fd5f9b760ea7513ce2a2fda1b2df3fad94e7b30 100644
@@ -90,9 +90,9 @@ class TestOSDRemoval:
             }
         ])
         cephadm_module.set_store('osd_remove_queue', data)
-        cephadm_module.rm_util.load_from_store()
+        cephadm_module.to_remove_osds.load_from_store()
 
-        expected = OSDRemovalQueue()
+        expected = OSDRemovalQueue(cephadm_module)
         expected.add(OSD(osd_id=35, remove_util=rm_util, draining=True))
         assert cephadm_module.to_remove_osds == expected
 
@@ -218,7 +218,7 @@ class TestOSD:
 class TestOSDRemovalQueue:
 
     def test_queue_size(self, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         assert q.queue_size() == 0
         q.add(osd_obj)
         assert q.queue_size() == 1
@@ -226,14 +226,14 @@ class TestOSDRemovalQueue:
     @mock.patch("cephadm.services.osd.OSD.start")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_enqueue(self, exist, start, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         q.enqueue(osd_obj)
         osd_obj.start.assert_called_once()
 
     @mock.patch("cephadm.services.osd.OSD.stop")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm_raise(self, exist, stop, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         with pytest.raises(KeyError):
             q.rm(osd_obj)
             osd_obj.stop.assert_called_once()
@@ -241,7 +241,7 @@ class TestOSDRemovalQueue:
     @mock.patch("cephadm.services.osd.OSD.stop")
     @mock.patch("cephadm.services.osd.OSD.exists")
     def test_rm(self, exist, stop, osd_obj):
-        q = OSDRemovalQueue()
+        q = OSDRemovalQueue(mock.Mock())
         q.add(osd_obj)
         q.rm(osd_obj)
         osd_obj.stop.assert_called_once()