From: Shilpa Jagannath Date: Mon, 12 Oct 2020 12:51:16 +0000 (+0530) Subject: rgw: reshard tests with fault injection X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=9ac9fc01726fa1b49ef7102c3e6d00373838f51a;p=ceph.git rgw: reshard tests with fault injection Signed-off-by: Shilpa Jagannath --- diff --git a/qa/workunits/rgw/test_rgw_reshard.py b/qa/workunits/rgw/test_rgw_reshard.py index 14f4fe5099469..c414713dd6851 100755 --- a/qa/workunits/rgw/test_rgw_reshard.py +++ b/qa/workunits/rgw/test_rgw_reshard.py @@ -5,6 +5,8 @@ import time import subprocess import json import boto3 +from pprint import pprint +import re """ Rgw manual and dynamic resharding testing against a running instance @@ -16,7 +18,7 @@ Rgw manual and dynamic resharding testing against a running instance # # -log.basicConfig(level=log.DEBUG) +log.basicConfig(format = '%(message)s', level=log.DEBUG) log.getLogger('botocore').setLevel(log.CRITICAL) log.getLogger('boto3').setLevel(log.CRITICAL) log.getLogger('urllib3').setLevel(log.CRITICAL) @@ -26,33 +28,35 @@ USER = 'tester' DISPLAY_NAME = 'Testing' ACCESS_KEY = 'NX5QOQKC6BH2IDN8HC7A' SECRET_KEY = 'LnEsqNNqZIpkzauboDcLXLcYaWwLQ3Kop0zAnKIn' -BUCKET_NAME1 = 'myfoo' -BUCKET_NAME2 = 'mybar' +BUCKET_NAME1 = 'a-bucket' +BUCKET_NAME2 = 'b-bucket' +BUCKET_NAME3 = 'c-bucket' +BUCKET_NAME4 = 'd-bucket' +BUCKET_NAME5 = 'e-bucket' VER_BUCKET_NAME = 'myver' - def exec_cmd(cmd): try: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) out, err = proc.communicate() + log.info(proc.args) if proc.returncode == 0: - log.info('command succeeded') - if out is not None: log.info(out) + if out is not None: log.debug('{} \n {}'.format(out.decode('utf-8'), err.decode('utf-8'))) return out else: - raise Exception("error: %s \nreturncode: %s" % (err, proc.returncode)) + raise Exception("error: {} \nreturncode: {}".format(err, proc.returncode)) except Exception as e: - log.error('command failed') + log.error('command failed\n') log.error(e) return False def get_radosgw_port(): out = exec_cmd('sudo netstat -nltp | grep radosgw') - log.debug('output: %s' % out) + log.debug('output: {}'.format(out)) x = out.decode('utf8').split(" ") port = [i for i in x if ':' in i][0].split(':')[1] - log.info('radosgw port: %s' % port) + log.info('radosgw port: {}'.format(port)) return port @@ -72,19 +76,20 @@ def get_bucket_stats(bucket_name): """ function to get bucket stats """ - cmd = exec_cmd("radosgw-admin bucket stats --bucket %s" % bucket_name) + cmd = exec_cmd("radosgw-admin bucket stats --bucket {}".format(bucket_name)) json_op = json.loads(cmd) + #print(json.dumps(json_op, indent = 4, sort_keys=True)) bucket_id = json_op['id'] - num_shards_op = json_op['num_shards'] + num_shards = json_op['num_shards'] if len(json_op['usage']) > 0: num_objects = json_op['usage']['rgw.main']['num_objects'] size_kb = json_op['usage']['rgw.main']['size_kb'] else: num_objects = 0 size_kb = 0 - log.debug("bucket %s id %s num_objects %d size_kb %d num_shards %d", bucket_name, bucket_id, - num_objects, size_kb, num_shards_op) - return BucketStats(bucket_name, bucket_id, num_objects, size_kb, num_shards_op) + log.debug(" \nBUCKET_STATS: \nbucket: {} id: {} num_objects: {} size_kb: {} num_shards: {}\n".format(bucket_name, bucket_id, + num_objects, size_kb, num_shards)) + return BucketStats(bucket_name, bucket_id, num_objects, size_kb, num_shards) def get_bucket_num_shards(bucket_name, bucket_id): @@ -92,21 +97,29 @@ def get_bucket_num_shards(bucket_name, bucket_id): function to get bucket num shards """ metadata = 'bucket.instance:' + bucket_name + ':' + bucket_id - log.debug("metadata %s", metadata) - cmd = exec_cmd('radosgw-admin metadata get %s' % metadata) + cmd = exec_cmd('radosgw-admin metadata get {}'.format(metadata)) json_op = json.loads(cmd) num_shards = json_op['data']['bucket_info']['num_shards'] - log.debug("bucket %s id %s num_shards %d", bucket_name, bucket_id, num_shards) return num_shards +def run_bucket_reshard_cmd(bucket_name, num_shards, **location): + + # TODO: Get rid of duplication. use list + if ('error_location' in location): + return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {} --inject-error-at {}'.format(bucket_name, + num_shards, location.get('error_location', None))) + elif ('abort_location' in location): + return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {} --inject-abort-at {}'.format(bucket_name, + num_shards, location.get('abort_location', None))) + else: + return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {}'.format(bucket_name, num_shards)) def main(): """ execute manual and dynamic resharding commands """ # create user - exec_cmd('radosgw-admin user create --uid %s --display-name %s --access-key %s --secret %s' - % (USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)) + exec_cmd('radosgw-admin user create --uid {} --display-name {} --access-key {} --secret {}'.format(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)) def boto_connect(portnum, ssl, proto): endpoint = proto + '://localhost:' + portnum @@ -122,90 +135,227 @@ def main(): port = get_radosgw_port() - if port == '80': - connection = boto_connect(port, ssl=False, proto='http') - elif port == '443': - connection = boto_connect(port, ssl=True, proto='https') + proto = ('https' if port == '443' else 'http') + connection = boto_connect(port, ssl=False, proto='http') # create a bucket bucket1 = connection.create_bucket(Bucket=BUCKET_NAME1) bucket2 = connection.create_bucket(Bucket=BUCKET_NAME2) + bucket3 = connection.create_bucket(Bucket=BUCKET_NAME3) + bucket4 = connection.create_bucket(Bucket=BUCKET_NAME4) + bucket5 = connection.create_bucket(Bucket=BUCKET_NAME5) ver_bucket = connection.create_bucket(Bucket=VER_BUCKET_NAME) connection.BucketVersioning('ver_bucket') - bucket_stats1 = get_bucket_stats(BUCKET_NAME1) - bucket_stats2 = get_bucket_stats(BUCKET_NAME2) - ver_bucket_stats = get_bucket_stats(VER_BUCKET_NAME) - bucket1_acl = connection.BucketAcl(BUCKET_NAME1).load() bucket2_acl = connection.BucketAcl(BUCKET_NAME2).load() ver_bucket_acl = connection.BucketAcl(VER_BUCKET_NAME).load() # TESTCASE 'reshard-add','reshard','add','add bucket to resharding queue','succeeds' - log.debug(' test: reshard add') - num_shards_expected = bucket_stats1.num_shards + 1 - cmd = exec_cmd('radosgw-admin reshard add --bucket %s --num-shards %s' % (BUCKET_NAME1, num_shards_expected)) + log.debug('TEST: reshard add\n') + + num_shards_expected = get_bucket_stats(BUCKET_NAME1).num_shards + 1 + cmd = exec_cmd('radosgw-admin reshard add --bucket {} --num-shards {}'.format(BUCKET_NAME1, num_shards_expected)) cmd = exec_cmd('radosgw-admin reshard list') json_op = json.loads(cmd) - log.debug('bucket name %s', json_op[0]['bucket_name']) + log.debug('bucket name {}'.format(json_op[0]['bucket_name'])) assert json_op[0]['bucket_name'] == BUCKET_NAME1 assert json_op[0]['new_num_shards'] == num_shards_expected # TESTCASE 'reshard-process','reshard','','process bucket resharding','succeeds' - log.debug(' test: reshard process') + log.debug('TEST: reshard process\n') cmd = exec_cmd('radosgw-admin reshard process') time.sleep(5) # check bucket shards num bucket_stats1 = get_bucket_stats(BUCKET_NAME1) - bucket_stats1.get_num_shards() if bucket_stats1.num_shards != num_shards_expected: - log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME1) + log.error("Resharding failed on bucket {}. Expected number of shards are not created\n".format(BUCKET_NAME1)) # TESTCASE 'reshard-add','reshard','add','add non empty bucket to resharding queue','succeeds' - log.debug(' test: reshard add non empty bucket') + log.debug('TEST: reshard add non empty bucket\n') # create objs num_objs = 8 for i in range(0, num_objs): connection.Object(BUCKET_NAME1, ('key'+str(i))).put(Body=b"some_data") - num_shards_expected = bucket_stats1.num_shards + 1 - cmd = exec_cmd('radosgw-admin reshard add --bucket %s --num-shards %s' % (BUCKET_NAME1, num_shards_expected)) + num_shards_expected = get_bucket_stats(BUCKET_NAME1).num_shards + 1 + cmd = exec_cmd('radosgw-admin reshard add --bucket {} --num-shards {}'.format(BUCKET_NAME1, num_shards_expected)) cmd = exec_cmd('radosgw-admin reshard list') json_op = json.loads(cmd) - log.debug('bucket name %s', json_op[0]['bucket_name']) assert json_op[0]['bucket_name'] == BUCKET_NAME1 assert json_op[0]['new_num_shards'] == num_shards_expected # TESTCASE 'reshard process ,'reshard','process','reshard non empty bucket','succeeds' - log.debug(' test: reshard process non empty bucket') + log.debug(' TEST: reshard process non empty bucket\n') cmd = exec_cmd('radosgw-admin reshard process') # check bucket shards num bucket_stats1 = get_bucket_stats(BUCKET_NAME1) - bucket_stats1.get_num_shards() if bucket_stats1.num_shards != num_shards_expected: - log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME1) + log.error("Resharding failed on bucket {}. Expected number of shards are not created\n".format(BUCKET_NAME1)) - # TESTCASE 'manual resharding','bucket', 'reshard','','manual bucket resharding','succeeds' - log.debug(' test: manual reshard bucket') + # TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection\n') # create objs num_objs = 11 for i in range(0, num_objs): connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data") + time.sleep(5) + old_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected, error_location = "before_target_shard_entry") + + # check bucket shards num + cur_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME2)) + for key in bucket2.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected) + + #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection') + # create objs + num_objs = 11 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data") + + time.sleep(5) + old_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected, error_location = "after_target_shard_entry") + # check bucket shards num + bucket_stats3 = get_bucket_stats(BUCKET_NAME3) + cur_shard_count = bucket_stats3.num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME3)) + for key in bucket3.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected) + + #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection') + # create objs + num_objs = 11 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data") + time.sleep(10) - num_shards_expected = bucket_stats2.num_shards + 1 - cmd = exec_cmd('radosgw-admin bucket reshard --bucket %s --num-shards %s' % (BUCKET_NAME2, - num_shards_expected)) + old_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected, error_location = "before_layout_overwrite") + # check bucket shards num - bucket_stats2 = get_bucket_stats(BUCKET_NAME2) - bucket_stats2.get_num_shards() - if bucket_stats2.num_shards != num_shards_expected: - log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME2) + cur_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME3)) + for key in bucket4.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected) + + # TESTCASE 'manual bucket resharding','inject crash','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with crash injection\n') + + old_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected, abort_location = "before_target_shard_entry") + + # check bucket shards num + cur_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME2)) + for key in bucket2.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected) + + #TESTCASE 'manual bucket resharding','inject crash','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection') + + old_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected, abort_location = "after_target_shard_entry") + + # check bucket shards num + cur_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME3)) + for key in bucket3.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected) + + #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection') + + old_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected, abort_location = "before_layout_overwrite") + # check bucket shards num + bucket_stats4 = get_bucket_stats(BUCKET_NAME4) + cur_shard_count = bucket_stats4.num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME3)) + for key in bucket4.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected) # TESTCASE 'versioning reshard-','bucket', reshard','versioning reshard','succeeds' log.debug(' test: reshard versioned bucket') - num_shards_expected = ver_bucket_stats.num_shards + 1 - cmd = exec_cmd('radosgw-admin bucket reshard --bucket %s --num-shards %s' % (VER_BUCKET_NAME, + num_shards_expected = get_bucket_stats(VER_BUCKET_NAME).num_shards + 1 + cmd = exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {}'.format(VER_BUCKET_NAME, num_shards_expected)) # check bucket shards num ver_bucket_stats = get_bucket_stats(VER_BUCKET_NAME) @@ -220,13 +370,13 @@ def main(): assert new_ver_bucket_acl == ver_bucket_acl # Clean up - log.debug("Deleting bucket %s", BUCKET_NAME1) + log.debug("Deleting bucket {}".format(BUCKET_NAME1)) bucket1.objects.all().delete() bucket1.delete() - log.debug("Deleting bucket %s", BUCKET_NAME2) + log.debug("Deleting bucket {}".format(BUCKET_NAME2)) bucket2.objects.all().delete() bucket2.delete() - log.debug("Deleting bucket %s", VER_BUCKET_NAME) + log.debug("Deleting bucket {}".format(VER_BUCKET_NAME)) ver_bucket.delete() diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 2ce37e70d48ec..1f884181fdb99 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -22,6 +22,7 @@ extern "C" { #include "common/Formatter.h" #include "common/errno.h" #include "common/safe_io.h" +#include "common/fault_injector.h" #include "include/util.h" @@ -408,6 +409,8 @@ void usage() cout << " --trim-delay-ms time interval in msec to limit the frequency of sync error log entries trimming operations,\n"; cout << " the trimming process will sleep the specified msec for every 1000 entries trimmed\n"; cout << " --max-concurrent-ios maximum concurrent ios for bucket operations (default: 32)\n"; + cout << " --fault-inject-at for testing fault injection\n"; + cout << " --fault-abort-at for testing fault abort\n"; cout << "\n"; cout << " := \"YYYY-MM-DD[ hh:mm:ss]\"\n"; cout << "\nQuota options:\n"; @@ -3234,6 +3237,9 @@ int main(int argc, const char **argv) ceph::timespan opt_retry_delay_ms = std::chrono::milliseconds(2000); ceph::timespan opt_timeout_sec = std::chrono::seconds(60); + std::optional inject_error_at; + std::optional inject_abort_at; + SimpleCmd cmd(all_cmds, cmd_aliases); std::optional rgw_obj_fs; // radoslist field separator @@ -3659,6 +3665,10 @@ int main(int argc, const char **argv) opt_retry_delay_ms = std::chrono::milliseconds(atoi(val.c_str())); } else if (ceph_argparse_witharg(args, i, &val, "--timeout-sec", (char*)NULL)) { opt_timeout_sec = std::chrono::seconds(atoi(val.c_str())); + } else if (ceph_argparse_witharg(args, i, &val, "--inject-error-at", (char*)NULL)) { + inject_error_at = val; + } else if (ceph_argparse_witharg(args, i, &val, "--inject-abort-at", (char*)NULL)) { + inject_abort_at = val; } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) { // do nothing } else if (ceph_argparse_witharg(args, i, &val, "--context", (char*)NULL)) { @@ -5464,6 +5474,9 @@ int main(int argc, const char **argv) return EINVAL; } + // to test using FaultInjector + FaultInjector fault; + if (!user_id.empty()) { user_op.set_user_id(user_id); bucket_op.set_user_id(user_id); @@ -6942,7 +6955,12 @@ next: max_entries = DEFAULT_RESHARD_MAX_ENTRIES; } - return br.execute(num_shards, max_entries, dpp(), + if (inject_error_at) { + fault.inject(*inject_error_at, InjectError{-EIO, dpp()}); + } else if (inject_abort_at) { + fault.inject(*inject_abort_at, InjectAbort{}); + } + return br.execute(num_shards, fault, max_entries, dpp(), verbose, &cout, formatter.get()); } diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 1a28a678daa7f..5d966c5b8d93e 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -503,6 +503,7 @@ int RGWBucketReshardLock::renew(const Clock::time_point& now) { int RGWBucketReshard::do_reshard(int num_shards, int max_entries, + FaultInjector& f, bool verbose, ostream *out, Formatter *formatter, @@ -595,6 +596,8 @@ int RGWBucketReshard::do_reshard(int num_shards, return ret; } + if (ret = f.check("before_target_shard_entry"); ret < 0) { return ret; } + int shard_index = (target_shard_id > 0 ? target_shard_id : 0); ret = target_shards_mgr.add_entry(shard_index, entry, account, @@ -603,6 +606,8 @@ int RGWBucketReshard::do_reshard(int num_shards, return ret; } + if (ret = f.check("after_target_shard_entry"); ret < 0) { return ret; } + Clock::time_point now = Clock::now(); if (reshard_lock.should_renew(now)) { // assume outer locks have timespans at least the size of ours, so @@ -642,6 +647,8 @@ int RGWBucketReshard::do_reshard(int num_shards, return -EIO; } + if (ret = f.check("before_layout_overwrite"); ret < 0) { return ret; } + //overwrite current_index for the next reshard process bucket_info.layout.current_index = *bucket_info.layout.target_index; bucket_info.layout.target_index = std::nullopt; // target_layout doesn't need to exist after reshard @@ -671,10 +678,13 @@ int RGWBucketReshard::update_bucket(rgw::BucketReshardState s, const DoutPrefixP return 0; } -int RGWBucketReshard::execute(int num_shards, int max_op_entries, +int RGWBucketReshard::execute(int num_shards, + FaultInjector& f, + int max_op_entries, const DoutPrefixProvider *dpp, - bool verbose, ostream *out, Formatter *formatter, - RGWReshard* reshard_log) + bool verbose, ostream *out, + Formatter *formatter, + RGWReshard* reshard_log) { int ret = reshard_lock.lock(); if (ret < 0) { @@ -697,9 +707,7 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries, // keep a copy of old index layout prev_index = bucket_info.layout.current_index; - ret = do_reshard(num_shards, - max_op_entries, - verbose, out, formatter, dpp); + ret = do_reshard(num_shards, max_op_entries, f, verbose, out, formatter, dpp); if (ret < 0) { goto error_out; } @@ -1022,15 +1030,17 @@ int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvid } { - RGWBucketReshard br(store, bucket_info, attrs, nullptr); - ret = br.execute(entry.new_num_shards, max_entries, dpp, false, nullptr, - nullptr, this); - if (ret < 0) { - ldout(store->ctx(), 0) << __func__ << - ": Error during resharding bucket " << entry.bucket_name << ":" << - cpp_strerror(-ret)<< dendl; - return ret; - } + RGWBucketReshard br(store, bucket_info, attrs, nullptr); + + FaultInjector f; + ret = br.execute(entry.new_num_shards, f, max_entries, dpp, + false, nullptr, nullptr, this); + if (ret < 0) { + ldout(store->ctx(), 0) << __func__ << + ": Error during resharding bucket " << entry.bucket_name << ":" << + cpp_strerror(-ret)<< dendl; + return ret; + } ldout(store->ctx(), 20) << __func__ << " removing reshard queue entry for bucket " << entry.bucket_name << diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index c062aca594b11..1a46a49865aad 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -21,6 +21,7 @@ #include "cls/lock/cls_lock_client.h" #include "rgw_common.h" +#include "common/fault_injector.h" class RGWReshard; @@ -87,9 +88,9 @@ private: int set_target_layout(int new_num_shards, const DoutPrefixProvider *dpp); int update_bucket(rgw::BucketReshardState s, const DoutPrefixProvider* dpp); - + int do_reshard(int num_shards, - int max_entries, + int max_entries, FaultInjector& f, bool verbose, ostream *os, Formatter *formatter, @@ -102,8 +103,8 @@ public: const RGWBucketInfo& _bucket_info, const std::map& _bucket_attrs, RGWBucketReshardLock* _outer_reshard_lock); - int execute(int num_shards, int max_op_entries, - const DoutPrefixProvider *dpp, + int execute(int num_shards, FaultInjector& f, + int max_op_entries, const DoutPrefixProvider *dpp, bool verbose = false, ostream *out = nullptr, Formatter *formatter = nullptr, RGWReshard *reshard_log = nullptr);