qa/suites/rados/singleton/all: init mon-stretch-pool.yaml
author     Kamoltat <ksirivad@redhat.com>
           Mon, 25 Mar 2024 19:42:25 +0000 (19:42 +0000)
committer  Kamoltat <ksirivad@redhat.com>
           Wed, 17 Jul 2024 22:12:04 +0000 (22:12 +0000)
Test the following new Ceph CLI commands:

`ceph osd pool stretch set`
`ceph osd pool stretch unset`
`ceph osd pool stretch show`

`qa/workunits/mon/mon-stretch-pool.sh`

will create the stretch cluster
while performing input validation for the CLI
commands listed above (see the usage sketch below).
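
For reference, a minimal usage sketch of the new commands;
the argument order follows what the task and workunit below
exercise, and the pool name, CRUSH rule, and numeric values
are simply the ones used there, not required defaults:

```bash
# usage: ceph osd pool stretch set <pool> <peering_crush_bucket_count>
#        <peering_crush_bucket_target> <peering_crush_bucket_barrier>
#        <crush_rule> <size> <min_size>
ceph osd pool create pool_stretch 1
ceph osd pool stretch set pool_stretch 2 3 datacenter replicated_rule_custom 6 3
ceph osd pool stretch show pool_stretch
ceph osd pool stretch unset pool_stretch
```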

`qa/tasks/stretch_cluster.py`

sets a pool as a stretch pool
and checks whether this prevents PGs
from going active when there are not
enough buckets available in the acting
set of the PGs for them to go active.

It also tests different MON failover scenarios
after the pool is set as a stretch pool.
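
As a rough sketch of the condition the task verifies
(illustrative only; the pool name and values are the
ones used by the test and workunit in this change):

```bash
# With peering_crush_bucket_count=2 over the 'datacenter' barrier, PGs are
# expected to stay inactive once only a single datacenter's OSDs survive.
ceph osd pool stretch show pool_stretch  # confirm the stretch values
ceph pg stat                             # no PGs should report 'active'
ceph quorum_status --format json-pretty  # MON quorum during failover checks
```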

`qa/suites/rados/singleton/all/mon-stretch-pool.yaml`

brings the scripts together.

Fixes: https://tracker.ceph.com/issues/64802
Signed-off-by: Kamoltat <ksirivad@redhat.com>
qa/suites/rados/singleton/all/mon-stretch-pool.yaml [new file with mode: 0755]
qa/tasks/ceph_manager.py
qa/tasks/stretch_cluster.py [new file with mode: 0644]
qa/workunits/mon/mon-stretch-pool.sh [new file with mode: 0755]

diff --git a/qa/suites/rados/singleton/all/mon-stretch-pool.yaml b/qa/suites/rados/singleton/all/mon-stretch-pool.yaml
new file mode 100755 (executable)
index 0000000..1c55a21
--- /dev/null
@@ -0,0 +1,67 @@
+roles:
+- - mon.a
+  - mon.b
+  - mon.c
+  - osd.0
+  - osd.1
+  - osd.2
+  - mgr.x
+  - client.0
+  - mon.d
+  - mon.e
+  - mon.f
+  - osd.3
+  - osd.4
+  - osd.5
+  - mon.g
+  - mon.h
+  - mon.i
+  - osd.6
+  - osd.7
+  - osd.8
+
+openstack:
+  - volumes: # attached to each instance
+      count: 3
+      size: 10 # GB
+overrides:
+  ceph:
+    conf:
+      global:
+        mon election default strategy: 3
+      mon:
+        client mount timeout: 60
+        osd pool default size: 6
+        osd_pool_default_min_size: 3
+        osd_pool_default_pg_autoscale_mode: off
+        debug mon: 30
+tasks:
+- install:
+- ceph:
+    pre-mgr-commands:
+      - sudo ceph config set mgr mgr_pool false --force
+    log-ignorelist:
+      - overall HEALTH_
+      - \(OSDMAP_FLAGS\)
+      - \(OSD_
+      - \(PG_
+      - \(POOL_
+      - \(CACHE_POOL_
+      - \(OBJECT_
+      - \(SLOW_OPS\)
+      - \(REQUEST_SLOW\)
+      - \(TOO_FEW_PGS\)
+      - slow request
+      - \(POOL_APP_NOT_ENABLED\)
+      - overall HEALTH_
+      - \(MGR_DOWN\)
+      - \(MON_DOWN\)
+      - \(PG_AVAILABILITY\)
+      - \(SLOW_OPS\)
+- workunit:
+    clients:
+      client.0:
+        - mon/mon-stretch-pool.sh
+- cephfs_test_runner:
+    modules:
+      - tasks.stretch_cluster
\ No newline at end of file
index 4486e554ed587cb8d561d975c14a0cebc45f3d01..0fed7f53666264a5718685c41d38f522fc0da68c 100644 (file)
@@ -3213,6 +3213,14 @@ class CephManager:
         j = json.loads(out)
         return j['quorum']
 
+    def get_mon_quorum_names(self):
+        """
+        Extract monitor quorum names from the cluster
+        """
+        out = self.raw_cluster_cmd('quorum_status')
+        j = json.loads(out)
+        return j['quorum_names']
+
     def wait_for_mon_quorum_size(self, size, timeout=300):
         """
         Loop until quorum size is reached.
diff --git a/qa/tasks/stretch_cluster.py b/qa/tasks/stretch_cluster.py
new file mode 100644 (file)
index 0000000..48acf00
--- /dev/null
@@ -0,0 +1,695 @@
+import json
+import logging
+import random
+from tasks.mgr.mgr_test_case import MgrTestCase
+from time import sleep
+
+log = logging.getLogger(__name__)
+
+
+class TestStretchCluster(MgrTestCase):
+    """
+    Test the stretch cluster feature.
+    """
+    # Define some constants
+    POOL = 'pool_stretch'
+    CLUSTER = "ceph"
+    WRITE_PERIOD = 10
+    RECOVERY_PERIOD = WRITE_PERIOD * 6
+    SUCCESS_HOLD_TIME = 7
+    # This dictionary maps the datacenter to the osd ids and hosts
+    DC_OSDS = {
+        'dc1': {
+            "node-1": 0,
+            "node-2": 1,
+            "node-3": 2,
+        },
+        'dc2': {
+            "node-4": 3,
+            "node-5": 4,
+            "node-6": 5,
+        },
+        'dc3': {
+            "node-7": 6,
+            "node-8": 7,
+            "node-9": 8,
+        }
+    }
+
+    # This dictionary maps the datacenter to the mon ids and hosts
+    DC_MONS = {
+        'dc1': {
+            "node-1": 'a',
+            "node-2": 'b',
+            "node-3": 'c',
+        },
+        'dc2': {
+            "node-4": 'd',
+            "node-5": 'e',
+            "node-6": 'f',
+        },
+        'dc3': {
+            "node-7": 'g',
+            "node-8": 'h',
+            "node-9": 'i',
+        }
+    }
+    PEERING_CRUSH_BUCKET_COUNT = 2
+    PEERING_CRUSH_BUCKET_TARGET = 3
+    PEERING_CRUSH_BUCKET_BARRIER = 'datacenter'
+    CRUSH_RULE = 'replicated_rule_custom'
+    SIZE = 6
+    MIN_SIZE = 3
+    BUCKET_MAX = SIZE // PEERING_CRUSH_BUCKET_TARGET
+    if (BUCKET_MAX * PEERING_CRUSH_BUCKET_TARGET) < SIZE:
+        BUCKET_MAX += 1
+
+    def setUp(self):
+        """
+        Set up the cluster and
+        ensure we have a clean state before the test.
+        """
+        # Ensure we have at least 6 OSDs
+        super(TestStretchCluster, self).setUp()
+        if self._osd_count() < 6:
+            self.skipTest("Not enough OSDS!")
+
+        # Remove any filesystems so that we can remove their pools
+        if self.mds_cluster:
+            self.mds_cluster.mds_stop()
+            self.mds_cluster.mds_fail()
+            self.mds_cluster.delete_all_filesystems()
+
+        # Remove all other pools
+        for pool in self.mgr_cluster.mon_manager.get_osd_dump_json()['pools']:
+            self.mgr_cluster.mon_manager.remove_pool(pool['pool_name'])
+
+    def tearDown(self):
+        """
+        Clean up the cluster after the test.
+        """
+        # Remove the pool
+        if self.POOL in self.mgr_cluster.mon_manager.pools:
+            self.mgr_cluster.mon_manager.remove_pool(self.POOL)
+
+        osd_map = self.mgr_cluster.mon_manager.get_osd_dump_json()
+        for osd in osd_map['osds']:
+            # Mark any OSDs that were marked out back in
+            if osd['weight'] == 0.0:
+                self.mgr_cluster.mon_manager.raw_cluster_cmd(
+                    'osd', 'in', str(osd['osd']))
+            # Restart any down OSDs and move them back to their hosts.
+            if osd['up'] == 0:
+                self._bring_back_osd(osd['osd'])
+                self._move_osd_back_to_host(osd['osd'])
+
+        # Bring back all the MONS
+        mons = self._get_all_mons_from_all_dc()
+        for mon in mons:
+            self._bring_back_mon(mon)
+        super(TestStretchCluster, self).tearDown()
+
+    def _setup_pool(self, size=None, min_size=None, rule=None):
+        """
+        Create a pool and set its size.
+        """
+        self.mgr_cluster.mon_manager.create_pool(self.POOL, min_size=min_size)
+        if size is not None:
+            self.mgr_cluster.mon_manager.raw_cluster_cmd(
+                'osd', 'pool', 'set', self.POOL, 'size', str(size))
+        if rule is not None:
+            self.mgr_cluster.mon_manager.raw_cluster_cmd(
+                'osd', 'pool', 'set', self.POOL, 'crush_rule', rule)
+
+    def _osd_count(self):
+        """
+        Get the number of OSDs in the cluster.
+        """
+        osd_map = self.mgr_cluster.mon_manager.get_osd_dump_json()
+        return len(osd_map['osds'])
+
+    def _write_some_data(self, t):
+        """
+        Write some data to the pool to simulate a workload.
+        """
+
+        args = [
+            "rados", "-p", self.POOL, "bench", str(t), "write", "-t", "16"]
+
+        self.mgr_cluster.admin_remote.run(args=args, wait=True)
+
+    def _get_pg_stats(self):
+        """
+        Dump the cluster and get pg stats
+        """
+        out = self.mgr_cluster.mon_manager.raw_cluster_cmd(
+                'pg', 'dump', '--format=json')
+        j = json.loads('\n'.join(out.split('\n')[1:]))
+        try:
+            return j['pg_map']['pg_stats']
+        except KeyError:
+            return j['pg_stats']
+
+    def _get_active_pg(self, pgs):
+        """
+        Get the number of active PGs
+        """
+        num_active = 0
+        for pg in pgs:
+            if pg['state'].count('active') and not pg['state'].count('stale'):
+                num_active += 1
+        return num_active
+
+    def _get_active_clean_pg(self, pgs):
+        """
+        Get the number of active+clean PGs
+        """
+        num_active_clean = 0
+        for pg in pgs:
+            if (pg['state'].count('active') and
+                pg['state'].count('clean') and
+                    not pg['state'].count('stale')):
+                num_active_clean += 1
+        return num_active_clean
+
+    def _get_acting_set(self, pgs):
+        """
+        Get the acting set of the PGs
+        """
+        acting_set = []
+        for pg in pgs:
+            acting_set.append(pg['acting'])
+        return acting_set
+
+    def _surviving_osds_in_acting_set_dont_exceed(self, n, osds):
+        """
+        Check if the acting set of the PGs doesn't contain more
+        than n OSDs of the surviving DC.
+        NOTE: Only call this function after we set the pool to stretch.
+        """
+        pgs = self._get_pg_stats()
+        acting_set = self._get_acting_set(pgs)
+        for acting in acting_set:
+            log.debug("Acting set: %s", acting)
+            intersect = list(set(acting) & set(osds))
+            if len(intersect) > n:
+                log.error(
+                    "Acting set: %s contains more than %d OSDs "
+                    "from the same %s which are: %s",
+                    acting, n, self.PEERING_CRUSH_BUCKET_BARRIER,
+                    intersect
+                )
+                return False
+        return True
+
+    def _print_not_active_clean_pg(self, pgs):
+        """
+        Print the PGs that are not active+clean.
+        """
+        for pg in pgs:
+            if not (pg['state'].count('active') and
+                    pg['state'].count('clean') and
+                    not pg['state'].count('stale')):
+                log.debug(
+                    "PG %s is not active+clean, but %s",
+                    pg['pgid'], pg['state']
+                )
+
+    def _print_not_active_pg(self, pgs):
+        """
+        Print the PGs that are not active.
+        """
+        for pg in pgs:
+            if not (pg['state'].count('active') and
+                    not pg['state'].count('stale')):
+                log.debug(
+                    "PG %s is not active, but %s",
+                    pg['pgid'], pg['state']
+                )
+
+    def _pg_all_active_clean(self):
+        """
+        Check if all pgs are active and clean.
+        """
+        pgs = self._get_pg_stats()
+        result = self._get_active_clean_pg(pgs) == len(pgs)
+        if result:
+            log.debug("All PGs are active+clean")
+        else:
+            log.debug("Not all PGs are active+clean")
+            self._print_not_active_clean_pg(pgs)
+        return result
+
+    def _pg_all_active(self):
+        """
+        Check if all pgs are active.
+        """
+        pgs = self._get_pg_stats()
+        result = self._get_active_pg(pgs) == len(pgs)
+        if result:
+            log.debug("All PGs are active")
+        else:
+            log.debug("Not all PGs are active")
+            self._print_not_active_pg(pgs)
+        return result
+
+    def _pg_all_unavailable(self):
+        """
+        Check if all pgs are unavailable.
+        """
+        pgs = self._get_pg_stats()
+        return self._get_active_pg(pgs) == 0
+
+    def _pg_partial_active(self):
+        """
+        Check if some pgs are active.
+        """
+        pgs = self._get_pg_stats()
+        return 0 < self._get_active_pg(pgs) <= len(pgs)
+
+    def _kill_osd(self, osd):
+        """
+        Kill the osd.
+        """
+        try:
+            self.ctx.daemons.get_daemon('osd', osd, self.CLUSTER).stop()
+        except Exception:
+            log.error("Failed to stop osd.{}".format(str(osd)))
+            pass
+
+    def _get_osds_by_dc(self, dc):
+        """
+        Get osds by datacenter.
+        """
+        return [osd for _, osd in self.DC_OSDS[dc].items()]
+
+    def _get_all_osds_from_all_dc(self):
+        """
+        Get all osds from all datacenters.
+        """
+        return [osd for nodes in self.DC_OSDS.values()
+                for osd in nodes.values()]
+
+    def _get_osds_data(self, want_osds):
+        """
+        Get the osd data
+        """
+        all_osds_data = \
+            self.mgr_cluster.mon_manager.get_osd_dump_json()['osds']
+        return [
+            osd_data for osd_data in all_osds_data
+            if int(osd_data['osd']) in want_osds
+        ]
+
+    def _get_host(self, osd):
+        """
+        Get the host of the osd.
+        """
+        for dc, nodes in self.DC_OSDS.items():
+            for node, osd_id in nodes.items():
+                if osd_id == osd:
+                    return node
+        return None
+
+    def _move_osd_back_to_host(self, osd):
+        """
+        Move the osd back to the host.
+        """
+        host = self._get_host(osd)
+        assert host is not None, "The host of osd {} is not found.".format(osd)
+        log.debug("Moving osd.%d back to %s", osd, host)
+        self.mgr_cluster.mon_manager.raw_cluster_cmd(
+            'osd', 'crush', 'move', 'osd.{}'.format(str(osd)),
+            'host={}'.format(host)
+        )
+
+    def _bring_back_osd(self, osd):
+        """
+        Bring back the osd.
+        """
+        try:
+            self.ctx.daemons.get_daemon('osd', osd, self.CLUSTER).restart()
+        except Exception:
+            log.error("Failed to bring back osd.{}".format(str(osd)))
+            pass
+
+    def _bring_back_all_osds_in_dc(self, dc):
+        """
+        Bring back all osds in the specified <datacenter>
+        """
+        if not isinstance(dc, str):
+            raise ValueError("dc must be a string")
+        if dc not in self.DC_OSDS:
+            raise ValueError("dc must be one of the following: %s" %
+                             self.DC_OSDS.keys())
+        log.debug("Bringing back %s", dc)
+        osds = self._get_osds_by_dc(dc)
+        # Bring back all the osds in the DC and move it back to the host.
+        for osd_id in osds:
+            # Bring back the osd
+            self._bring_back_osd(osd_id)
+            # Wait until the osd is up since we need it to be up before we can
+            # move it back to the host
+            self.wait_until_true(
+                lambda: all([int(osd['up']) == 1
+                            for osd in self._get_osds_data([osd_id])]),
+                timeout=self.RECOVERY_PERIOD
+            )
+            # Move the osd back to the host
+            self._move_osd_back_to_host(osd_id)
+
+    def _fail_over_all_osds_in_dc(self, dc):
+        """
+        Fail over all osds in specified <datacenter>
+        """
+        if not isinstance(dc, str):
+            raise ValueError("dc must be a string")
+        if dc not in self.DC_OSDS:
+            raise ValueError(
+                "dc must be one of the following: %s" % self.DC_OSDS.keys()
+                )
+        log.debug("Failing over %s", dc)
+        osds = self._get_osds_by_dc(dc)
+        # fail over all the OSDs in the DC
+        for osd_id in osds:
+            self._kill_osd(osd_id)
+        # wait until all the osds are down
+        self.wait_until_true(
+            lambda: all([int(osd['up']) == 0
+                        for osd in self._get_osds_data(osds)]),
+            timeout=self.RECOVERY_PERIOD
+        )
+
+    def _fail_over_one_osd_from_dc(self, dc):
+        """
+        Fail over one random OSD from the specified <datacenter>
+        """
+        if not isinstance(dc, str):
+            raise ValueError("dc must be a string")
+        if dc not in self.DC_OSDS:
+            raise ValueError("dc must be one of the following: %s" %
+                             self.DC_OSDS.keys())
+        log.debug("Failing over one random OSD from %s", dc)
+        # filter out failed osds
+        osds_data = self._get_osds_data(self._get_osds_by_dc(dc))
+        osds = [int(osd['osd']) for osd in osds_data if int(osd['up']) == 1]
+        # fail over one random OSD in the DC
+        osd_id = random.choice(osds)
+        self._kill_osd(osd_id)
+        # wait until the osd is down
+        self.wait_until_true(
+            lambda: int(self._get_osds_data([osd_id])[0]['up']) == 0,
+            timeout=self.RECOVERY_PERIOD
+        )
+
+    def _fail_over_one_mon_from_dc(self, dc, no_wait=False):
+        """
+        Fail over one random mon from the specified <datacenter>
+        no_wait: if True, don't wait for the mon to be out of quorum
+        """
+        if not isinstance(dc, str):
+            raise ValueError("dc must be a string")
+        if dc not in self.DC_MONS:
+            raise ValueError("dc must be one of the following: %s" %
+                             ", ".join(self.DC_MONS.keys()))
+        log.debug("Failing over one random mon from %s", dc)
+        mons = self._get_mons_by_dc(dc)
+        # filter out failed mons
+        mon_quorum = self.mgr_cluster.mon_manager.get_mon_quorum_names()
+        mons = [mon for mon in mons if mon in mon_quorum]
+        # fail over one random mon in the DC
+        mon = random.choice(mons)
+        self._kill_mon(mon)
+        if no_wait:
+            return
+        else:
+            # wait until the mon is out of quorum
+            self.wait_until_true(
+                lambda: self._check_mons_out_of_quorum([mon]),
+                timeout=self.RECOVERY_PERIOD
+            )
+
+    def _fail_over_all_mons_in_dc(self, dc):
+        """
+        Fail over all mons in the specified <datacenter>
+        """
+        if not isinstance(dc, str):
+            raise ValueError("dc must be a string")
+        if dc not in self.DC_MONS:
+            raise ValueError("dc must be one of the following: %s" %
+                             ", ".join(self.DC_MONS.keys()))
+        log.debug("Failing over %s", dc)
+        mons = self._get_mons_by_dc(dc)
+        for mon in mons:
+            self._kill_mon(mon)
+        # wait until all the mons are out of quorum
+        self.wait_until_true(
+            lambda: self._check_mons_out_of_quorum(mons),
+            timeout=self.RECOVERY_PERIOD
+        )
+
+    def _kill_mon(self, mon):
+        """
+        Kill the mon.
+        """
+        try:
+            self.ctx.daemons.get_daemon('mon', mon, self.CLUSTER).stop()
+        except Exception:
+            log.error("Failed to stop mon.{}".format(str(mon)))
+            pass
+
+    def _get_mons_by_dc(self, dc):
+        """
+        Get mons by datacenter.
+        """
+        return [mon for _, mon in self.DC_MONS[dc].items()]
+
+    def _get_all_mons_from_all_dc(self):
+        """
+        Get all mons from all datacenters.
+        """
+        return [mon for nodes in self.DC_MONS.values()
+                for mon in nodes.values()]
+
+    def _check_mons_out_of_quorum(self, want_mons):
+        """
+        Check if the mons are not in quorum.
+        """
+        quorum_names = self.mgr_cluster.mon_manager.get_mon_quorum_names()
+        return all([mon not in quorum_names for mon in want_mons])
+
+    def _check_mons_in_quorum(self, want_mons):
+        """
+        Check if the mons are in quorum.
+        """
+        quorum_names = self.mgr_cluster.mon_manager.get_mon_quorum_names()
+        return all([mon in quorum_names for mon in want_mons])
+
+    def _check_mon_quorum_size(self, size):
+        """
+        Check if the mon quorum size is equal to <size>
+        """
+        return len(self.mgr_cluster.mon_manager.get_mon_quorum_names()) == size
+
+    def _bring_back_mon(self, mon):
+        """
+        Bring back the mon.
+        """
+        try:
+            self.ctx.daemons.get_daemon('mon', mon, self.CLUSTER).restart()
+        except Exception:
+            log.error("Failed to bring back mon.{}".format(str(mon)))
+            pass
+
+    def _bring_back_all_mons_in_dc(self, dc):
+        """
+        Bring back all mons in the specified <datacenter>
+        """
+        if not isinstance(dc, str):
+            raise ValueError("dc must be a string")
+        if dc not in self.DC_MONS:
+            raise ValueError("dc must be one of the following: %s" %
+                             ", ".join(self.DC_MONS.keys()))
+        log.debug("Bringing back %s", dc)
+        mons = self._get_mons_by_dc(dc)
+        for mon in mons:
+            self._bring_back_mon(mon)
+        # wait until all the mons are up
+        self.wait_until_true(
+            lambda: self._check_mons_in_quorum(mons),
+            timeout=self.RECOVERY_PERIOD
+        )
+
+    def _no_reply_to_mon_command(self):
+        """
+        Check if the cluster is inaccessible.
+        """
+        try:
+            self.mgr_cluster.mon_manager.raw_cluster_cmd('status')
+            return False
+        except Exception:
+            return True
+
+    def test_mon_failures_in_stretch_pool(self):
+        """
+        Test mon failures in stretch pool.
+        """
+        self._setup_pool(
+            self.SIZE,
+            min_size=self.MIN_SIZE,
+            rule=self.CRUSH_RULE
+        )
+        self._write_some_data(self.WRITE_PERIOD)
+        # Set the pool to stretch
+        self.mgr_cluster.mon_manager.raw_cluster_cmd(
+            'osd', 'pool', 'stretch', 'set',
+            self.POOL, str(self.PEERING_CRUSH_BUCKET_COUNT),
+            str(self.PEERING_CRUSH_BUCKET_TARGET),
+            self.PEERING_CRUSH_BUCKET_BARRIER,
+            self.CRUSH_RULE, str(self.SIZE), str(self.MIN_SIZE))
+
+        # SCENARIO 1: MONS in DC1 down
+
+        # Fail over mons in DC1
+        self._fail_over_all_mons_in_dc('dc1')
+        # Expects mons in DC2 and DC3 to be in quorum
+        mons_dc2_dc3 = (
+            self._get_mons_by_dc('dc2') +
+            self._get_mons_by_dc('dc3')
+        )
+        self.wait_until_true_and_hold(
+            lambda: self._check_mons_in_quorum(mons_dc2_dc3),
+            timeout=self.RECOVERY_PERIOD,
+            success_hold_time=self.SUCCESS_HOLD_TIME
+        )
+
+        # SCENARIO 2: MONS in DC1 down + 1 MON in DC2 down
+
+        # Fail over 1 random MON from DC2
+        self._fail_over_one_mon_from_dc('dc2')
+        # Expects quorum size to be 5
+        self.wait_until_true_and_hold(
+            lambda: self._check_mon_quorum_size(5),
+            timeout=self.RECOVERY_PERIOD,
+            success_hold_time=self.SUCCESS_HOLD_TIME
+        )
+
+        # SCENARIO 3: MONS in DC1 down + 2 MONS in DC2 down
+
+        # Fail over 1 random MON from DC2
+        self._fail_over_one_mon_from_dc('dc2', no_wait=True)
+        # sleep for 30 seconds to allow the mon to be out of quorum
+        sleep(30)
+        # Expects the cluster to be inaccessible
+        self.wait_until_true(
+            lambda: self._no_reply_to_mon_command(),
+            timeout=self.RECOVERY_PERIOD,
+        )
+        # Bring back all mons in DC2 to unblock the cluster
+        self._bring_back_all_mons_in_dc('dc2')
+        # Expects mons in DC2 and DC3 to be in quorum
+        self.wait_until_true_and_hold(
+            lambda: self._check_mons_in_quorum(mons_dc2_dc3),
+            timeout=self.RECOVERY_PERIOD,
+            success_hold_time=self.SUCCESS_HOLD_TIME
+        )
+
+    def test_set_stretch_pool_no_active_pgs(self):
+        """
+        Test that setting a pool as a stretch pool prevents
+        PGs from going active when there are not enough
+        buckets available in the acting set of the PGs
+        for them to go active.
+        """
+        self._setup_pool(
+            self.SIZE,
+            min_size=self.MIN_SIZE,
+            rule=self.CRUSH_RULE
+        )
+        self._write_some_data(self.WRITE_PERIOD)
+        # 1. Test the case where the pool is not a stretch pool:
+        #    we expect the PGs to go active even when only one
+        #    bucket is available in the acting set of the PGs.
+
+        # Fail over osds in DC1 expects PGs to be 100% active
+        self._fail_over_all_osds_in_dc('dc1')
+        self.wait_until_true_and_hold(
+            lambda: self._pg_all_active(),
+            timeout=self.RECOVERY_PERIOD,
+            success_hold_time=self.SUCCESS_HOLD_TIME
+        )
+        # Fail over osds in DC2 expects PGs to be partially active
+        self._fail_over_all_osds_in_dc('dc2')
+        self.wait_until_true_and_hold(
+            lambda: self._pg_partial_active(),
+            timeout=self.RECOVERY_PERIOD,
+            success_hold_time=self.SUCCESS_HOLD_TIME
+        )
+
+        # Bring back osds in DC1 expects PGs to be 100% active
+        self._bring_back_all_osds_in_dc('dc1')
+        self.wait_until_true_and_hold(
+            lambda: self._pg_all_active(),
+            timeout=self.RECOVERY_PERIOD,
+            success_hold_time=self.SUCCESS_HOLD_TIME
+        )
+        # Bring back osds DC2 expects PGs to be 100% active+clean
+        self._bring_back_all_osds_in_dc('dc2')
+        self.wait_until_true_and_hold(
+            lambda: self._pg_all_active_clean(),
+            timeout=self.RECOVERY_PERIOD,
+            success_hold_time=self.SUCCESS_HOLD_TIME
+        )
+        # 2. Test the case where the pool is a stretch pool:
+        #    we expect the PGs to not go active when only one
+        #    bucket is available in the acting set of the PGs.
+
+        # Set the pool to stretch
+        self.mgr_cluster.mon_manager.raw_cluster_cmd(
+            'osd', 'pool', 'stretch', 'set',
+            self.POOL, str(self.PEERING_CRUSH_BUCKET_COUNT),
+            str(self.PEERING_CRUSH_BUCKET_TARGET),
+            self.PEERING_CRUSH_BUCKET_BARRIER,
+            self.CRUSH_RULE, str(self.SIZE), str(self.MIN_SIZE))
+
+        # Fail over osds in DC1 expects PGs to be 100% active
+        self._fail_over_all_osds_in_dc('dc1')
+        self.wait_until_true_and_hold(lambda: self._pg_all_active(),
+                                      timeout=self.RECOVERY_PERIOD,
+                                      success_hold_time=self.SUCCESS_HOLD_TIME)
+
+        # Fail over 1 random OSD from DC2 expects PGs to be 100% active
+        self._fail_over_one_osd_from_dc('dc2')
+        self.wait_until_true_and_hold(lambda: self._pg_all_active(),
+                                      timeout=self.RECOVERY_PERIOD,
+                                      success_hold_time=self.SUCCESS_HOLD_TIME)
+
+        # Fail over osds in DC2 completely expects PGs to be 100% inactive
+        self._fail_over_all_osds_in_dc('dc2')
+        self.wait_until_true_and_hold(lambda: self._pg_all_unavailable(),
+                                      timeout=self.RECOVERY_PERIOD,
+                                      success_hold_time=self.SUCCESS_HOLD_TIME)
+
+        # We expect that there will be no more than BUCKET_MAX osds from DC3
+        # in the acting set of the PGs.
+        self.wait_until_true(
+            lambda: self._surviving_osds_in_acting_set_dont_exceed(
+                        self.BUCKET_MAX,
+                        self._get_osds_by_dc('dc3')
+                    ),
+            timeout=self.RECOVERY_PERIOD)
+
+        # Bring back osds in DC1 expects PGs to be 100% active
+        self._bring_back_all_osds_in_dc('dc1')
+        self.wait_until_true_and_hold(
+            lambda: self._pg_all_active(),
+            timeout=self.RECOVERY_PERIOD,
+            success_hold_time=self.SUCCESS_HOLD_TIME)
+
+        # Bring back osds in DC2 expects PGs to be 100% active+clean
+        self._bring_back_all_osds_in_dc('dc2')
+        self.wait_until_true_and_hold(
+            lambda: self._pg_all_active_clean(),
+            timeout=self.RECOVERY_PERIOD,
+            success_hold_time=self.SUCCESS_HOLD_TIME
+        )
diff --git a/qa/workunits/mon/mon-stretch-pool.sh b/qa/workunits/mon/mon-stretch-pool.sh
new file mode 100755 (executable)
index 0000000..2c62082
--- /dev/null
@@ -0,0 +1,148 @@
+#!/bin/bash -ex
+
+# A CLI test for ceph osd pool stretch set, unset, and show.
+# Sets up the cluster with 3 datacenters and 3 hosts in each datacenter.
+
+NUM_OSDS_UP=$(ceph osd df | grep "up" | wc -l)
+
+if [ $NUM_OSDS_UP -lt 6 ]; then
+    echo "test requires at least 6 OSDs up and running"
+    exit 1
+fi
+
+function expect_false()
+{
+    # expect the command to return false (non-zero exit)
+    if "$@"; then return 1; else return 0; fi
+}
+
+function expect_true()
+{
+    # expect the command to return true
+    if "$@"; then return 0; else return 1; fi
+}
+
+function teardown()
+{
+    # cleanup
+    for pool in `ceph osd pool ls`
+    do
+      ceph osd pool rm $pool $pool --yes-i-really-really-mean-it
+    done
+}
+
+for dc in dc1 dc2 dc3
+do
+    ceph osd crush add-bucket $dc datacenter
+    ceph osd crush move $dc root=default
+done
+
+ceph osd crush add-bucket node-1 host
+ceph osd crush add-bucket node-2 host
+ceph osd crush add-bucket node-3 host
+ceph osd crush add-bucket node-4 host
+ceph osd crush add-bucket node-5 host
+ceph osd crush add-bucket node-6 host
+ceph osd crush add-bucket node-7 host
+ceph osd crush add-bucket node-8 host
+ceph osd crush add-bucket node-9 host
+
+ceph osd crush move node-1 datacenter=dc1
+ceph osd crush move node-2 datacenter=dc1
+ceph osd crush move node-3 datacenter=dc1
+ceph osd crush move node-4 datacenter=dc2
+ceph osd crush move node-5 datacenter=dc2
+ceph osd crush move node-6 datacenter=dc2
+ceph osd crush move node-7 datacenter=dc3
+ceph osd crush move node-8 datacenter=dc3
+ceph osd crush move node-9 datacenter=dc3
+
+ceph osd crush move osd.0 host=node-1
+ceph osd crush move osd.1 host=node-2
+ceph osd crush move osd.2 host=node-3
+ceph osd crush move osd.3 host=node-4
+ceph osd crush move osd.4 host=node-5
+ceph osd crush move osd.5 host=node-6
+ceph osd crush move osd.6 host=node-7
+ceph osd crush move osd.7 host=node-8
+ceph osd crush move osd.8 host=node-9
+
+ceph mon set_location a datacenter=dc1 host=node-1
+ceph mon set_location b datacenter=dc1 host=node-2
+ceph mon set_location c datacenter=dc1 host=node-3
+ceph mon set_location d datacenter=dc2 host=node-4
+ceph mon set_location e datacenter=dc2 host=node-5
+ceph mon set_location f datacenter=dc2 host=node-6
+ceph mon set_location g datacenter=dc3 host=node-7
+ceph mon set_location h datacenter=dc3 host=node-8
+ceph mon set_location i datacenter=dc3 host=node-9
+
+
+TEST_POOL_STRETCH=pool_stretch
+TEST_CRUSH_RULE=replicated_rule_custom
+
+# Showing a non-existent pool should return an error
+expect_false ceph osd pool stretch show $TEST_POOL_STRETCH
+
+ceph osd pool create $TEST_POOL_STRETCH 1
+
+# pool must be a stretch pool for this command to show anything.
+expect_false ceph osd pool stretch show $TEST_POOL_STRETCH
+
+# All arguments must be present
+expect_false ceph osd pool stretch set $TEST_POOL_STRETCH 2 3 datacenter $TEST_CRUSH_RULE
+# A non-existent pool should return an error
+expect_false ceph osd pool stretch set non_exist_pool 2 3 datacenter $TEST_CRUSH_RULE 6 3
+# A non-existent barrier should return an appropriate error
+expect_false ceph osd pool stretch set $TEST_POOL_STRETCH 2 3 non_exist_barrier $TEST_CRUSH_RULE 6 3
+# A crush_rule that does not exist yet should return an appropriate error
+expect_false ceph osd pool stretch set $TEST_POOL_STRETCH 2 3 datacenter $TEST_CRUSH_RULE 6 3
+# Unsetting a non-existent pool should return an error
+expect_false ceph osd pool stretch unset non_exist_pool
+# Unsetting a non-stretch pool should return an error
+expect_false ceph osd pool stretch unset $TEST_POOL_STRETCH
+
+# Create a custom crush rule
+ceph osd getcrushmap > crushmap
+crushtool --decompile crushmap > crushmap.txt
+sed 's/^# end crush map$//' crushmap.txt > crushmap_modified.txt
+cat >> crushmap_modified.txt << EOF
+rule replicated_rule_custom {
+        id 1
+        type replicated
+        step take default
+        step choose firstn 3 type datacenter
+        step chooseleaf firstn 2 type host
+        step emit
+}
+# end crush map
+EOF
+
+# compile the modified crushmap and set it
+crushtool --compile crushmap_modified.txt -o crushmap.bin
+ceph osd setcrushmap -i crushmap.bin
+
+# Set the election strategy to connectivity
+ceph mon set election_strategy connectivity
+
+# peering_crush_bucket_count > 3 datacenters returns Error EPERM
+expect_false ceph osd pool stretch set $TEST_POOL_STRETCH 4 3 datacenter $TEST_CRUSH_RULE 6 3
+
+# peering_crush_bucket_target > 3 datacenters returns Error EPERM
+expect_false ceph osd pool stretch set $TEST_POOL_STRETCH 2 4 datacenter $TEST_CRUSH_RULE 6 3
+
+# peering_crush_bucket_target > 3 datacenters succeeds with the --yes-i-really-mean-it flag
+expect_true ceph osd pool stretch set $TEST_POOL_STRETCH 2 4 datacenter $TEST_CRUSH_RULE 6 3 --yes-i-really-mean-it
+
+# Set the pool as a stretch pool with valid arguments, then show its stretch values
+expect_true ceph osd pool stretch set $TEST_POOL_STRETCH 2 3 datacenter $TEST_CRUSH_RULE 6 3
+expect_true ceph osd pool stretch show $TEST_POOL_STRETCH
+
+# Unset the stretch pool and expect it to succeed
+expect_true ceph osd pool stretch unset $TEST_POOL_STRETCH
+# Trying to show the stretch pool values again should return an error since
+# the pool is not a stretch pool anymore.
+expect_false ceph osd pool stretch show $TEST_POOL_STRETCH
+
+# cleanup
+teardown
\ No newline at end of file