--- /dev/null
+from tasks.ceph_test_case import CephTestCase
+import logging
+import json
+from tasks.netsplit import disconnect, reconnect, get_ip_and_ports
+import itertools
+import time
+from io import StringIO
+log = logging.getLogger(__name__)
+
+
+class TestNetSplit(CephTestCase):
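+ # one monitor per datacenter (site); the netsplit scenarios below
+ # cut the links between pairs of them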
+ MON_LIST = ["mon.a", "mon.d", "mon.g"]
+ CLIENT = "client.0"
+ CLUSTER = "ceph"
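+ # all periods and hold times below are in seconds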
+ WRITE_PERIOD = 10
+ READ_PERIOD = 10
+ RECOVERY_PERIOD = WRITE_PERIOD * 6
+ SUCCESS_HOLD_TIME = 10
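+ # stretch pool tunables: PGs must span PEERING_CRUSH_BUCKET_COUNT of
+ # the PEERING_CRUSH_BUCKET_TARGET 'datacenter' buckets in order to peer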
+ PEERING_CRUSH_BUCKET_COUNT = 2
+ PEERING_CRUSH_BUCKET_TARGET = 3
+ PEERING_CRUSH_BUCKET_BARRIER = 'datacenter'
+ POOL = 'pool_stretch'
+ CRUSH_RULE = 'replicated_rule_custom'
+ SIZE = 6
+ MIN_SIZE = 3
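+ # BUCKET_MAX is ceil(SIZE / PEERING_CRUSH_BUCKET_TARGET), an upper
+ # bound on the number of replicas placed in any single bucket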
+ BUCKET_MAX = SIZE // PEERING_CRUSH_BUCKET_TARGET
+ if (BUCKET_MAX * PEERING_CRUSH_BUCKET_TARGET) < SIZE:
+ BUCKET_MAX += 1
+
+ def setUp(self):
+ """
+ Set up the cluster for the test.
+ """
+ super(TestNetSplit, self).setUp()
+
+ def tearDown(self):
+ """
+ Clean up the cluster after the test.
+ """
+ super(TestNetSplit, self).tearDown()
+
+ def _setup_pool(self, size=None, min_size=None, rule=None):
+ """
+ Create a pool and optionally set its size, min_size, and CRUSH rule.
+ """
+ self.mgr_cluster.mon_manager.create_pool(self.POOL, min_size=min_size)
+ if size is not None:
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'osd', 'pool', 'set', self.POOL, 'size', str(size))
+ if rule is not None:
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'osd', 'pool', 'set', self.POOL, 'crush_rule', rule)
+
+ def _get_pg_stats(self):
+ """
+ Dump the PG map via the client and return its pg_stats list.
+ """
+ (client,) = self.ctx.cluster.only(self.CLIENT).remotes.keys()
+ arg = ['ceph', 'pg', 'dump', '--format=json']
+ proc = client.run(args=arg, wait=True, stdout=StringIO(), timeout=30)
+ if proc.exitstatus != 0:
+ log.error("pg dump failed")
+ raise Exception("pg dump failed")
+ out = proc.stdout.getvalue()
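+ # the first line of the output may be a human-readable status
+ # message rather than JSON, so drop it before parsing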
+ j = json.loads('\n'.join(out.split('\n')[1:]))
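+ # the pg_stats list may be nested under 'pg_map' or sit at the top
+ # level depending on the Ceph release; handle both layouts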
+ try:
+ return j['pg_map']['pg_stats']
+ except KeyError:
+ return j['pg_stats']
+
+ def _get_active_pg(self, pgs):
+ """
+ Get the number of active PGs
+ """
+ num_active = 0
+ for pg in pgs:
+ if pg['state'].count('active') and not pg['state'].count('stale'):
+ num_active += 1
+ return num_active
+
+ def _print_not_active_clean_pg(self, pgs):
+ """
+ Print the PGs that are not active+clean.
+ """
+ for pg in pgs:
+ if not (pg['state'].count('active') and
+ pg['state'].count('clean') and
+ not pg['state'].count('stale')):
+ log.debug(
+ "PG %s is not active+clean, but %s",
+ pg['pgid'], pg['state']
+ )
+
+ def _print_not_active_pg(self, pgs):
+ """
+ Print the PGs that are not active.
+ """
+ for pg in pgs:
+ if not (pg['state'].count('active') and
+ not pg['state'].count('stale')):
+ log.debug(
+ "PG %s is not active, but %s",
+ pg['pgid'], pg['state']
+ )
+
+ def _pg_all_active(self):
+ """
+ Check if all pgs are active.
+ """
+ pgs = self._get_pg_stats()
+ result = self._get_active_pg(pgs) == len(pgs)
+ if result:
+ log.debug("All PGs are active")
+ else:
+ log.debug("Not all PGs are active")
+ self._print_not_active_pg(pgs)
+ return result
+
+ def _get_active_clean_pg(self, pgs):
+ """
+ Get the number of active+clean PGs
+ """
+ num_active_clean = 0
+ for pg in pgs:
+ if (pg['state'].count('active') and
+ pg['state'].count('clean') and
+ not pg['state'].count('stale') and
+ not pg['state'].count('laggy')):
+ num_active_clean += 1
+ return num_active_clean
+
+ def _pg_all_active_clean(self):
+ """
+ Check if all pgs are active and clean.
+ """
+ pgs = self._get_pg_stats()
+ result = self._get_active_clean_pg(pgs) == len(pgs)
+ if result:
+ log.debug("All PGs are active+clean")
+ else:
+ log.debug("Not all PGs are active+clean")
+ self._print_not_active_clean_pg(pgs)
+ return result
+
+ def _disconnect_mons(self, config):
+ """
+ Disconnect the mons in the <config> list.
+ """
+ disconnect(self.ctx, config)
+
+ def _reconnect_mons(self, config):
+ """
+ Reconnect the mons in the <config> list.
+ """
+ reconnect(self.ctx, config)
+
+ def _reply_to_mon_command(self):
+ """
+ Check if the cluster is accessible, i.e. the surviving mons
+ still form a quorum and reply to commands.
+ """
+ try:
+ self.mgr_cluster.mon_manager.raw_cluster_cmd('status')
+ return True
+ except Exception:
+ return False
+
+ def _check_if_disconnect(self, config):
+ """
+ Check if the mons in the <config> list are disconnected.
+ """
+ assert config[0].startswith('mon.')
+ assert config[1].startswith('mon.')
+ log.info("Checking if the {} and {} are disconnected".format(
+ config[0], config[1]))
+ (ip1, _) = get_ip_and_ports(self.ctx, config[0])
+ (ip2, _) = get_ip_and_ports(self.ctx, config[1])
+ (host1,) = self.ctx.cluster.only(config[0]).remotes.keys()
+ (host2,) = self.ctx.cluster.only(config[1]).remotes.keys()
+ assert host1 is not None
+ assert host2 is not None
+ # if the mons are disconnected, the ping fails and `run` raises
+ # CommandFailedError on the non-zero exit status
+ try:
+ if (host1.run(args=["ping", "-c", "1", ip2]).exitstatus == 0 or
+ host2.run(args=["ping", "-c", "1", ip1]).exitstatus == 0):
+ return False
+ # neither ping succeeded without raising, so the link is down
+ return True
+ except Exception:
+ return True
+
+ def _check_if_connect(self, config):
+ """
+ Check if the mons in the <config> list are connected.
+ """
+ assert config[0].startswith('mon.')
+ assert config[1].startswith('mon.')
+ log.info("Checking if {} and {} are connected".format(
+ config[0], config[1]))
+ (ip1, _) = get_ip_and_ports(self.ctx, config[0])
+ (ip2, _) = get_ip_and_ports(self.ctx, config[1])
+ (host1,) = self.ctx.cluster.only(config[0]).remotes.keys()
+ (host2,) = self.ctx.cluster.only(config[1]).remotes.keys()
+ assert host1 is not None
+ assert host2 is not None
+ # if the mons are connected, both pings succeed with exit status 0;
+ # `run` raises CommandFailedError if a ping fails
+ try:
+ if (host1.run(args=["ping", "-c", "1", ip2]).exitstatus == 0 and
+ host2.run(args=["ping", "-c", "1", ip1]).exitstatus == 0):
+ return True
+ # a ping returned non-zero without raising; treat as disconnected
+ return False
+ except Exception:
+ return False
+
+ def test_mon_netsplit(self):
+ """
+ Test the mon netsplit scenario: the cluster must stay accessible
+ while two of the three sites are disconnected from each other.
+ """
+ log.info("Running test_mon_netsplit")
+ self._setup_pool(
+ self.SIZE,
+ min_size=self.MIN_SIZE,
+ rule=self.CRUSH_RULE
+ )
+ # set the pool to stretch
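+ # arguments, in order: pool, peering_crush_bucket_count,
+ # peering_crush_bucket_target, peering_crush_bucket_barrier,
+ # crush_rule, size, min_size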
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'osd', 'pool', 'stretch', 'set',
+ self.POOL, str(self.PEERING_CRUSH_BUCKET_COUNT),
+ str(self.PEERING_CRUSH_BUCKET_TARGET),
+ self.PEERING_CRUSH_BUCKET_BARRIER,
+ self.CRUSH_RULE, str(self.SIZE), str(self.MIN_SIZE))
+ # check if all the mons are connected
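+ # itertools.combinations yields every unordered mon pair exactly once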
+ self.wait_until_true(
+ lambda: all(
+ [
+ self._check_if_connect([mon1, mon2])
+ for mon1, mon2 in itertools.combinations(self.MON_LIST, 2)
+ ]
+ ),
+ timeout=self.RECOVERY_PERIOD,
+ )
+
+ # wait for all PGs to become active
+ self.wait_until_true_and_hold(
+ lambda: self._pg_all_active(),
+ timeout=self.RECOVERY_PERIOD,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+
+ # Scenario 1: disconnect Site 1 and Site 2
+ # Site 3 is still connected to both Site 1 and Site 2
+ config = ["mon.a", "mon.d"]
+ # disconnect the mons
+ self._disconnect_mons(config)
+ # wait for the mons to be disconnected
+ time.sleep(self.RECOVERY_PERIOD)
+ # check if the mons are disconnected
+ self.wait_until_true(
+ lambda: self._check_if_disconnect(config),
+ timeout=self.RECOVERY_PERIOD,
+ )
+ # check the cluster is accessible
+ self.wait_until_true_and_hold(
+ lambda: self._reply_to_mon_command(),
+ timeout=self.RECOVERY_PERIOD * 5,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+ # reconnect the mons
+ self._reconnect_mons(config)
+ # wait for the mons to be reconnected
+ time.sleep(self.RECOVERY_PERIOD)
+ # check if the mons are reconnected
+ self.wait_until_true(
+ lambda: self._check_if_connect(config),
+ timeout=self.RECOVERY_PERIOD,
+ )
+ # wait for the PGs to recover
+ time.sleep(self.RECOVERY_PERIOD)
+ # check if all PGs are active+clean
+ self.wait_until_true_and_hold(
+ lambda: self._pg_all_active_clean(),
+ timeout=self.RECOVERY_PERIOD,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+ log.info("test_mon_netsplit passed!")