]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
reshard: limiting the number of log to be recorded
authorliangmingyuan <liangmingyuan@baidu.com>
Fri, 26 Jul 2024 08:23:32 +0000 (16:23 +0800)
committerliangmingyuan <liangmingyuan@baidu.com>
Fri, 26 Jul 2024 14:34:23 +0000 (22:34 +0800)
When the bucket's index shards are already overloaded, avoid
adding too many extra keys to the reshard log. Limit the size
of the reshard log to `rgw_reshardlog_threshold`: if an index
write operation during the logrecord stage would exceed that limit,
return the ERR_BUSY_RESHARDING error early.

Use the reshardlog_entries counter in `rgw_bucket_dir_header` to do this:
increment reshardlog_entries when writing to a shard. It does not need to
be incremented on deletes, because the number of index entries decreases at the same time.

Signed-off-by: Mingyuan Liang <liangmingyuan@baidu.com>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_types.cc
src/cls/rgw/cls_rgw_types.h
src/common/options/rgw.yaml.in
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_reshard.cc
src/test/cli/radosgw-admin/help.t
src/test/cls_rgw/test_cls_rgw.cc

index 9f4fd54a67da184308fc49ff1834c4cf496b4d09..8fc928e07d9121cf3ee7e77ce318e4cc96ba71c0 100644 (file)
@@ -848,7 +848,7 @@ static std::string modify_op_str(uint8_t op) {
 
 static int record_duplicate_entry(cls_method_context_t hctx, string& idx,
                                   const cls_rgw_obj_key& key, bufferlist* log_bl,
-                                  bool resharding) {
+                                  bool resharding, uint32_t* reshardlog_entries = NULL) {
   if (resharding) {
     int rc = reshard_log_index_operation(hctx, idx, key, log_bl);
     if (rc < 0) {
@@ -856,10 +856,20 @@ static int record_duplicate_entry(cls_method_context_t hctx, string& idx,
               escape_str(idx).c_str(), rc);
       return rc;
     }
+    if (reshardlog_entries) {
+      *reshardlog_entries += 1;
+    }
   }
   return 0;
 }
 
+// Write `header` back via write_bucket_header(), but only when
+// header.resharding_in_logrecord() is true. Otherwise nothing is written
+// and 0 is returned.
+static int write_header_while_logrecord(cls_method_context_t hctx,
+                                        rgw_bucket_dir_header& header) {
+  if (!header.resharding_in_logrecord()) {
+    return 0;
+  }
+  return write_bucket_header(hctx, &header);
+}
+
 int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   const ConfigProxy& conf = cls_get_config(hctx);
@@ -1067,7 +1077,8 @@ static int complete_remove_obj(cls_method_context_t hctx,
   unaccount_entry(header, entry);
 
   bufferlist empty;
-  ret = record_duplicate_entry(hctx, idx, key, &empty, header.resharding_in_logrecord());
+  ret = record_duplicate_entry(hctx, idx, key, &empty,
+                               header.resharding_in_logrecord());
   if (ret < 0)
     return ret;
 
@@ -1202,7 +1213,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
                        __func__, escape_str(idx).c_str(), rc);
           return rc;
         }
-        rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, header.resharding_in_logrecord());
+        rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl,
+                                    header.resharding_in_logrecord(),
+                                    &header.reshardlog_entries);
         if (rc < 0)
           return rc;
       }
@@ -1251,7 +1264,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
                     __func__, escape_str(idx).c_str(), rc);
         return rc;
       }
-      rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, header.resharding_in_logrecord());
+      rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl,
+                                  header.resharding_in_logrecord(),
+                                  &header.reshardlog_entries);
       if (rc < 0)
         return rc;
     }
@@ -1286,7 +1301,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
                   __func__, escape_str(idx).c_str(), rc);
       return rc;
     }
-    rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl, header.resharding_in_logrecord());
+    rc = record_duplicate_entry(hctx, idx, entry.key, &new_key_bl,
+                                header.resharding_in_logrecord(),
+                                &header.reshardlog_entries);
     if (rc < 0)
       return rc;
   } // CLS_RGW_OP_ADD
@@ -1336,7 +1353,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
 
 template <class T>
 static int write_entry(cls_method_context_t hctx, T& entry, const string& key,
-                       const bool is_resharding = false)
+                       uint32_t& reshardlog_entries, const bool is_resharding = false)
 {
   bufferlist bl;
   encode(entry, bl);
@@ -1346,6 +1363,7 @@ static int write_entry(cls_method_context_t hctx, T& entry, const string& key,
   }
   if (is_resharding) {
     ret = reshard_log_index_operation(hctx, key, entry.key, &bl);
+    reshardlog_entries++;
   }
   return ret;
 }
@@ -1381,12 +1399,13 @@ static void update_olh_log(rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, co
 }
 
 static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
-                                    const string& instance_idx, bool is_resharding)
+                                    const string& instance_idx, uint32_t& reshardlog_entries,
+                                    bool is_resharding)
 {
   CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(),
           instance_idx.c_str(), instance_entry.flags);
   /* write the instance entry */
-  int ret = write_entry(hctx, instance_entry, instance_idx, is_resharding);
+  int ret = write_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding);
   if (ret < 0) {
     CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", escape_str(instance_idx).c_str(), ret);
     return ret;
@@ -1398,9 +1417,10 @@ static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_en
  * write object instance entry, and if needed also the list entry
  */
 static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
-                             const string& instance_idx, bool is_resharding)
+                             const string& instance_idx, uint32_t& reshardlog_entries,
+                             bool is_resharding)
 {
-  int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, is_resharding);
+  int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding);
   if (ret < 0) {
     return ret;
   }
@@ -1410,7 +1430,7 @@ static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& in
   if (instance_idx != instance_list_idx) {
     CLS_LOG(20, "write_entry() idx=%s flags=%d", escape_str(instance_list_idx).c_str(), instance_entry.flags);
     /* write a new list entry for the object instance */
-    ret = write_entry(hctx, instance_entry, instance_list_idx, is_resharding);
+    ret = write_entry(hctx, instance_entry, instance_list_idx, reshardlog_entries, is_resharding);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_entry() instance=%s instance_list_idx=%s ret=%d", instance_entry.key.instance.c_str(), instance_list_idx.c_str(), ret);
       return ret;
@@ -1498,7 +1518,8 @@ public:
     return 0;
   }
 
-  int write_entries(uint64_t flags_set, uint64_t flags_reset, bool is_resharding) {
+  int write_entries(uint64_t flags_set, uint64_t flags_reset,
+                    uint32_t& reshardlog_entries, bool is_resharding) {
     if (!initialized) {
       int ret = init();
       if (ret < 0) {
@@ -1511,7 +1532,7 @@ public:
     /* write the instance and list entries */
     bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty());
     encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key);
-    int ret = write_obj_entries(hctx, instance_entry, instance_idx, is_resharding);
+    int ret = write_obj_entries(hctx, instance_entry, instance_idx, reshardlog_entries, is_resharding);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
       return ret;
@@ -1520,7 +1541,7 @@ public:
     return 0;
   }
 
-  int write(uint64_t epoch, bool current, bool is_resharding) {
+  int write(uint64_t epoch, bool current, uint32_t& reshardlog_entries, bool is_resharding) {
     if (instance_entry.versioned_epoch > 0) {
       CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch);
       /* this instance has a previous list entry, remove that entry */
@@ -1536,11 +1557,11 @@ public:
     }
 
     instance_entry.versioned_epoch = epoch;
-    return write_entries(flags, 0, is_resharding);
+    return write_entries(flags, 0, reshardlog_entries, is_resharding);
   }
 
-  int demote_current(bool is_resharding) {
-    return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, is_resharding);
+  int demote_current(uint32_t& reshardlog_entries, bool is_resharding) {
+    return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, reshardlog_entries, is_resharding);
   }
 
   bool is_delete_marker() {
@@ -1642,9 +1663,9 @@ public:
     olh_data_entry.key = key;
   }
 
-  int write(bool is_resharding) {
+  int write(uint32_t& reshardlog_entries, bool is_resharding) {
     /* write the olh data entry */
-    int ret = write_entry(hctx, olh_data_entry, olh_data_idx, is_resharding);
+    int ret = write_entry(hctx, olh_data_entry, olh_data_idx, reshardlog_entries, is_resharding);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_idx.c_str(), ret);
       return ret;
@@ -1679,12 +1700,12 @@ public:
 };
 
 static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key,
-                                bool is_resharding)
+                                uint32_t& reshardlog_entries, bool is_resharding)
 {
   rgw_bucket_dir_entry entry;
   entry.key = key;
   entry.flags = rgw_bucket_dir_entry::FLAG_VER_MARKER;
-  int ret = write_entry(hctx, entry, key.name, is_resharding);
+  int ret = write_entry(hctx, entry, key.name, reshardlog_entries, is_resharding);
   if (ret < 0) {
     CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret);
     return ret;
@@ -1702,6 +1723,7 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx,
                                             cls_rgw_obj_key& key,
                                             bool demote_current,
                                             bool instance_only,
+                                            uint32_t& reshardlog_entries,
                                             bool is_resharding)
 {
   if (!key.instance.empty()) {
@@ -1729,9 +1751,9 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx,
     encode_obj_versioned_data_key(key, &new_idx);
 
     if (instance_only) {
-      ret = write_obj_instance_entry(hctx, entry, new_idx, is_resharding);
+      ret = write_obj_instance_entry(hctx, entry, new_idx, reshardlog_entries, is_resharding);
     } else {
-      ret = write_obj_entries(hctx, entry, new_idx, is_resharding);
+      ret = write_obj_entries(hctx, entry, new_idx, reshardlog_entries, is_resharding);
     }
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d",
@@ -1740,7 +1762,7 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx,
     }
   }
 
-  ret = write_version_marker(hctx, key, is_resharding);
+  ret = write_version_marker(hctx, key, reshardlog_entries, is_resharding);
   if (ret < 0) {
     return ret;
   }
@@ -1894,14 +1916,14 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
   const uint64_t prev_epoch = olh.get_epoch();
 
   if (!olh.start_modify(op.olh_epoch)) {
-    ret = obj.write(op.olh_epoch, false, header.resharding_in_logrecord());
+    ret = obj.write(op.olh_epoch, false, header.reshardlog_entries, header.resharding_in_logrecord());
     if (ret < 0) {
       return ret;
     }
     if (removing) {
       olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
     }
-    return 0;
+    return write_header_while_logrecord(hctx, header);
   }
 
   // promote this version to current if it's a newer epoch, or if it matches the
@@ -1926,7 +1948,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
       if (!(olh_entry.key == op.key)) {
         BIVerObjEntry old_obj(hctx, olh_entry.key);
 
-        ret = old_obj.demote_current(header.resharding_in_logrecord());
+        ret = old_obj.demote_current(header.reshardlog_entries, header.resharding_in_logrecord());
         if (ret < 0) {
           CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
           return ret;
@@ -1938,6 +1960,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
     bool instance_only = (op.key.instance.empty() && op.delete_marker);
     cls_rgw_obj_key key(op.key.name);
     ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only,
+                                           header.reshardlog_entries,
                                            header.resharding_in_logrecord());
     if (ret < 0) {
       CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
@@ -1960,24 +1983,25 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
   }
   olh.set_exists(true);
 
-  ret = olh.write(header.resharding_in_logrecord());
+  ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord());
   if (ret < 0) {
     CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
     return ret;
   }
 
   /* write the instance and list entries */
-  ret = obj.write(olh.get_epoch(), promote, header.resharding_in_logrecord());
+  ret = obj.write(olh.get_epoch(), promote, header.reshardlog_entries,
+                  header.resharding_in_logrecord());
   if (ret < 0) {
     return ret;
   }
 
   if (!op.log_op) {
-   return 0;
+    return write_header_while_logrecord(hctx, header);
   }
 
   if (header.syncstopped) {
-    return 0;
+    return write_header_while_logrecord(hctx, header);
   }
 
   rgw_bucket_dir_entry& entry = obj.get_dir_entry();
@@ -2051,6 +2075,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
     bool instance_only = false;
     cls_rgw_obj_key key(dest_key.name);
     ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only,
+                                           header.reshardlog_entries,
                                            header.resharding_in_logrecord());
     if (ret < 0) {
       CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
@@ -2073,7 +2098,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
     }
 
     olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
-    return olh.write(header.resharding_in_logrecord());
+    return olh.write(header.reshardlog_entries, header.resharding_in_logrecord());
   }
 
   rgw_bucket_olh_entry& olh_entry = olh.get_entry();
@@ -2093,7 +2118,8 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
 
     if (found) {
       BIVerObjEntry next(hctx, next_key);
-      ret = next.write(olh.get_epoch(), true, header.resharding_in_logrecord());
+      ret = next.write(olh.get_epoch(), true, header.reshardlog_entries,
+                       header.resharding_in_logrecord());
       if (ret < 0) {
         CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
         return ret;
@@ -2131,17 +2157,17 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
     return ret;
   }
 
-  ret = olh.write(header.resharding_in_logrecord());
+  ret = olh.write(header.reshardlog_entries, header.resharding_in_logrecord());
   if (ret < 0) {
     return ret;
   }
 
   if (!op.log_op) {
-    return 0;
+    return write_header_while_logrecord(hctx, header);
   }
 
   if (header.syncstopped) {
-    return 0;
+    return write_header_while_logrecord(hctx, header);
   }
 
   rgw_bucket_entry_ver ver;
@@ -2263,7 +2289,9 @@ static int rgw_bucket_trim_olh_log(cls_method_context_t hctx, bufferlist *in, bu
   }
 
   /* write the olh data entry */
-  ret = write_entry(hctx, olh_data_entry, olh_data_key, header.resharding_in_logrecord());
+  ret = write_entry(hctx, olh_data_entry, olh_data_key,
+                    header.reshardlog_entries,
+                    header.resharding_in_logrecord());
   if (ret < 0) {
     CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_key.c_str(), ret);
     return ret;
@@ -3836,7 +3864,14 @@ static int rgw_reshard_log_trim_op(cls_method_context_t hctx, bufferlist *in, bu
   std::set<std::string> keys;
   bool more = false;
 
-  int rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more);
+  rgw_bucket_dir_header header;
+  int rc = read_bucket_header(hctx, &header);
+  if (rc < 0) {
+    CLS_LOG(0, "ERROR: rgw_reshard_log_trim_op(): failed to read header\n");
+    return rc;
+  }
+
+  rc = cls_cxx_map_get_keys(hctx, key_begin, max_entries, &keys, &more);
   if (rc < 0) {
     CLS_LOG(1, "ERROR: cls_cxx_map_get_keys failed rc=%d", rc);
     return rc;
@@ -3861,6 +3896,12 @@ static int rgw_reshard_log_trim_op(cls_method_context_t hctx, bufferlist *in, bu
     CLS_LOG(1, "ERROR: cls_cxx_map_remove_range failed rc=%d", rc);
     return rc;
   }
+
+  header.reshardlog_entries = 0;
+  rc = write_bucket_header(hctx, &header);  if (rc < 0) {
+    CLS_LOG(0, "ERROR: rgw_reshard_log_trim_op(): failed to write header\n");
+    return rc;
+  }
   return 0;
 }
 
@@ -4911,6 +4952,37 @@ static int rgw_set_bucket_resharding(cls_method_context_t hctx, bufferlist *in,
   return write_bucket_header(hctx, &header);
 }
 
+static int rgw_set_bucket_resharding2(cls_method_context_t hctx, bufferlist *in,  bufferlist *out)
+{
+  CLS_LOG(10, "entered %s", __func__);
+  cls_rgw_set_bucket_resharding_op op;
+
+  auto in_iter = in->cbegin();
+  try {
+    decode(op, in_iter);
+  } catch (ceph::buffer::error& err) {
+    CLS_LOG(1, "ERROR: cls_rgw_set_bucket_resharding: failed to decode entry\n");
+    return -EINVAL;
+  }
+
+  rgw_bucket_dir_header header;
+  int rc = read_bucket_header(hctx, &header);
+  if (rc < 0) {
+    CLS_LOG(1, "ERROR: %s: failed to read header", __func__);
+    return rc;
+  }
+
+  if (op.entry.reshard_status == cls_rgw_reshard_status::IN_LOGRECORD) {
+    if (header.reshardlog_entries != 0) {
+      CLS_LOG(1, "ERROR: %s: cannot set logrecord status on non-zero log record count", __func__);
+      return -EOPNOTSUPP;
+    }
+  }
+  header.new_instance.set_status(op.entry.reshard_status);
+
+  return write_bucket_header(hctx, &header);
+}
+
 static int rgw_clear_bucket_resharding(cls_method_context_t hctx, bufferlist *in,  bufferlist *out)
 {
   CLS_LOG(10, "entered %s", __func__);
@@ -4938,6 +5010,10 @@ static int rgw_clear_bucket_resharding(cls_method_context_t hctx, bufferlist *in
 static int rgw_guard_bucket_resharding(cls_method_context_t hctx, bufferlist *in,  bufferlist *out)
 {
   CLS_LOG(10, "entered %s", __func__);
+
+  const ConfigProxy& conf = cls_get_config(hctx);
+  const uint32_t reshardlog_threshold = conf->rgw_reshardlog_threshold;
+
   cls_rgw_guard_bucket_resharding_op op;
 
   auto in_iter = in->cbegin();
@@ -4955,7 +5031,8 @@ static int rgw_guard_bucket_resharding(cls_method_context_t hctx, bufferlist *in
     return rc;
   }
 
-  if (header.resharding_in_progress()) {
+  if (header.resharding_in_progress() ||
+      (header.resharding_in_logrecord() && header.reshardlog_entries >= reshardlog_threshold)) {
     return op.ret_err;
   }
 
@@ -5118,7 +5195,7 @@ CLS_INIT(rgw)
   cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR,
                          rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding);
   cls_register_cxx_method(h_class, RGW_SET_BUCKET_RESHARDING2, CLS_METHOD_RD | CLS_METHOD_WR,
-                         rgw_set_bucket_resharding, &h_rgw_set_bucket_resharding);
+                         rgw_set_bucket_resharding2, &h_rgw_set_bucket_resharding);
   cls_register_cxx_method(h_class, RGW_CLEAR_BUCKET_RESHARDING, CLS_METHOD_RD | CLS_METHOD_WR,
                          rgw_clear_bucket_resharding, &h_rgw_clear_bucket_resharding);
   cls_register_cxx_method(h_class, RGW_GUARD_BUCKET_RESHARDING, CLS_METHOD_RD ,
index ad8e7f16e2fb51b985df4d77f48cdc62ed40bcdd..8b125640e86afe233f8eff74d2a97667abd4be60 100644 (file)
@@ -697,6 +697,7 @@ void rgw_bucket_dir_header::dump(Formatter *f) const
   }
   f->close_section();
   ::encode_json("new_instance", new_instance, f);
+  f->dump_int("reshardlog_entries", reshardlog_entries);
 }
 
 void rgw_bucket_dir::generate_test_instances(list<rgw_bucket_dir*>& o)
index f3ef5ec6aec753c1cc93c48c79ebde02f2ae72f6..8c90bee4c50433b62fe6185081b680042923d4c7 100644 (file)
@@ -828,11 +828,13 @@ struct rgw_bucket_dir_header {
   std::string max_marker;
   cls_rgw_bucket_instance_entry new_instance;
   bool syncstopped;
+  uint32_t reshardlog_entries;
 
-  rgw_bucket_dir_header() : tag_timeout(0), ver(0), master_ver(0), syncstopped(false) {}
+  rgw_bucket_dir_header() : tag_timeout(0), ver(0), master_ver(0), syncstopped(false),
+                            reshardlog_entries(0) {}
 
   void encode(ceph::buffer::list &bl) const {
-    ENCODE_START(7, 2, bl);
+    ENCODE_START(8, 2, bl);
     encode(stats, bl);
     encode(tag_timeout, bl);
     encode(ver, bl);
@@ -840,10 +842,11 @@ struct rgw_bucket_dir_header {
     encode(max_marker, bl);
     encode(new_instance, bl);
     encode(syncstopped,bl);
+    encode(reshardlog_entries, bl);
     ENCODE_FINISH(bl);
   }
   void decode(ceph::buffer::list::const_iterator &bl) {
-    DECODE_START_LEGACY_COMPAT_LEN(6, 2, 2, bl);
+    DECODE_START_LEGACY_COMPAT_LEN(8, 2, 2, bl);
     decode(stats, bl);
     if (struct_v > 2) {
       decode(tag_timeout, bl);
@@ -867,6 +870,11 @@ struct rgw_bucket_dir_header {
     if (struct_v >= 7) {
       decode(syncstopped,bl);
     }
+    if (struct_v >= 8) {
+      decode(reshardlog_entries, bl);
+    } else {
+      reshardlog_entries = 0;
+    }
     DECODE_FINISH(bl);
   }
   void dump(ceph::Formatter *f) const;
index d23ad6cfe1ca58134e6156bb8f1af58cfea792d6..f62b4b45c673d858c5e862aad2b40aa952f001bd 100644 (file)
@@ -2730,6 +2730,17 @@ options:
   - rgw
   see_also:
   - rgw_reshard_progress_judge_interval
+- name: rgw_reshardlog_threshold
+  type: uint
+  level: dev
+  desc: threshold for a shard to record log before blocking writes
+  default: 30000
+  with_legacy: true
+  services:
+  - rgw
+  - osd
+  see_also:
+  - rgw_reshard_progress_judge_interval
 - name: rgw_debug_inject_set_olh_err
   type: uint
   level: dev
index aa5e646ed518c3dbedb151e496dd3093b9c5403d..829e06a3b2c0b4e78ff8720c5f90a4d8e6baf1b9 100644 (file)
@@ -7801,7 +7801,7 @@ int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
       return ret;
     }
 
-    if (!entry.resharding_in_progress()) {
+    if (!entry.resharding()) {
       ret = fetch_new_bucket_info("get_bucket_resharding_succeeded");
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "ERROR: " << __func__ <<
index 84c76f2d52d24a6e938c0e95e17c68ed336502ea..18aa48ad57fdb459fef7daf40313fc561d6415b0 100644 (file)
@@ -1357,6 +1357,7 @@ int RGWBucketReshard::execute(int num_shards,
     return ret;
   }
 
+  auto current_num_shards = rgw::num_shards(bucket_info.layout.current_index);
   ret = commit_reshard(store, bucket_info, bucket_attrs, fault, dpp, y);
   if (ret < 0) {
     return ret;
@@ -1364,7 +1365,7 @@ int RGWBucketReshard::execute(int num_shards,
 
   ldpp_dout(dpp, 1) << __func__ << " INFO: reshard of bucket \"" <<
     bucket_info.bucket.name << "\" from " <<
-    rgw::num_shards(bucket_info.layout.current_index) << " shards to " << num_shards <<
+    current_num_shards << " shards to " << num_shards <<
     " shards completed successfully" << dendl;
 
   return 0;
index 46bfcdef325cdc7fcee7858a9c4282402af31880..7c3e3f1f0a3b2a2bc167c3638ffc839de42925e8 100644 (file)
     reshard cancel                   cancel resharding a bucket
     reshard stale-instances list     list stale-instances from bucket resharding
     reshard stale-instances delete   cleanup stale-instances from bucket resharding
-    reshardlog list                  list bucket reshard newest generation log
-    reshardlog purge                 trim all bucket resharding log
+    reshardlog list                  list bucket resharding log
+    reshardlog purge                 trim bucket resharding log
     sync error list                  list sync error
     sync error trim                  trim sync error
     mfa create                       create a new MFA TOTP token
index 7963231f99c4be33a7522d95331c3af5be03c2fd..e0c8eeb13f6c8cd14f9754d9b0c7d4dbb16a7330 100644 (file)
@@ -1418,3 +1418,55 @@ TEST_F(cls_rgw, reshardlog_list)
   ASSERT_FALSE(is_truncated);
   ASSERT_EQ(2u, entries.size());
 }
+
+// Fetch the dir header(s) for `oid` and assert that the sum of their
+// reshardlog_entries counters equals `num_entries`.
+void reshardlog_entries(librados::IoCtx& ioctx, const std::string& oid, uint32_t num_entries)
+{
+  map<int, string> oids;
+  oids[0] = oid;
+  map<int, struct rgw_cls_list_ret> results;
+  ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)());
+
+  // accumulate the counter over every returned header
+  uint32_t total = 0;
+  for (const auto& shard_result : results) {
+    total += shard_result.second.dir.header.reshardlog_entries;
+  }
+  ASSERT_EQ(total, num_entries);
+}
+
+TEST_F(cls_rgw, reshardlog_num)
+{
+  string bucket_oid = str_int("reshard2", 0);
+
+  ObjectWriteOperation op;
+  cls_rgw_bucket_init_index(op);
+  ASSERT_EQ(0, ioctx.operate(bucket_oid, &op));
+
+  cls_rgw_obj_key obj1 = str_int("obj1", 0);
+  string tag = str_int("tag-prepare", 0);
+  string loc = str_int("loc", 0);
+  index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj1, loc);
+  rgw_bucket_dir_entry_meta meta;
+  index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj1, meta);
+
+  // reshard status not yet set to IN_LOGRECORD: the counter must still be 0
+  reshardlog_entries(ioctx, bucket_oid, 0u);
+
+  // set reshard status to IN_LOGRECORD
+  cls_rgw_bucket_instance_entry entry;
+  entry.reshard_status = cls_rgw_reshard_status::IN_LOGRECORD;
+  set_reshard_status(ioctx, bucket_oid, entry);
+
+  // a prepare on its own must not increase reshardlog_entries
+  cls_rgw_obj_key obj2 = str_int("obj2", 0);
+  index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj2, loc);
+  reshardlog_entries(ioctx, bucket_oid, 0u);
+  // completing the add must increase reshardlog_entries to 1
+  index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj2, meta);
+  reshardlog_entries(ioctx, bucket_oid, 1u);
+
+  // deleting an object must not increase reshardlog_entries
+  index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, obj1, loc);
+  index_complete(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag, 1, obj1, meta);
+  reshardlog_entries(ioctx, bucket_oid, 1u);
+}