log = logging.getLogger(__name__)
+
def get_ip_and_ports(ctx, daemon):
+ """
+ Get the IP and port list for the <daemon>.
+ """
assert daemon.startswith('mon.')
addr = ctx.ceph['ceph'].mons['{a}'.format(a=daemon)]
ips = re.findall("[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+[:[0-9]*]*", addr)
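+ # find every IPv4 address in the mon addr string, each optionally followed by ":port" suffixes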
assert len(ips) > 0
plain_ip = re.match("[0-9\.]*", ips[0]).group()
assert plain_ip is not None
port_list = []
for ip in ips:
ip_str, port_str = re.match("([0-9\.]*)([:0-9]*)", ip).groups()
assert ip_str == plain_ip
if len(port_str) > 0:
port_list.append(port_str)
return (plain_ip, port_list)
+
def disconnect(ctx, config):
- assert len(config) == 2 # we can only disconnect pairs right now
+ """
+ Disconnect the mons in the <config> list.
+ """
+ assert len(config) == 2 # we can only disconnect pairs right now
# and we can only disconnect mons right now
assert config[0].startswith('mon.')
assert config[1].startswith('mon.')
+ log.info("Disconnecting {a} and {b}".format(a=config[0], b=config[1]))
(ip1, _) = get_ip_and_ports(ctx, config[0])
(ip2, _) = get_ip_and_ports(ctx, config[1])
(host1,) = ctx.cluster.only(config[0]).remotes.keys()
(host2,) = ctx.cluster.only(config[1]).remotes.keys()
assert host1 is not None
assert host2 is not None
- host1.run(
- args = ["sudo", "iptables", "-A", "INPUT", "-p", "tcp", "-s",
- ip2, "-j", "DROP"]
- )
- host2.run(
- args = ["sudo", "iptables", "-A", "INPUT", "-p", "tcp", "-s",
- ip1, "-j", "DROP"]
- )
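+ # drop all traffic (not just tcp) in both directions so the split is complete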
+ host1.run(args=["sudo", "iptables", "-A", "INPUT",
+ "-s", ip2, "-j", "DROP"])
+ host1.run(args=["sudo", "iptables", "-A", "OUTPUT",
+ "-d", ip2, "-j", "DROP"])
+
+ host2.run(args=["sudo", "iptables", "-A", "INPUT",
+ "-s", ip1, "-j", "DROP"])
+ host2.run(args=["sudo", "iptables", "-A", "OUTPUT",
+ "-d", ip1, "-j", "DROP"])
+
def reconnect(ctx, config):
- assert len(config) == 2 # we can only disconnect pairs right now
+ """
+ Reconnect the mons in the <config> list.
+ """
+ assert len(config) == 2 # we can only reconnect pairs right now
# and we can only reconnect mons right now
assert config[0].startswith('mon.')
assert config[1].startswith('mon.')
-
+ log.info("Reconnecting {a} and {b}".format(a=config[0], b=config[1]))
(ip1, _) = get_ip_and_ports(ctx, config[0])
(ip2, _) = get_ip_and_ports(ctx, config[1])
(host1,) = ctx.cluster.only(config[0]).remotes.keys()
(host2,) = ctx.cluster.only(config[1]).remotes.keys()
assert host1 is not None
assert host2 is not None
- host1.run(
- args = ["sudo", "iptables", "-D", "INPUT", "-p", "tcp", "-s",
- ip2, "-j", "DROP"]
- )
- host2.run(
- args = ["sudo", "iptables", "-D", "INPUT", "-p", "tcp", "-s",
- ip1, "-j", "DROP"]
- )
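+ # delete the same INPUT/OUTPUT rules that disconnect() added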
+ host1.run(args=["sudo", "iptables", "-D", "INPUT",
+ "-s", ip2, "-j", "DROP"])
+ host1.run(args=["sudo", "iptables", "-D", "OUTPUT",
+ "-d", ip2, "-j", "DROP"])
+
+ host2.run(args=["sudo", "iptables", "-D", "INPUT",
+ "-s", ip1, "-j", "DROP"])
+ host2.run(args=["sudo", "iptables", "-D", "OUTPUT",
+ "-d", ip1, "-j", "DROP"])
--- /dev/null
+from tasks.ceph_test_case import CephTestCase
+import logging
+import json
+from tasks.netsplit import disconnect, reconnect, get_ip_and_ports
+import itertools
+import time
+from io import StringIO
+log = logging.getLogger(__name__)
+
+
+class TestNetSplit(CephTestCase):
+ MON_LIST = ["mon.a", "mon.c", "mon.e"]
+ CLUSTER = "ceph"
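+ # all periods below are in seconds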
+ WRITE_PERIOD = 10
+ READ_PERIOD = 10
+ RECOVERY_PERIOD = WRITE_PERIOD * 6
+ SUCCESS_HOLD_TIME = 10
+ CLIENT = "client.0"
+ POOL = "stretch_pool"
+
+ def setUp(self):
+ """
+ Set up the cluster for the test.
+ """
+ super(TestNetSplit, self).setUp()
+
+ def tearDown(self):
+ """
+ Clean up the cluster after the test.
+ """
+ super(TestNetSplit, self).tearDown()
+
+ def _get_pg_stats(self):
+ """
+ Dump the cluster and get pg stats
+ """
+ (client,) = self.ctx.cluster.only(self.CLIENT).remotes.keys()
+ arg = ['ceph', 'pg', 'dump', '--format=json']
+ proc = client.run(args=arg, wait=True, stdout=StringIO(), timeout=30)
+ if proc.exitstatus != 0:
+ log.error("pg dump failed")
+ raise Exception("pg dump failed")
+ out = proc.stdout.getvalue()
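+ # some 'ceph pg dump' builds print a status line (e.g. "dumped all") before the JSON; skip it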
+ j = json.loads('\n'.join(out.split('\n')[1:]))
+ try:
+ return j['pg_map']['pg_stats']
+ except KeyError:
+ return j['pg_stats']
+
+ def _get_active_pg(self, pgs):
+ """
+ Get the number of active PGs
+ """
+ num_active = 0
+ for pg in pgs:
+ if (pg['state'].count('active') and
+ not pg['state'].count('stale') and
+ not pg['state'].count('laggy')):
+ num_active += 1
+ return num_active
+
+ def _print_not_active_clean_pg(self, pgs):
+ """
+ Print the PGs that are not active+clean.
+ """
+ for pg in pgs:
+ if not (pg['state'].count('active') and
+ pg['state'].count('clean') and
+ not pg['state'].count('stale')):
+ log.debug(
+ "PG %s is not active+clean, but %s",
+ pg['pgid'], pg['state']
+ )
+
+ def _print_not_active_pg(self, pgs):
+ """
+ Print the PGs that are not active.
+ """
+ for pg in pgs:
+ if not (pg['state'].count('active') and
+ not pg['state'].count('stale')):
+ log.debug(
+ "PG %s is not active, but %s",
+ pg['pgid'], pg['state']
+ )
+
+ def _pg_all_active(self):
+ """
+ Check if all pgs are active.
+ """
+ pgs = self._get_pg_stats()
+ result = self._get_active_pg(pgs) == len(pgs)
+ if result:
+ log.debug("All PGs are active")
+ else:
+ log.debug("Not all PGs are active")
+ self._print_not_active_pg(pgs)
+ return result
+
+ def _get_active_clean_pg(self, pgs):
+ """
+ Get the number of active+clean PGs
+ """
+ num_active_clean = 0
+ for pg in pgs:
+ if (pg['state'].count('active') and
+ pg['state'].count('clean') and
+ not pg['state'].count('stale') and
+ not pg['state'].count('laggy')):
+ num_active_clean += 1
+ return num_active_clean
+
+ def _pg_all_active_clean(self):
+ """
+ Check if all pgs are active and clean.
+ """
+ pgs = self._get_pg_stats()
+ result = self._get_active_clean_pg(pgs) == len(pgs)
+ if result:
+ log.debug("All PGs are active+clean")
+ else:
+ log.debug("Not all PGs are active+clean")
+ self._print_not_active_clean_pg(pgs)
+ return result
+
+ def _disconnect_mons(self, config):
+ """
+ Disconnect the mons in the <config> list.
+ """
+ disconnect(self.ctx, config)
+
+ def _reconnect_mons(self, config):
+ """
+ Reconnect the mons in the <config> list.
+ """
+ reconnect(self.ctx, config)
+
+ def _reply_to_mon_command(self):
+ """
+ Check if the cluster is accessible.
+ """
+ (client,) = self.ctx.cluster.only(self.CLIENT).remotes.keys()
+ arg = ['ceph', '-s']
+ proc = client.run(args=arg, wait=True, stdout=StringIO(), timeout=30)
+ if proc.exitstatus != 0:
+ log.error("ceph -s failed, cluster is not accessible")
+ return False
+ else:
+ log.info("Cluster is accessible")
+ return True
+
+ def _check_if_disconnect(self, config):
+ """
+ Check if the mons in the <config> list are disconnected.
+ """
+ assert config[0].startswith('mon.')
+ assert config[1].startswith('mon.')
+ log.info("Checking if the {} and {} are disconnected".format(
+ config[0], config[1]))
+ (ip1, _) = get_ip_and_ports(self.ctx, config[0])
+ (ip2, _) = get_ip_and_ports(self.ctx, config[1])
+ (host1,) = self.ctx.cluster.only(config[0]).remotes.keys()
+ (host2,) = self.ctx.cluster.only(config[1]).remotes.keys()
+ assert host1 is not None
+ assert host2 is not None
+ # if the mons are disconnected, the ping in each direction should fail
+ r1 = host1.run(args=["ping", "-c", "1", ip2], check_status=False)
+ r2 = host2.run(args=["ping", "-c", "1", ip1], check_status=False)
+ return r1.exitstatus != 0 and r2.exitstatus != 0
+
+ def _check_if_connect(self, config):
+ """
+ Check if the mons in the <config> list are connected.
+ """
+ assert config[0].startswith('mon.')
+ assert config[1].startswith('mon.')
+ log.info("Checking if {} and {} are connected".format(
+ config[0], config[1]))
+ (ip1, _) = get_ip_and_ports(self.ctx, config[0])
+ (ip2, _) = get_ip_and_ports(self.ctx, config[1])
+ (host1,) = self.ctx.cluster.only(config[0]).remotes.keys()
+ (host2,) = self.ctx.cluster.only(config[1]).remotes.keys()
+ assert host1 is not None
+ assert host2 is not None
+ # if the mons are connected, the ping in each direction should succeed
+ r1 = host1.run(args=["ping", "-c", "1", ip2], check_status=False)
+ r2 = host2.run(args=["ping", "-c", "1", ip1], check_status=False)
+ return r1.exitstatus == 0 and r2.exitstatus == 0
+
+ def test_netsplit_dc1_dc2(self):
+ """
+ Test Netsplit between dc1 and dc2
+ """
+ log.info("Running test_mon_netsplit_dc1_dc2")
+ # check if all the mons are connected
+ self.wait_until_true(
+ lambda: all(
+ [
+ self._check_if_connect([mon1, mon2])
+ for mon1, mon2 in itertools.combinations(self.MON_LIST, 2)
+ ]
+ ),
+ timeout=self.RECOVERY_PERIOD,
+ )
+ # Scenario 1: disconnect Site 1 and Site 2
+ # Arbiter node is still connected to both sites
+ config = ["mon.a", "mon.c"]
+ # disconnect the mons
+ self._disconnect_mons(config)
+ # wait for the mons to be disconnected (2 minutes)
+ time.sleep(self.RECOVERY_PERIOD*2)
+ # check if the mons are disconnected
+ self.wait_until_true(
+ lambda: self._check_if_disconnect(config),
+ timeout=self.RECOVERY_PERIOD,
+ )
+ # check the cluster is accessible
+ self.wait_until_true_and_hold(
+ lambda: self._reply_to_mon_command(),
+ timeout=self.RECOVERY_PERIOD * 5,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+ # see how many PGs are active or inactive
+ start_time = time.time()
+ while time.time() - start_time < self.RECOVERY_PERIOD:
+ self._pg_all_active()
+ time.sleep(1)
+ # get the client from the cluster
+ (client,) = self.ctx.cluster.only(self.CLIENT).remotes.keys()
+ # check if the cluster accepts writes
+ args = [
+ "rados", "-p", self.POOL, "bench", str(self.WRITE_PERIOD), 'write',
+ '-b', '1024', '--no-cleanup'
+ ]
+ try:
+ client.run(args=args, wait=True, timeout=self.WRITE_PERIOD*2)
+ log.info("Write operation successful")
+ except Exception:
+ log.error("Write operation failed")
+ assert False, "Write operation failed"
+ # check if the cluster accepts random reads
+ args = [
+ "rados", "-p", self.POOL, "bench", str(self.READ_PERIOD), 'rand'
+ ]
+ try:
+ client.run(args=args, wait=True, timeout=self.READ_PERIOD*2)
+ log.info("Read operation successful")
+ except Exception:
+ log.error("Read operation failed")
+ assert False, "Read operation failed"
+ # reconnect the mons
+ self._reconnect_mons(config)
+ # wait for the mons to be reconnected
+ time.sleep(self.RECOVERY_PERIOD)
+ # check if the mons are reconnected
+ self.wait_until_true(
+ lambda: self._check_if_connect(config),
+ timeout=self.RECOVERY_PERIOD,
+ )
+ # check if all the PGs are active+clean
+ self.wait_until_true_and_hold(
+ lambda: self._pg_all_active_clean(),
+ timeout=self.RECOVERY_PERIOD * 5,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+ log.info("test_mon_netsplit_dc1_dc2 passed!")
+
+ def test_netsplit_arbiter_dc1_and_dc1_dc2(self):
+ """
+ Test Netsplit arbiter-dc1, dc1-dc2
+ """
+ # check if all the mons are connected
+ self.wait_until_true(
+ lambda: all(
+ [
+ self._check_if_connect([mon1, mon2])
+ for mon1, mon2 in itertools.combinations(self.MON_LIST, 2)
+ ]
+ ),
+ timeout=self.RECOVERY_PERIOD,
+ )
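+ # Scenario 2: disconnect the arbiter from dc1, then dc1 from dc2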
+ arb_dc1 = ["mon.e", "mon.a"]
+ # disconnect the mons
+ self._disconnect_mons(arb_dc1)
+ # wait for the mons to be disconnected (2 minutes)
+ time.sleep(self.RECOVERY_PERIOD*2)
+ # check if the mons are disconnected
+ self.wait_until_true(
+ lambda: self._check_if_disconnect(arb_dc1),
+ timeout=self.RECOVERY_PERIOD,
+ )
+ dc1_dc2 = ["mon.a", "mon.c"]
+ # disconnect the mons
+ self._disconnect_mons(dc1_dc2)
+ # wait for the mons to be disconnected (2 minutes)
+ time.sleep(self.RECOVERY_PERIOD*2)
+ # check if the mons are disconnected
+ self.wait_until_true(
+ lambda: self._check_if_disconnect(dc1_dc2),
+ timeout=self.RECOVERY_PERIOD,
+ )
+ # check the cluster is accessible
+ self.wait_until_true_and_hold(
+ lambda: self._reply_to_mon_command(),
+ timeout=self.RECOVERY_PERIOD * 5,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+ # get the client from the cluster
+ (client,) = self.ctx.cluster.only(self.CLIENT).remotes.keys()
+ # check if the cluster accepts writes
+ args = [
+ "rados", "-p", self.POOL, "bench", str(self.WRITE_PERIOD), 'write',
+ '-b', '1024', '--no-cleanup'
+ ]
+ try:
+ client.run(args=args, wait=True, timeout=self.WRITE_PERIOD*2)
+ log.info("Write operation successful")
+ except Exception:
+ log.error("Write operation failed")
+ assert False, "Write operation failed"
+ # check if the cluster accepts random reads
+ args = [
+ "rados", "-p", self.POOL, "bench", str(self.READ_PERIOD), 'rand'
+ ]
+ try:
+ client.run(args=args, wait=True, timeout=self.READ_PERIOD*2)
+ log.info("Read operation successful")
+ except Exception:
+ log.error("Read operation failed")
+ assert False, "Read operation failed"
+ # reconnect the mons
+ self._reconnect_mons(arb_dc1)
+ # wait for the mons to be reconnected
+ time.sleep(self.RECOVERY_PERIOD)
+ # check if the mons are reconnected
+ self.wait_until_true(
+ lambda: self._check_if_connect(arb_dc1),
+ timeout=self.RECOVERY_PERIOD,
+ )
+ # reconnect the mons
+ self._reconnect_mons(dc1_dc2)
+ # wait for the mons to be reconnected
+ time.sleep(self.RECOVERY_PERIOD)
+ # check if the mons are reconnected
+ self.wait_until_true(
+ lambda: self._check_if_connect(dc1_dc2),
+ timeout=self.RECOVERY_PERIOD,
+ )
+ # check if all the PGs are active+clean
+ self.wait_until_true_and_hold(
+ lambda: self._pg_all_active_clean(),
+ timeout=self.RECOVERY_PERIOD * 5,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+ log.info("test_netsplit_arbiter_dc1_and_dc1_dc2 passed!")
--- /dev/null
+#!/usr/bin/env bash
+
+set -ex
+
+NUM_OSDS_UP=$(ceph osd df | grep "up" | wc -l)
+
+if [ "$NUM_OSDS_UP" -lt 8 ]; then
+ echo "test requires at least 8 OSDs up and running"
+ exit 1
+fi
+
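+# use the connectivity election strategy so quorum follows reachability,
+# and never let the tiebreaker mon (e) become the leader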
+ceph mon set election_strategy connectivity
+ceph mon add disallowed_leader e
+
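+# create a CRUSH datacenter bucket per site under the default root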
+for dc in dc1 dc2
+do
+ ceph osd crush add-bucket $dc datacenter
+ ceph osd crush move $dc root=default
+done
+
+ceph osd crush add-bucket node-2 host
+ceph osd crush add-bucket node-3 host
+ceph osd crush add-bucket node-4 host
+ceph osd crush add-bucket node-5 host
+ceph osd crush add-bucket node-6 host
+ceph osd crush add-bucket node-7 host
+ceph osd crush add-bucket node-8 host
+ceph osd crush add-bucket node-9 host
+
+ceph osd crush move node-2 datacenter=dc1
+ceph osd crush move node-3 datacenter=dc1
+ceph osd crush move node-4 datacenter=dc1
+ceph osd crush move node-5 datacenter=dc1
+
+ceph osd crush move node-6 datacenter=dc2
+ceph osd crush move node-7 datacenter=dc2
+ceph osd crush move node-8 datacenter=dc2
+ceph osd crush move node-9 datacenter=dc2
+
+ceph osd crush move osd.0 host=node-2
+ceph osd crush move osd.1 host=node-3
+ceph osd crush move osd.2 host=node-4
+ceph osd crush move osd.3 host=node-5
+
+ceph osd crush move osd.4 host=node-6
+ceph osd crush move osd.5 host=node-7
+ceph osd crush move osd.6 host=node-8
+ceph osd crush move osd.7 host=node-9
+
+
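+# record each mon's CRUSH location; stretch mode uses it to tie mons to sites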
+ceph mon set_location a datacenter=dc1 host=node-2
+ceph mon set_location b datacenter=dc1 host=node-3
+ceph mon set_location c datacenter=dc2 host=node-6
+ceph mon set_location d datacenter=dc2 host=node-7
+
+hostname=$(hostname -s)
+ceph osd crush remove $hostname || { echo 'command failed' ; exit 1; }
+ceph osd getcrushmap > crushmap || { echo 'command failed' ; exit 1; }
+crushtool --decompile crushmap > crushmap.txt || { echo 'command failed' ; exit 1; }
+sed 's/^# end crush map$//' crushmap.txt > crushmap_modified.txt || { echo 'command failed' ; exit 1; }
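+# the "# end crush map" marker was stripped above so the stretch rule can be
+# appended; the heredoc below re-adds it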
+cat >> crushmap_modified.txt << EOF
+rule stretch_rule {
+ id 1
+ type replicated
+ step take dc1
+ step chooseleaf firstn 2 type host
+ step emit
+ step take dc2
+ step chooseleaf firstn 2 type host
+ step emit
+}
+# rule stretch_rule {
+# id 1
+# type replicated
+# step take default
+# step chooseleaf firstn 2 type datacenter
+# step chooseleaf firstn 2 type host
+# step emit
+# }
+# end crush map
+EOF
+
+crushtool --compile crushmap_modified.txt -o crushmap.bin || { echo 'command failed' ; exit 1; }
+ceph osd setcrushmap -i crushmap.bin || { echo 'command failed' ; exit 1; }
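+# create the stretch pool: stretch_rule places 2 replicas in each datacenter (size 4)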
+stretched_poolname=stretch_pool
+ceph osd pool create $stretched_poolname 32 32 stretch_rule || { echo 'command failed' ; exit 1; }
+ceph osd pool set $stretched_poolname size 4 || { echo 'command failed' ; exit 1; }
+ceph osd pool application enable $stretched_poolname rados || { echo 'command failed' ; exit 1; }
+ceph mon set_location e datacenter=arbiter host=node-1 || { echo 'command failed' ; exit 1; }
+ceph mon enable_stretch_mode e stretch_rule datacenter || { echo 'command failed' ; exit 1; } # Enter stretch mode