From edb8e3bc6e0388ca5dc8ae2c6b4ad2729ef96044 Mon Sep 17 00:00:00 2001 From: Shilpa Jagannath Date: Mon, 12 Oct 2020 18:21:16 +0530 Subject: [PATCH] rgw: reshard tests with fault injection Signed-off-by: Shilpa Jagannath --- qa/workunits/rgw/test_rgw_reshard.py | 253 ++++++++++++++++++++++----- src/rgw/rgw_admin.cc | 19 +- src/rgw/rgw_reshard.cc | 40 +++-- src/rgw/rgw_reshard.h | 11 +- 4 files changed, 254 insertions(+), 69 deletions(-) diff --git a/qa/workunits/rgw/test_rgw_reshard.py b/qa/workunits/rgw/test_rgw_reshard.py index 092a002b2bd7f..5dd2c260aac65 100755 --- a/qa/workunits/rgw/test_rgw_reshard.py +++ b/qa/workunits/rgw/test_rgw_reshard.py @@ -6,6 +6,9 @@ import subprocess import json import boto3 import botocore.exceptions +import re + +from pprint import pprint """ Rgw manual and dynamic resharding testing against a running instance @@ -17,7 +20,7 @@ Rgw manual and dynamic resharding testing against a running instance # # -log.basicConfig(level=log.DEBUG) +log.basicConfig(format = '%(message)s', level=log.DEBUG) log.getLogger('botocore').setLevel(log.CRITICAL) log.getLogger('boto3').setLevel(log.CRITICAL) log.getLogger('urllib3').setLevel(log.CRITICAL) @@ -27,8 +30,11 @@ USER = 'tester' DISPLAY_NAME = 'Testing' ACCESS_KEY = 'NX5QOQKC6BH2IDN8HC7A' SECRET_KEY = 'LnEsqNNqZIpkzauboDcLXLcYaWwLQ3Kop0zAnKIn' -BUCKET_NAME1 = 'myfoo' -BUCKET_NAME2 = 'mybar' +BUCKET_NAME1 = 'a-bucket' +BUCKET_NAME2 = 'b-bucket' +BUCKET_NAME3 = 'c-bucket' +BUCKET_NAME4 = 'd-bucket' +BUCKET_NAME5 = 'e-bucket' VER_BUCKET_NAME = 'myver' @@ -36,14 +42,15 @@ def exec_cmd(cmd): try: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) out, err = proc.communicate() + log.info(proc.args) if proc.returncode == 0: log.info('command succeeded') - if out is not None: log.info(out) + if out is not None: log.debug('{} \n {}'.format(out.decode('utf-8'), err.decode('utf-8'))) return out else: - raise Exception("error: %s \nreturncode: %s" % (err, proc.returncode)) + raise Exception("error: {} \nreturncode: {}".format(err, proc.returncode)) except Exception as e: - log.error('command failed') + log.error('command failed\n') log.error(e) return False @@ -64,19 +71,20 @@ def get_bucket_stats(bucket_name): """ function to get bucket stats """ - cmd = exec_cmd("radosgw-admin bucket stats --bucket %s" % bucket_name) + cmd = exec_cmd("radosgw-admin bucket stats --bucket {}".format(bucket_name)) json_op = json.loads(cmd) + #print(json.dumps(json_op, indent = 4, sort_keys=True)) bucket_id = json_op['id'] - num_shards_op = json_op['num_shards'] + num_shards = json_op['num_shards'] if len(json_op['usage']) > 0: num_objects = json_op['usage']['rgw.main']['num_objects'] size_kb = json_op['usage']['rgw.main']['size_kb'] else: num_objects = 0 size_kb = 0 - log.debug("bucket %s id %s num_objects %d size_kb %d num_shards %d", bucket_name, bucket_id, - num_objects, size_kb, num_shards_op) - return BucketStats(bucket_name, bucket_id, num_objects, size_kb, num_shards_op) + log.debug(" \nBUCKET_STATS: \nbucket: {} id: {} num_objects: {} size_kb: {} num_shards: {}\n".format(bucket_name, bucket_id, + num_objects, size_kb, num_shards)) + return BucketStats(bucket_name, bucket_id, num_objects, size_kb, num_shards) def get_bucket_num_shards(bucket_name, bucket_id): @@ -84,21 +92,30 @@ def get_bucket_num_shards(bucket_name, bucket_id): function to get bucket num shards """ metadata = 'bucket.instance:' + bucket_name + ':' + bucket_id - log.debug("metadata %s", metadata) - cmd = exec_cmd('radosgw-admin metadata get %s' % metadata) + cmd = exec_cmd('radosgw-admin metadata get {}'.format(metadata)) json_op = json.loads(cmd) num_shards = json_op['data']['bucket_info']['num_shards'] - log.debug("bucket %s id %s num_shards %d", bucket_name, bucket_id, num_shards) return num_shards +def run_bucket_reshard_cmd(bucket_name, num_shards, **location): + + # TODO: Get rid of duplication. use list + if ('error_location' in location): + return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {} --inject-error-at {}'.format(bucket_name, + num_shards, location.get('error_location', None))) + elif ('abort_location' in location): + return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {} --inject-abort-at {}'.format(bucket_name, + num_shards, location.get('abort_location', None))) + else: + return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {}'.format(bucket_name, num_shards)) + def main(): """ execute manual and dynamic resharding commands """ # create user - exec_cmd('radosgw-admin user create --uid %s --display-name %s --access-key %s --secret %s' - % (USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)) + exec_cmd('radosgw-admin user create --uid {} --display-name {} --access-key {} --secret {}'.format(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)) def boto_connect(portnum, ssl, proto): endpoint = proto + '://localhost:' + portnum @@ -130,82 +147,222 @@ def main(): # create a bucket bucket1 = connection.create_bucket(Bucket=BUCKET_NAME1) bucket2 = connection.create_bucket(Bucket=BUCKET_NAME2) + bucket3 = connection.create_bucket(Bucket=BUCKET_NAME3) + bucket4 = connection.create_bucket(Bucket=BUCKET_NAME4) + bucket5 = connection.create_bucket(Bucket=BUCKET_NAME5) ver_bucket = connection.create_bucket(Bucket=VER_BUCKET_NAME) connection.BucketVersioning('ver_bucket') - bucket_stats1 = get_bucket_stats(BUCKET_NAME1) - bucket_stats2 = get_bucket_stats(BUCKET_NAME2) - ver_bucket_stats = get_bucket_stats(VER_BUCKET_NAME) - bucket1_acl = connection.BucketAcl(BUCKET_NAME1).load() bucket2_acl = connection.BucketAcl(BUCKET_NAME2).load() ver_bucket_acl = connection.BucketAcl(VER_BUCKET_NAME).load() # TESTCASE 'reshard-add','reshard','add','add bucket to resharding queue','succeeds' - log.debug(' test: reshard add') - num_shards_expected = bucket_stats1.num_shards + 1 - cmd = exec_cmd('radosgw-admin reshard add --bucket %s --num-shards %s' % (BUCKET_NAME1, num_shards_expected)) + log.debug('TEST: reshard add\n') + + num_shards_expected = get_bucket_stats(BUCKET_NAME1).num_shards + 1 + cmd = exec_cmd('radosgw-admin reshard add --bucket {} --num-shards {}'.format(BUCKET_NAME1, num_shards_expected)) + cmd = exec_cmd('radosgw-admin reshard list') json_op = json.loads(cmd) - log.debug('bucket name %s', json_op[0]['bucket_name']) + log.debug('bucket name {}'.format(json_op[0]['bucket_name'])) assert json_op[0]['bucket_name'] == BUCKET_NAME1 assert json_op[0]['new_num_shards'] == num_shards_expected # TESTCASE 'reshard-process','reshard','','process bucket resharding','succeeds' - log.debug(' test: reshard process') + log.debug('TEST: reshard process\n') cmd = exec_cmd('radosgw-admin reshard process') time.sleep(5) # check bucket shards num bucket_stats1 = get_bucket_stats(BUCKET_NAME1) - bucket_stats1.get_num_shards() if bucket_stats1.num_shards != num_shards_expected: - log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME1) + log.error("Resharding failed on bucket {}. Expected number of shards are not created\n".format(BUCKET_NAME1)) # TESTCASE 'reshard-add','reshard','add','add non empty bucket to resharding queue','succeeds' - log.debug(' test: reshard add non empty bucket') + log.debug('TEST: reshard add non empty bucket\n') # create objs num_objs = 8 for i in range(0, num_objs): connection.Object(BUCKET_NAME1, ('key'+str(i))).put(Body=b"some_data") - num_shards_expected = bucket_stats1.num_shards + 1 - cmd = exec_cmd('radosgw-admin reshard add --bucket %s --num-shards %s' % (BUCKET_NAME1, num_shards_expected)) + num_shards_expected = get_bucket_stats(BUCKET_NAME1).num_shards + 1 + cmd = exec_cmd('radosgw-admin reshard add --bucket {} --num-shards {}'.format(BUCKET_NAME1, num_shards_expected)) cmd = exec_cmd('radosgw-admin reshard list') json_op = json.loads(cmd) - log.debug('bucket name %s', json_op[0]['bucket_name']) assert json_op[0]['bucket_name'] == BUCKET_NAME1 assert json_op[0]['new_num_shards'] == num_shards_expected # TESTCASE 'reshard process ,'reshard','process','reshard non empty bucket','succeeds' - log.debug(' test: reshard process non empty bucket') + log.debug('TEST: reshard process non empty bucket\n') cmd = exec_cmd('radosgw-admin reshard process') # check bucket shards num bucket_stats1 = get_bucket_stats(BUCKET_NAME1) - bucket_stats1.get_num_shards() if bucket_stats1.num_shards != num_shards_expected: - log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME1) + log.error("Resharding failed on bucket {}. Expected number of shards are not created\n".format(BUCKET_NAME1)) - # TESTCASE 'manual resharding','bucket', 'reshard','','manual bucket resharding','succeeds' - log.debug(' test: manual reshard bucket') + # TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection\n') # create objs num_objs = 11 for i in range(0, num_objs): connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data") + time.sleep(5) + old_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected, error_location = "before_target_shard_entry") + + # check bucket shards num + cur_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME2)) + for key in bucket2.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected) + + #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection') + # create objs + num_objs = 11 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data") + + time.sleep(5) + old_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected, error_location = "after_target_shard_entry") + # check bucket shards num + bucket_stats3 = get_bucket_stats(BUCKET_NAME3) + cur_shard_count = bucket_stats3.num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME3)) + for key in bucket3.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected) + + #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection') + # create objs + num_objs = 11 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data") + time.sleep(10) - num_shards_expected = bucket_stats2.num_shards + 1 - cmd = exec_cmd('radosgw-admin bucket reshard --bucket %s --num-shards %s' % (BUCKET_NAME2, - num_shards_expected)) + old_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected, error_location = "before_layout_overwrite") + + # check bucket shards num + cur_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME3)) + for key in bucket4.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected) + + # TESTCASE 'manual bucket resharding','inject crash','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with crash injection\n') + + old_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected, abort_location = "before_target_shard_entry") + # check bucket shards num - bucket_stats2 = get_bucket_stats(BUCKET_NAME2) - bucket_stats2.get_num_shards() - if bucket_stats2.num_shards != num_shards_expected: - log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME2) + cur_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME2)) + for key in bucket2.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected) + + #TESTCASE 'manual bucket resharding','inject crash','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection') + + old_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected, abort_location = "after_target_shard_entry") + + # check bucket shards num + cur_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME3)) + for key in bucket3.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected) + + #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard' + log.debug('TEST: reshard bucket with error injection') + + old_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards + num_shards_expected = old_shard_count + 1 + run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected, abort_location = "before_layout_overwrite") + # check bucket shards num + bucket_stats4 = get_bucket_stats(BUCKET_NAME4) + cur_shard_count = bucket_stats4.num_shards + assert(cur_shard_count == old_shard_count) + + #verify if bucket is accessible + log.info('List objects from bucket: {}'.format(BUCKET_NAME3)) + for key in bucket4.objects.all(): + print(key.key) + + #verify if new objects can be added + num_objs = 5 + for i in range(0, num_objs): + connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data") + + #retry reshard + run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected) # TESTCASE 'versioning reshard-','bucket', reshard','versioning reshard','succeeds' log.debug(' test: reshard versioned bucket') - num_shards_expected = ver_bucket_stats.num_shards + 1 - cmd = exec_cmd('radosgw-admin bucket reshard --bucket %s --num-shards %s' % (VER_BUCKET_NAME, + num_shards_expected = get_bucket_stats(VER_BUCKET_NAME).num_shards + 1 + cmd = exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {}'.format(VER_BUCKET_NAME, num_shards_expected)) # check bucket shards num ver_bucket_stats = get_bucket_stats(VER_BUCKET_NAME) @@ -220,13 +377,13 @@ def main(): assert new_ver_bucket_acl == ver_bucket_acl # Clean up - log.debug("Deleting bucket %s", BUCKET_NAME1) + log.debug("Deleting bucket {}".format(BUCKET_NAME1)) bucket1.objects.all().delete() bucket1.delete() - log.debug("Deleting bucket %s", BUCKET_NAME2) + log.debug("Deleting bucket {}".format(BUCKET_NAME2)) bucket2.objects.all().delete() bucket2.delete() - log.debug("Deleting bucket %s", VER_BUCKET_NAME) + log.debug("Deleting bucket {}".format(VER_BUCKET_NAME)) ver_bucket.delete() diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 6cf155857a332..54a11bc55a9a2 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -22,6 +22,7 @@ extern "C" { #include "common/Formatter.h" #include "common/errno.h" #include "common/safe_io.h" +#include "common/fault_injector.h" #include "include/util.h" @@ -422,6 +423,8 @@ void usage() cout << " --trim-delay-ms time interval in msec to limit the frequency of sync error log entries trimming operations,\n"; cout << " the trimming process will sleep the specified msec for every 1000 entries trimmed\n"; cout << " --max-concurrent-ios maximum concurrent ios for bucket operations (default: 32)\n"; + cout << " --fault-inject-at for testing fault injection\n"; + cout << " --fault-abort-at for testing fault abort\n"; cout << "\n"; cout << " := \"YYYY-MM-DD[ hh:mm:ss]\"\n"; cout << "\nQuota options:\n"; @@ -3599,6 +3602,9 @@ int main(int argc, const char **argv) ceph::timespan opt_retry_delay_ms = std::chrono::milliseconds(2000); ceph::timespan opt_timeout_sec = std::chrono::seconds(60); + std::optional inject_error_at; + std::optional inject_abort_at; + SimpleCmd cmd(all_cmds, cmd_aliases); bool raw_storage_op = false; @@ -4074,6 +4080,10 @@ int main(int argc, const char **argv) opt_retry_delay_ms = std::chrono::milliseconds(atoi(val.c_str())); } else if (ceph_argparse_witharg(args, i, &val, "--timeout-sec", (char*)NULL)) { opt_timeout_sec = std::chrono::seconds(atoi(val.c_str())); + } else if (ceph_argparse_witharg(args, i, &val, "--inject-error-at", (char*)NULL)) { + inject_error_at = val; + } else if (ceph_argparse_witharg(args, i, &val, "--inject-abort-at", (char*)NULL)) { + inject_abort_at = val; } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) { // do nothing } else if (ceph_argparse_witharg(args, i, &val, "--context", (char*)NULL)) { @@ -6081,6 +6091,8 @@ int main(int argc, const char **argv) return EINVAL; } + // to test using FaultInjector + FaultInjector fault; if (!rgw::sal::User::empty(user)) { user_op.set_user_id(user->get_id()); bucket_op.set_user_id(user->get_id()); @@ -7559,7 +7571,12 @@ next: max_entries = DEFAULT_RESHARD_MAX_ENTRIES; } - return br.execute(num_shards, max_entries, dpp(), + if (inject_error_at) { + fault.inject(*inject_error_at, InjectError{-EIO, dpp()}); + } else if (inject_abort_at) { + fault.inject(*inject_abort_at, InjectAbort{}); + } + return br.execute(num_shards, fault, max_entries, dpp(), verbose, &cout, formatter.get()); } diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 60482690f4f5c..facaae79dfa32 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -517,6 +517,7 @@ int RGWBucketReshardLock::renew(const Clock::time_point& now) { int RGWBucketReshard::do_reshard(int num_shards, int max_entries, + FaultInjector& f, bool verbose, ostream *out, Formatter *formatter, @@ -610,6 +611,8 @@ int RGWBucketReshard::do_reshard(int num_shards, return ret; } + if (ret = f.check("before_target_shard_entry"); ret < 0) { return ret; } + int shard_index = (target_shard_id > 0 ? target_shard_id : 0); ret = target_shards_mgr.add_entry(shard_index, entry, account, @@ -618,6 +621,8 @@ int RGWBucketReshard::do_reshard(int num_shards, return ret; } + if (ret = f.check("after_target_shard_entry"); ret < 0) { return ret; } + Clock::time_point now = Clock::now(); if (reshard_lock.should_renew(now)) { // assume outer locks have timespans at least the size of ours, so @@ -657,6 +662,8 @@ int RGWBucketReshard::do_reshard(int num_shards, return -EIO; } + if (ret = f.check("before_layout_overwrite"); ret < 0) { return ret; } + //overwrite current_index for the next reshard process bucket_info.layout.current_index = *bucket_info.layout.target_index; bucket_info.layout.target_index = std::nullopt; // target_layout doesn't need to exist after reshard @@ -686,10 +693,13 @@ int RGWBucketReshard::update_bucket(rgw::BucketReshardState s, const DoutPrefixP return 0; } -int RGWBucketReshard::execute(int num_shards, int max_op_entries, +int RGWBucketReshard::execute(int num_shards, + FaultInjector& f, + int max_op_entries, const DoutPrefixProvider *dpp, - bool verbose, ostream *out, Formatter *formatter, - RGWReshard* reshard_log) + bool verbose, ostream *out, + Formatter *formatter, + RGWReshard* reshard_log) { int ret = reshard_lock.lock(dpp); if (ret < 0) { @@ -712,9 +722,7 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries, // keep a copy of old index layout prev_index = bucket_info.layout.current_index; - ret = do_reshard(num_shards, - max_op_entries, - verbose, out, formatter, dpp); + ret = do_reshard(num_shards, max_op_entries, f, verbose, out, formatter, dpp); if (ret < 0) { goto error_out; } @@ -1035,15 +1043,17 @@ int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvid } { - RGWBucketReshard br(store, bucket_info, attrs, nullptr); - ret = br.execute(entry.new_num_shards, max_entries, dpp, false, nullptr, - nullptr, this); - if (ret < 0) { - ldpp_dout(dpp, 0) << __func__ << - ": Error during resharding bucket " << entry.bucket_name << ":" << - cpp_strerror(-ret)<< dendl; - return ret; - } + RGWBucketReshard br(store, bucket_info, attrs, nullptr); + + FaultInjector f; + ret = br.execute(entry.new_num_shards, f, max_entries, dpp, + false, nullptr, nullptr, this); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << + ": Error during resharding bucket " << entry.bucket_name << ":" << + cpp_strerror(-ret)<< dendl; + return ret; + } ldpp_dout(dpp, 20) << __func__ << " removing reshard queue entry for bucket " << entry.bucket_name << diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index 7a18ddb82b061..2bba9a3690210 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -21,6 +21,7 @@ #include "cls/lock/cls_lock_client.h" #include "rgw_common.h" +#include "common/fault_injector.h" class RGWReshard; @@ -87,9 +88,9 @@ private: int set_target_layout(int new_num_shards, const DoutPrefixProvider *dpp); int update_bucket(rgw::BucketReshardState s, const DoutPrefixProvider* dpp); - + int do_reshard(int num_shards, - int max_entries, + int max_entries, FaultInjector& f, bool verbose, std::ostream *os, Formatter *formatter, @@ -102,10 +103,10 @@ public: const RGWBucketInfo& _bucket_info, const std::map& _bucket_attrs, RGWBucketReshardLock* _outer_reshard_lock); - int execute(int num_shards, int max_op_entries, - const DoutPrefixProvider *dpp, + int execute(int num_shards, FaultInjector& f, + int max_op_entries, const DoutPrefixProvider *dpp, bool verbose = false, std::ostream *out = nullptr, - Formatter *formatter = nullptr, + ceph::Formatter *formatter = nullptr, RGWReshard *reshard_log = nullptr); int get_status(const DoutPrefixProvider *dpp, std::list *status); int cancel(const DoutPrefixProvider *dpp); -- 2.39.5