mgr/mds_autoscaler: plugin to configure mds instances  (32731/head)
author Milind Changire <mchangir@redhat.com>
Fri, 7 Aug 2020 11:39:10 +0000 (17:09 +0530)
committer Milind Changire <mchangir@redhat.com>
Fri, 7 Aug 2020 11:39:10 +0000 (17:09 +0530)
mgr plugin to deploy and configure MDSs in response to a degraded file system

MDS instance management as per changes to:
* 'max_mds' option
* 'standby_count_wanted' option
* mds liveness and transitions from standby to active
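
The sizing rule the plugin applies: the orchestrator's "mds.<fs_name>" service is kept at
max_mds + standby_count_wanted daemons, compared against the active and standby MDSs
currently running for that file system. A minimal sketch of that rule, for illustration
only (the helper names are not part of the module; the dict keys mirror the fs_map fields
it reads):

    # Illustrative sketch, not module API: decide the placement count for
    # the "mds.<fs_name>" orchestrator service from fs_map fields.
    def required_mds_count(mdsmap: dict) -> int:
        # active ranks wanted plus the wanted number of standbys
        return mdsmap['max_mds'] + mdsmap['standby_count_wanted']

    def needs_rescale(mdsmap: dict, active: int, standbys: int) -> bool:
        # compare against the MDS daemons currently deployed for this fs
        return required_mds_count(mdsmap) != active + standbys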

Credit for the mds_autoscaler plugin test goes to Sebastian Wagner.

Fixes: https://tracker.ceph.com/issues/40929
Signed-off-by: Milind Changire <mchangir@redhat.com>
ceph.spec.in
src/pybind/mgr/mds_autoscaler/__init__.py [new file with mode: 0644]
src/pybind/mgr/mds_autoscaler/module.py [new file with mode: 0644]
src/pybind/mgr/mds_autoscaler/tests/__init__.py [new file with mode: 0644]
src/pybind/mgr/mds_autoscaler/tests/test_autoscaler.py [new file with mode: 0644]
src/pybind/mgr/tox.ini
src/pybind/mgr/volumes/fs/operations/volume.py

diff --git a/ceph.spec.in b/ceph.spec.in
index 4cd0e3e74e2a1b1a88214de27c6e37bcc8a23c7b..3a1aca596c2ef2bea8a25dc844675cfb03c54124 100644 (file)
@@ -1655,6 +1655,7 @@ fi
 %{_datadir}/ceph/mgr/insights
 %{_datadir}/ceph/mgr/iostat
 %{_datadir}/ceph/mgr/localpool
+%{_datadir}/ceph/mgr/mds_autoscaler
 %{_datadir}/ceph/mgr/orchestrator
 %{_datadir}/ceph/mgr/osd_perf_query
 %{_datadir}/ceph/mgr/pg_autoscaler
diff --git a/src/pybind/mgr/mds_autoscaler/__init__.py b/src/pybind/mgr/mds_autoscaler/__init__.py
new file mode 100644 (file)
index 0000000..85a3241
--- /dev/null
@@ -0,0 +1,7 @@
+import os
+
+if 'UNITTEST' in os.environ:
+    import tests
+    tests.mock_ceph_modules()  # type: ignore
+
+from .module import MDSAutoscaler
diff --git a/src/pybind/mgr/mds_autoscaler/module.py b/src/pybind/mgr/mds_autoscaler/module.py
new file mode 100644 (file)
index 0000000..f830e61
--- /dev/null
@@ -0,0 +1,152 @@
+"""
+Automatically scale MDSs based on the status of the file system, using the FSMap
+"""
+
+import logging
+from typing import Optional, List, Set
+from mgr_module import MgrModule
+from ceph.deployment.service_spec import ServiceSpec
+import orchestrator
+import copy
+
+log = logging.getLogger(__name__)
+
+
+class MDSAutoscaler(orchestrator.OrchestratorClientMixin, MgrModule):
+    """
+    MDS autoscaler.
+    """
+    def __init__(self, *args, **kwargs):
+        MgrModule.__init__(self, *args, **kwargs)
+        self.set_mgr(self)
+
+    def get_service(self, fs_name: str) -> List[orchestrator.ServiceDescription]:
+        service = f"mds.{fs_name}"
+        completion = self.describe_service(service_type='mds',
+                                           service_name=service,
+                                           refresh=True)
+        self._orchestrator_wait([completion])
+        orchestrator.raise_if_exception(completion)
+        return completion.result
+
+    def get_daemons(self, fs_name: str) -> List[orchestrator.DaemonDescription]:
+        service = f"mds.{fs_name}"
+        completion = self.list_daemons(service_name=service)
+        self._orchestrator_wait([completion])
+        orchestrator.raise_if_exception(completion)
+        return completion.result
+
+    def update_daemon_count(self, fs_name: str, abscount: int) -> ServiceSpec:
+        svclist = self.get_service(fs_name)
+
+        assert svclist is not None
+        assert len(svclist) > 0
+
+        svc = svclist[0]
+
+        assert svc.spec.placement.count != abscount
+
+        ps = copy.deepcopy(svc.spec.placement)
+        ps.count = abscount
+        newspec = ServiceSpec(service_type=svc.spec.service_type,
+                              service_id=svc.spec.service_id,
+                              placement=ps)
+        return newspec
+
+    def get_required_standby_count(self, fs_map: dict, fs_name: str) -> int:
+        assert fs_map is not None
+        for fs in fs_map['filesystems']:
+            if fs['mdsmap']['fs_name'] == fs_name:
+                return fs['mdsmap']['standby_count_wanted']
+        assert False
+
+    def get_current_standby_count(self, fs_map: dict, fs_name: str, daemons: List[orchestrator.DaemonDescription]) -> int:
+        # standbys are not grouped by filesystems in fs_map
+        # available = standby_replay + standby_active
+        assert fs_map is not None
+        total = 0
+        daemon_names = {
+            d.name() for d in daemons
+        }
+        for sb in fs_map['standbys']:
+            full_name = f"mds.{sb['name']}"
+            if full_name in daemon_names:
+                total += 1
+        return total
+
+    def get_active_names(self, fs_map: dict, fs_name: str) -> Set[str]:
+        active_names = set()
+        for fs in fs_map['filesystems']:
+            if fs['mdsmap']['fs_name'] == fs_name:
+                for active in fs['mdsmap']['up']:
+                    gid = fs['mdsmap']['up'][active]
+                    gid_key = f"gid_{gid}"
+                    active_names.add(f"mds.{fs['mdsmap']['info'][gid_key]['name']}")
+        return active_names
+
+    def get_current_active_count(self, fs_map: dict, fs_name: str, daemons: List[orchestrator.DaemonDescription]) -> int:
+        assert fs_map is not None
+        total = 0
+        daemon_names = {
+            d.name() for d in daemons
+        }
+        active_names = self.get_active_names(fs_map, fs_name)
+        return len(daemon_names.intersection(active_names))
+
+    def get_required_max_mds(self, fs_map: dict, fs_name: str) -> int:
+        assert fs_map is not None
+        for fs in fs_map['filesystems']:
+            if fs['mdsmap']['fs_name'] == fs_name:
+                return fs['mdsmap']['max_mds']
+        assert False
+
+    def verify_and_manage_mds_instance(self, fs_map: dict, fs_name: str):
+        assert fs_map is not None
+
+        try:
+            daemons = self.get_daemons(fs_name)
+            standbys_required = self.get_required_standby_count(fs_map, fs_name)
+            standbys_current = self.get_current_standby_count(fs_map, fs_name, daemons)
+            active = self.get_current_active_count(fs_map, fs_name, daemons)
+            max_mds_required = self.get_required_max_mds(fs_map, fs_name)
+
+            self.log.info(f"fs_name:{fs_name} "
+                          f"standbys_required:{standbys_required}, "
+                          f"standbys_current:{standbys_current}, "
+                          f"active:{active}, "
+                          f"max_mds_required:{max_mds_required}")
+
+            total_current = standbys_current + active
+            total_required = max_mds_required + standbys_required
+            self.log.info(f"fs:{fs_name} total_required:{total_required}, total_current:{total_current}")
+
+            if total_required < total_current:
+                self.log.info(f"fs:{fs_name}, killing {total_current - total_required} standby mds ...")
+            elif total_required > total_current:
+                self.log.info(f"fs:{fs_name}, spawning {total_required - total_current} standby mds ...")
+            else:
+                self.log.info(f"fs:{fs_name} no change to mds count")
+                return
+
+            newspec = self.update_daemon_count(fs_name, total_required)
+
+            self.log.info(f"fs:{fs_name}, new placement count:{newspec.placement.count}")
+
+            completion = self.apply_mds(newspec)
+            self._orchestrator_wait([completion])
+            orchestrator.raise_if_exception(completion)
+        except orchestrator.OrchestratorError as e:
+            self.log.exception(f"fs:{fs_name} exception while verifying mds status: {e}")
+            pass
+
+    def notify(self, notify_type, notify_id):
+        if notify_type != 'fs_map':
+            return
+        fs_map = self.get('fs_map')
+        if not fs_map:
+            return
+        # we don't know which filesystem's config has changed
+        for fs in fs_map['filesystems']:
+            fs_name = fs['mdsmap']['fs_name']
+            self.log.info(f"processing fs:{fs_name}")
+            self.verify_and_manage_mds_instance(fs_map, fs_name)
diff --git a/src/pybind/mgr/mds_autoscaler/tests/__init__.py b/src/pybind/mgr/mds_autoscaler/tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/mds_autoscaler/tests/test_autoscaler.py b/src/pybind/mgr/mds_autoscaler/tests/test_autoscaler.py
new file mode 100644 (file)
index 0000000..efdd2dd
--- /dev/null
@@ -0,0 +1,95 @@
+import pytest
+from unittest import mock
+
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
+from orchestrator import DaemonDescription, Completion, ServiceDescription
+
+try:
+    from typing import Any, List
+except ImportError:
+    pass
+
+from mds_autoscaler.module import MDSAutoscaler
+
+
+
+@pytest.yield_fixture()
+def mds_autoscaler_module():
+
+    with mock.patch("mds_autoscaler.module.MDSAutoscaler._orchestrator_wait"):
+        m = MDSAutoscaler('cephadm', 0, 0)
+
+        yield m
+
+
+class TestCephadm(object):
+
+    @mock.patch("mds_autoscaler.module.MDSAutoscaler.get")
+    @mock.patch("mds_autoscaler.module.MDSAutoscaler.list_daemons")
+    @mock.patch("mds_autoscaler.module.MDSAutoscaler.describe_service")
+    @mock.patch("mds_autoscaler.module.MDSAutoscaler.apply_mds")
+    def test_scale_up(self, _apply_mds, _describe_service, _list_daemons, _get, mds_autoscaler_module: MDSAutoscaler):
+        daemons = Completion(value=[
+            DaemonDescription(
+                hostname='myhost',
+                daemon_type='mds',
+                daemon_id='fs_name.myhost.a'
+            ),
+            DaemonDescription(
+                hostname='myhost',
+                daemon_type='mds',
+                daemon_id='fs_name.myhost.b'
+            ),
+        ])
+        daemons.finalize()
+        _list_daemons.return_value = daemons
+
+        services = Completion(value=[
+            ServiceDescription(
+                spec=ServiceSpec(
+                    service_type='mds',
+                    service_id='fs_name',
+                    placement=PlacementSpec(
+                        count=2
+                    )
+                )
+            )
+        ])
+        services.finalize()
+        _describe_service.return_value = services
+
+        apply = Completion(value='')
+        apply.finalize()
+        _apply_mds.return_value = apply
+
+
+        _get.return_value = {
+            'filesystems': [
+                {
+                    'mdsmap': {
+                        'fs_name': 'fs_name',
+                        # one active rank; 'up' maps rank -> gid and 'info' is keyed
+                        # by gid, matching the layout the module reads in get_active_names()
+                        'up': {
+                            'mds_0': 4115,
+                        },
+                        'info': {
+                            'gid_4115': {
+                                'name': 'fs_name.myhost.a',
+                            },
+                        },
+                        'max_mds': 1,
+                        'standby_count_wanted': 2,
+                    }
+                }
+            ],
+            'standbys': [
+                {
+                    # standby names carry no 'mds.' prefix; the module adds it
+                    'name': 'fs_name.myhost.b',
+                }
+            ],
+        }
+        mds_autoscaler_module.notify('fs_map', None)
+
+        _apply_mds.assert_called_with(ServiceSpec(
+            service_type='mds',
+            service_id='fs_name',
+            placement=PlacementSpec(
+                count=3
+            )
+        ))
diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini
index 3ceba83386bb40316b6bdac95737a3eb50568c51..254d19ed340812a9536602ec95892ba43c1fc24f 100644 (file)
@@ -60,6 +60,7 @@ commands =
            progress/module.py \
            rook/module.py \
            test_orchestrator/module.py \
+           mds_autoscaler/module.py \
            volumes/__init__.py
 
 [testenv:test]
diff --git a/src/pybind/mgr/volumes/fs/operations/volume.py b/src/pybind/mgr/volumes/fs/operations/volume.py
index 441a877f28eb771457194c63354701e2c30d0412..80a8090db9b88f8142a719f35acc4434dc186301 100644 (file)
@@ -50,7 +50,7 @@ def create_volume(mgr, volname, placement):
     """
     metadata_pool, data_pool = gen_pool_names(volname)
     # create pools
-    r, outs, outb = create_pool(mgr, metadata_pool)
+    r, outb, outs = create_pool(mgr, metadata_pool)
     if r != 0:
         return r, outb, outs
     r, outb, outs = create_pool(mgr, data_pool)
@@ -65,9 +65,7 @@ def create_volume(mgr, volname, placement):
         #cleanup
         remove_pool(mgr, data_pool)
         remove_pool(mgr, metadata_pool)
-        return r, outb, outs
-    # create mds
-    return create_mds(mgr, volname, placement)
+    return r, outb, outs
 
 
 def delete_volume(mgr, volname, metadata_pool, data_pools):