]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: reshard tests with fault injection
authorShilpa Jagannath <smanjara@redhat.com>
Mon, 12 Oct 2020 12:51:16 +0000 (18:21 +0530)
committerCasey Bodley <cbodley@redhat.com>
Fri, 5 Feb 2021 02:19:45 +0000 (21:19 -0500)
Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
qa/workunits/rgw/test_rgw_reshard.py
src/rgw/rgw_admin.cc
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index 14f4fe509946989e2e8f6ad59b78ad928cbea207..c414713dd68516ba4b7c74b468d0bcb484ca16a1 100755 (executable)
@@ -5,6 +5,8 @@ import time
 import subprocess
 import json
 import boto3
+from pprint import pprint
+import re
 
 """
 Rgw manual and dynamic resharding  testing against a running instance
@@ -16,7 +18,7 @@ Rgw manual and dynamic resharding  testing against a running instance
 #
 #
 
-log.basicConfig(level=log.DEBUG)
+log.basicConfig(format = '%(message)s', level=log.DEBUG)
 log.getLogger('botocore').setLevel(log.CRITICAL)
 log.getLogger('boto3').setLevel(log.CRITICAL)
 log.getLogger('urllib3').setLevel(log.CRITICAL)
@@ -26,33 +28,35 @@ USER = 'tester'
 DISPLAY_NAME = 'Testing'
 ACCESS_KEY = 'NX5QOQKC6BH2IDN8HC7A'
 SECRET_KEY = 'LnEsqNNqZIpkzauboDcLXLcYaWwLQ3Kop0zAnKIn'
-BUCKET_NAME1 = 'myfoo'
-BUCKET_NAME2 = 'mybar'
+BUCKET_NAME1 = 'a-bucket'
+BUCKET_NAME2 = 'b-bucket'
+BUCKET_NAME3 = 'c-bucket'
+BUCKET_NAME4 = 'd-bucket'
+BUCKET_NAME5 = 'e-bucket'
 VER_BUCKET_NAME = 'myver'
 
-
 def exec_cmd(cmd):
     try:
         proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
         out, err = proc.communicate()
+        log.info(proc.args)
         if proc.returncode == 0:
-            log.info('command succeeded')
-            if out is not None: log.info(out)
+            if out is not None: log.debug('{} \n {}'.format(out.decode('utf-8'), err.decode('utf-8')))
             return out
         else:
-            raise Exception("error: %s \nreturncode: %s" % (err, proc.returncode))
+            raise Exception("error: {} \nreturncode: {}".format(err, proc.returncode))
     except Exception as e:
-        log.error('command failed')
+        log.error('command failed\n')
         log.error(e)
         return False
 
 
 def get_radosgw_port():
     out = exec_cmd('sudo netstat -nltp | grep radosgw')
-    log.debug('output: %s' % out)
+    log.debug('output: {}'.format(out))
     x = out.decode('utf8').split(" ")
     port = [i for i in x if ':' in i][0].split(':')[1]
-    log.info('radosgw port: %s' % port)
+    log.info('radosgw port: {}'.format(port))
     return port
 
 
@@ -72,19 +76,20 @@ def get_bucket_stats(bucket_name):
     """
     function to get bucket stats
     """
-    cmd = exec_cmd("radosgw-admin bucket stats --bucket %s" % bucket_name)
+    cmd = exec_cmd("radosgw-admin bucket stats --bucket {}".format(bucket_name))
     json_op = json.loads(cmd)
+    #print(json.dumps(json_op, indent = 4, sort_keys=True))
     bucket_id = json_op['id']
-    num_shards_op = json_op['num_shards']
+    num_shards = json_op['num_shards']
     if len(json_op['usage']) > 0:
         num_objects = json_op['usage']['rgw.main']['num_objects']
         size_kb = json_op['usage']['rgw.main']['size_kb']
     else:
         num_objects = 0
         size_kb = 0
-    log.debug("bucket %s id %s num_objects %d size_kb %d num_shards %d", bucket_name, bucket_id,
-              num_objects, size_kb, num_shards_op)
-    return BucketStats(bucket_name, bucket_id, num_objects, size_kb, num_shards_op)
+    log.debug(" \nBUCKET_STATS: \nbucket: {} id: {} num_objects: {} size_kb: {} num_shards: {}\n".format(bucket_name, bucket_id,
+              num_objects, size_kb, num_shards))
+    return BucketStats(bucket_name, bucket_id, num_objects, size_kb, num_shards)
 
 
 def get_bucket_num_shards(bucket_name, bucket_id):
@@ -92,21 +97,29 @@ def get_bucket_num_shards(bucket_name, bucket_id):
     function to get bucket num shards
     """
     metadata = 'bucket.instance:' + bucket_name + ':' + bucket_id
-    log.debug("metadata %s", metadata)
-    cmd = exec_cmd('radosgw-admin metadata get %s' % metadata)
+    cmd = exec_cmd('radosgw-admin metadata get {}'.format(metadata))
     json_op = json.loads(cmd)
     num_shards = json_op['data']['bucket_info']['num_shards']
-    log.debug("bucket %s id %s num_shards %d", bucket_name, bucket_id, num_shards)
     return num_shards
 
+def run_bucket_reshard_cmd(bucket_name, num_shards, **location):
+
+    # TODO: Get rid of duplication. use list
+    if ('error_location' in location):
+        return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {} --inject-error-at {}'.format(bucket_name,
+                        num_shards, location.get('error_location', None)))
+    elif ('abort_location' in location):
+        return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {} --inject-abort-at {}'.format(bucket_name,
+                        num_shards, location.get('abort_location', None)))
+    else:
+        return exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {}'.format(bucket_name, num_shards))
 
 def main():
     """
     execute manual and dynamic resharding commands
     """
     # create user
-    exec_cmd('radosgw-admin user create --uid %s --display-name %s --access-key %s --secret %s'
-                   % (USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY))
+    exec_cmd('radosgw-admin user create --uid {} --display-name {} --access-key {} --secret {}'.format(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY))
 
     def boto_connect(portnum, ssl, proto):
         endpoint = proto + '://localhost:' + portnum
@@ -122,90 +135,227 @@ def main():
 
     port = get_radosgw_port()
 
-    if port == '80':
-        connection = boto_connect(port, ssl=False, proto='http')
-    elif port == '443':
-        connection = boto_connect(port, ssl=True, proto='https')
+    proto = ('https' if port == '443' else 'http')
+    connection = boto_connect(port, ssl=(proto == 'https'), proto=proto)
 
     # create a bucket
     bucket1 = connection.create_bucket(Bucket=BUCKET_NAME1)
     bucket2 = connection.create_bucket(Bucket=BUCKET_NAME2)
+    bucket3 = connection.create_bucket(Bucket=BUCKET_NAME3)
+    bucket4 = connection.create_bucket(Bucket=BUCKET_NAME4)
+    bucket5 = connection.create_bucket(Bucket=BUCKET_NAME5)
     ver_bucket = connection.create_bucket(Bucket=VER_BUCKET_NAME)
     connection.BucketVersioning('ver_bucket')
 
-    bucket_stats1 = get_bucket_stats(BUCKET_NAME1)
-    bucket_stats2 = get_bucket_stats(BUCKET_NAME2)
-    ver_bucket_stats = get_bucket_stats(VER_BUCKET_NAME)
-
     bucket1_acl = connection.BucketAcl(BUCKET_NAME1).load()
     bucket2_acl = connection.BucketAcl(BUCKET_NAME2).load()
     ver_bucket_acl = connection.BucketAcl(VER_BUCKET_NAME).load()
 
     # TESTCASE 'reshard-add','reshard','add','add bucket to resharding queue','succeeds'
-    log.debug(' test: reshard add')
-    num_shards_expected = bucket_stats1.num_shards + 1
-    cmd = exec_cmd('radosgw-admin reshard add --bucket %s --num-shards %s' % (BUCKET_NAME1, num_shards_expected))
+    log.debug('TEST: reshard add\n')
+    
+    num_shards_expected = get_bucket_stats(BUCKET_NAME1).num_shards + 1
+    cmd = exec_cmd('radosgw-admin reshard add --bucket {} --num-shards {}'.format(BUCKET_NAME1, num_shards_expected))
     cmd = exec_cmd('radosgw-admin reshard list')
     json_op = json.loads(cmd)
-    log.debug('bucket name %s', json_op[0]['bucket_name'])
+    log.debug('bucket name {}'.format(json_op[0]['bucket_name']))
     assert json_op[0]['bucket_name'] == BUCKET_NAME1
     assert json_op[0]['new_num_shards'] == num_shards_expected
 
     # TESTCASE 'reshard-process','reshard','','process bucket resharding','succeeds'
-    log.debug(' test: reshard process')
+    log.debug('TEST: reshard process\n')
     cmd = exec_cmd('radosgw-admin reshard process')
     time.sleep(5)
     # check bucket shards num
     bucket_stats1 = get_bucket_stats(BUCKET_NAME1)
-    bucket_stats1.get_num_shards()
     if bucket_stats1.num_shards != num_shards_expected:
-        log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME1)
+        log.error("Resharding failed on bucket {}. Expected number of shards are not created\n".format(BUCKET_NAME1))
 
     # TESTCASE 'reshard-add','reshard','add','add non empty bucket to resharding queue','succeeds'
-    log.debug(' test: reshard add non empty bucket')
+    log.debug('TEST: reshard add non empty bucket\n')
     # create objs
     num_objs = 8
     for i in range(0, num_objs):
         connection.Object(BUCKET_NAME1, ('key'+str(i))).put(Body=b"some_data")
 
-    num_shards_expected = bucket_stats1.num_shards + 1
-    cmd = exec_cmd('radosgw-admin reshard add --bucket %s --num-shards %s' % (BUCKET_NAME1, num_shards_expected))
+    num_shards_expected = get_bucket_stats(BUCKET_NAME1).num_shards + 1
+    cmd = exec_cmd('radosgw-admin reshard add --bucket {} --num-shards {}'.format(BUCKET_NAME1, num_shards_expected))
     cmd = exec_cmd('radosgw-admin reshard list')
     json_op = json.loads(cmd)
-    log.debug('bucket name %s', json_op[0]['bucket_name'])
     assert json_op[0]['bucket_name'] == BUCKET_NAME1
     assert json_op[0]['new_num_shards'] == num_shards_expected
 
     # TESTCASE 'reshard process ,'reshard','process','reshard non empty bucket','succeeds'
-    log.debug(' test: reshard process non empty bucket')
+    log.debug(' TEST: reshard process non empty bucket\n')
     cmd = exec_cmd('radosgw-admin reshard process')
     # check bucket shards num
     bucket_stats1 = get_bucket_stats(BUCKET_NAME1)
-    bucket_stats1.get_num_shards()
     if bucket_stats1.num_shards != num_shards_expected:
-        log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME1)
+        log.error("Resharding failed on bucket {}. Expected number of shards are not created\n".format(BUCKET_NAME1))
 
-    # TESTCASE 'manual resharding','bucket', 'reshard','','manual bucket resharding','succeeds'
-    log.debug(' test: manual reshard bucket')
+    # TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard'
+    log.debug('TEST: reshard bucket with error injection\n')
     # create objs
     num_objs = 11
     for i in range(0, num_objs):
         connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data")
 
+    time.sleep(5)
+    old_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards
+    num_shards_expected = old_shard_count + 1
+    run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected, error_location = "before_target_shard_entry")
+
+    # check bucket shards num
+    cur_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards
+    assert(cur_shard_count == old_shard_count)
+
+    #verify if bucket is accessible
+    log.info('List objects from bucket: {}'.format(BUCKET_NAME2))
+    for key in bucket2.objects.all():
+        print(key.key)
+
+    #verify if new objects can be added
+    num_objs = 5
+    for i in range(0, num_objs):
+        connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data")
+
+    #retry reshard
+    run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected)
+
+    #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard'
+    log.debug('TEST: reshard bucket with error injection')
+    # create objs
+    num_objs = 11
+    for i in range(0, num_objs):
+        connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data")
+
+    time.sleep(5)
+    old_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards
+    num_shards_expected = old_shard_count + 1
+    run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected, error_location = "after_target_shard_entry")
+    # check bucket shards num
+    bucket_stats3 = get_bucket_stats(BUCKET_NAME3)
+    cur_shard_count = bucket_stats3.num_shards
+    assert(cur_shard_count == old_shard_count)
+
+    #verify if bucket is accessible
+    log.info('List objects from bucket: {}'.format(BUCKET_NAME3))
+    for key in bucket3.objects.all():
+        print(key.key)
+
+    #verify if new objects can be added
+    num_objs = 5
+    for i in range(0, num_objs):
+        connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data")
+
+    #retry reshard
+    run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected)
+    
+    #TESTCASE 'manual bucket resharding','inject error','fail','check bucket accessibility', 'retry reshard'
+    log.debug('TEST: reshard bucket with error injection')
+    # create objs
+    num_objs = 11
+    for i in range(0, num_objs):
+        connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data")
+
     time.sleep(10)
-    num_shards_expected = bucket_stats2.num_shards + 1
-    cmd = exec_cmd('radosgw-admin bucket reshard --bucket %s --num-shards %s' % (BUCKET_NAME2,
-                                                                                 num_shards_expected))
+    old_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards
+    num_shards_expected = old_shard_count + 1
+    run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected, error_location = "before_layout_overwrite")
+
     # check bucket shards num
-    bucket_stats2 = get_bucket_stats(BUCKET_NAME2)
-    bucket_stats2.get_num_shards()
-    if bucket_stats2.num_shards != num_shards_expected:
-        log.error("Resharding failed on bucket %s. Expected number of shards are not created" % BUCKET_NAME2)
+    cur_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards
+    assert(cur_shard_count == old_shard_count)
+
+    #verify if bucket is accessible
+    log.info('List objects from bucket: {}'.format(BUCKET_NAME4))
+    for key in bucket4.objects.all():
+        print(key.key)
+
+    #verify if new objects can be added
+    num_objs = 5
+    for i in range(0, num_objs):
+        connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data")
+
+    #retry reshard
+    run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected)
+
+    # TESTCASE 'manual bucket resharding','inject crash','fail','check bucket accessibility', 'retry reshard'
+    log.debug('TEST: reshard bucket with crash injection\n')
+
+    old_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards
+    num_shards_expected = old_shard_count + 1
+    run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected, abort_location = "before_target_shard_entry")
+
+    # check bucket shards num
+    cur_shard_count = get_bucket_stats(BUCKET_NAME2).num_shards
+    assert(cur_shard_count == old_shard_count)
+
+    #verify if bucket is accessible
+    log.info('List objects from bucket: {}'.format(BUCKET_NAME2))
+    for key in bucket2.objects.all():
+        print(key.key)
+
+    #verify if new objects can be added
+    num_objs = 5
+    for i in range(0, num_objs):
+        connection.Object(BUCKET_NAME2, ('key' + str(i))).put(Body=b"some_data")
+
+    #retry reshard
+    run_bucket_reshard_cmd(BUCKET_NAME2, num_shards_expected)
+
+    #TESTCASE 'manual bucket resharding','inject crash','fail','check bucket accessibility', 'retry reshard'
+    log.debug('TEST: reshard bucket with crash injection')
+
+    old_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards
+    num_shards_expected = old_shard_count + 1
+    run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected, abort_location = "after_target_shard_entry")
+
+    # check bucket shards num
+    cur_shard_count = get_bucket_stats(BUCKET_NAME3).num_shards
+    assert(cur_shard_count == old_shard_count)
+
+    #verify if bucket is accessible
+    log.info('List objects from bucket: {}'.format(BUCKET_NAME3))
+    for key in bucket3.objects.all():
+        print(key.key)
+
+    #verify if new objects can be added
+    num_objs = 5
+    for i in range(0, num_objs):
+        connection.Object(BUCKET_NAME3, ('key' + str(i))).put(Body=b"some_data")
+
+    #retry reshard
+    run_bucket_reshard_cmd(BUCKET_NAME3, num_shards_expected)
+
+    #TESTCASE 'manual bucket resharding','inject crash','fail','check bucket accessibility', 'retry reshard'
+    log.debug('TEST: reshard bucket with crash injection')
+
+    old_shard_count = get_bucket_stats(BUCKET_NAME4).num_shards
+    num_shards_expected = old_shard_count + 1
+    run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected, abort_location = "before_layout_overwrite")
+    # check bucket shards num
+    bucket_stats4 = get_bucket_stats(BUCKET_NAME4)
+    cur_shard_count = bucket_stats4.num_shards
+    assert(cur_shard_count == old_shard_count)
+
+    #verify if bucket is accessible
+    log.info('List objects from bucket: {}'.format(BUCKET_NAME4))
+    for key in bucket4.objects.all():
+        print(key.key)
+
+    #verify if new objects can be added
+    num_objs = 5
+    for i in range(0, num_objs):
+        connection.Object(BUCKET_NAME4, ('key' + str(i))).put(Body=b"some_data")
+
+    #retry reshard
+    run_bucket_reshard_cmd(BUCKET_NAME4, num_shards_expected)
 
     # TESTCASE 'versioning reshard-','bucket', reshard','versioning reshard','succeeds'
     log.debug(' test: reshard versioned bucket')
-    num_shards_expected = ver_bucket_stats.num_shards + 1
-    cmd = exec_cmd('radosgw-admin bucket reshard --bucket %s --num-shards %s' % (VER_BUCKET_NAME,
+    num_shards_expected = get_bucket_stats(VER_BUCKET_NAME).num_shards + 1
+    cmd = exec_cmd('radosgw-admin bucket reshard --bucket {} --num-shards {}'.format(VER_BUCKET_NAME,
                                                                                  num_shards_expected))
     # check bucket shards num
     ver_bucket_stats = get_bucket_stats(VER_BUCKET_NAME)
@@ -220,13 +370,13 @@ def main():
     assert new_ver_bucket_acl == ver_bucket_acl
 
     # Clean up
-    log.debug("Deleting bucket %s", BUCKET_NAME1)
+    log.debug("Deleting bucket {}".format(BUCKET_NAME1))
     bucket1.objects.all().delete()
     bucket1.delete()
-    log.debug("Deleting bucket %s", BUCKET_NAME2)
+    log.debug("Deleting bucket {}".format(BUCKET_NAME2))
     bucket2.objects.all().delete()
     bucket2.delete()
-    log.debug("Deleting bucket %s", VER_BUCKET_NAME)
+    log.debug("Deleting bucket {}".format(VER_BUCKET_NAME))
     ver_bucket.delete()
 
 
index 2ce37e70d48eca695a1137c49f2f40e90e6f21db..1f884181fdb99547627558d8fe27d2820f7c527e 100644 (file)
@@ -22,6 +22,7 @@ extern "C" {
 #include "common/Formatter.h"
 #include "common/errno.h"
 #include "common/safe_io.h"
+#include "common/fault_injector.h"
 
 #include "include/util.h"
 
@@ -408,6 +409,8 @@ void usage()
   cout << "   --trim-delay-ms           time interval in msec to limit the frequency of sync error log entries trimming operations,\n";
   cout << "                             the trimming process will sleep the specified msec for every 1000 entries trimmed\n";
   cout << "   --max-concurrent-ios      maximum concurrent ios for bucket operations (default: 32)\n";
+  cout << "   --inject-error-at         for testing error injection during bucket resharding\n";
+  cout << "   --inject-abort-at         for testing abort injection during bucket resharding\n";
   cout << "\n";
   cout << "<date> := \"YYYY-MM-DD[ hh:mm:ss]\"\n";
   cout << "\nQuota options:\n";
@@ -3234,6 +3237,9 @@ int main(int argc, const char **argv)
   ceph::timespan opt_retry_delay_ms = std::chrono::milliseconds(2000);
   ceph::timespan opt_timeout_sec = std::chrono::seconds(60);
 
+  std::optional<std::string> inject_error_at;
+  std::optional<std::string> inject_abort_at;
+
   SimpleCmd cmd(all_cmds, cmd_aliases);
 
   std::optional<std::string> rgw_obj_fs; // radoslist field separator
@@ -3659,6 +3665,10 @@ int main(int argc, const char **argv)
       opt_retry_delay_ms = std::chrono::milliseconds(atoi(val.c_str()));
     } else if (ceph_argparse_witharg(args, i, &val, "--timeout-sec", (char*)NULL)) {
       opt_timeout_sec = std::chrono::seconds(atoi(val.c_str()));
+    } else if (ceph_argparse_witharg(args, i, &val, "--inject-error-at", (char*)NULL)) {
+      inject_error_at = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--inject-abort-at", (char*)NULL)) {
+      inject_abort_at = val;
     } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
       // do nothing
     } else if (ceph_argparse_witharg(args, i, &val, "--context", (char*)NULL)) {
@@ -5464,6 +5474,9 @@ int main(int argc, const char **argv)
       return EINVAL;
   }
 
+  // to test using FaultInjector
+  FaultInjector<std::string_view> fault;
+
   if (!user_id.empty()) {
     user_op.set_user_id(user_id);
     bucket_op.set_user_id(user_id);
@@ -6942,7 +6955,12 @@ next:
       max_entries = DEFAULT_RESHARD_MAX_ENTRIES;
     }
 
-    return br.execute(num_shards, max_entries, dpp(),
+    if (inject_error_at) {
+      fault.inject(*inject_error_at, InjectError{-EIO, dpp()});
+    } else if (inject_abort_at) {
+      fault.inject(*inject_abort_at, InjectAbort{});
+    }
+    return br.execute(num_shards, fault, max_entries, dpp(),
                       verbose, &cout, formatter.get());
   }
 
index 1a28a678daa7fe6b7431459c31a7eb107f77dab8..5d966c5b8d93e4e4d0274618dbd31f0e77383aa0 100644 (file)
@@ -503,6 +503,7 @@ int RGWBucketReshardLock::renew(const Clock::time_point& now) {
 
 int RGWBucketReshard::do_reshard(int num_shards,
                                 int max_entries,
+         FaultInjector<std::string_view>& f,
                                 bool verbose,
                                 ostream *out,
                                 Formatter *formatter,
@@ -595,6 +596,8 @@ int RGWBucketReshard::do_reshard(int num_shards,
          return ret;
        }
 
+  if (ret = f.check("before_target_shard_entry"); ret < 0) { return ret; }
+
        int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
 
        ret = target_shards_mgr.add_entry(shard_index, entry, account,
@@ -603,6 +606,8 @@ int RGWBucketReshard::do_reshard(int num_shards,
          return ret;
        }
 
+  if (ret = f.check("after_target_shard_entry"); ret < 0) { return ret; }
+
        Clock::time_point now = Clock::now();
        if (reshard_lock.should_renew(now)) {
          // assume outer locks have timespans at least the size of ours, so
@@ -642,6 +647,8 @@ int RGWBucketReshard::do_reshard(int num_shards,
     return -EIO;
   }
 
+  if (ret = f.check("before_layout_overwrite"); ret < 0) { return ret; }
+
   //overwrite current_index for the next reshard process
   bucket_info.layout.current_index = *bucket_info.layout.target_index;
   bucket_info.layout.target_index = std::nullopt; // target_layout doesn't need to exist after reshard
@@ -671,10 +678,13 @@ int RGWBucketReshard::update_bucket(rgw::BucketReshardState s, const DoutPrefixP
     return 0;
 }
 
-int RGWBucketReshard::execute(int num_shards, int max_op_entries,
+int RGWBucketReshard::execute(int num_shards,
+                              FaultInjector<std::string_view>& f,
+                              int max_op_entries,
                               const DoutPrefixProvider *dpp,
-                              bool verbose, ostream *out, Formatter *formatter,
-                             RGWReshard* reshard_log)
+                              bool verbose, ostream *out,
+                              Formatter *formatter,
+                              RGWReshard* reshard_log)
 {
   int ret = reshard_lock.lock();
   if (ret < 0) {
@@ -697,9 +707,7 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
   // keep a copy of old index layout
   prev_index = bucket_info.layout.current_index;
 
-  ret = do_reshard(num_shards,
-                  max_op_entries,
-                   verbose, out, formatter, dpp);
+  ret = do_reshard(num_shards, max_op_entries, f, verbose, out, formatter, dpp);
   if (ret < 0) {
     goto error_out;
   }
@@ -1022,15 +1030,17 @@ int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvid
        }
 
        {
-    RGWBucketReshard br(store, bucket_info, attrs, nullptr);
-    ret = br.execute(entry.new_num_shards, max_entries, dpp, false, nullptr,
-        nullptr, this);
-    if (ret < 0) {
-      ldout(store->ctx(), 0) <<  __func__ <<
-        ": Error during resharding bucket " << entry.bucket_name << ":" <<
-        cpp_strerror(-ret)<< dendl;
-      return ret;
-    }
+       RGWBucketReshard br(store, bucket_info, attrs, nullptr);
+
+  FaultInjector<std::string_view> f;
+       ret = br.execute(entry.new_num_shards, f, max_entries, dpp,
+                         false, nullptr, nullptr, this);
+       if (ret < 0) {
+         ldout(store->ctx(), 0) <<  __func__ <<
+           ": Error during resharding bucket " << entry.bucket_name << ":" <<
+           cpp_strerror(-ret)<< dendl;
+         return ret;
+       }
 
     ldout(store->ctx(), 20) << __func__ <<
       " removing reshard queue entry for bucket " << entry.bucket_name <<
index c062aca594b117a59b1866b691f7087b44b2e2be..1a46a49865aad2cb056128e053bfcc381ac5fc64 100644 (file)
@@ -21,6 +21,7 @@
 #include "cls/lock/cls_lock_client.h"
 
 #include "rgw_common.h"
+#include "common/fault_injector.h"
 
 
 class RGWReshard;
@@ -87,9 +88,9 @@ private:
 
   int set_target_layout(int new_num_shards, const DoutPrefixProvider *dpp);
   int update_bucket(rgw::BucketReshardState s, const DoutPrefixProvider* dpp);
-  
+
   int do_reshard(int num_shards,
-                int max_entries,
+                 int max_entries, FaultInjector<std::string_view>& f,
                  bool verbose,
                  ostream *os,
                 Formatter *formatter,
@@ -102,8 +103,8 @@ public:
                   const RGWBucketInfo& _bucket_info,
                    const std::map<string, bufferlist>& _bucket_attrs,
                   RGWBucketReshardLock* _outer_reshard_lock);
-  int execute(int num_shards, int max_op_entries,
-              const DoutPrefixProvider *dpp,
+  int execute(int num_shards, FaultInjector<std::string_view>& f,
+              int max_op_entries, const DoutPrefixProvider *dpp,
               bool verbose = false, ostream *out = nullptr,
               Formatter *formatter = nullptr,
              RGWReshard *reshard_log = nullptr);