]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/reshard: copy the index entries to dest shards.
authorliangmingyuan <liangmingyuan@baidu.com>
Sun, 31 Mar 2024 12:11:02 +0000 (20:11 +0800)
committerliangmingyuan <liangmingyuan@baidu.com>
Sun, 21 Jul 2024 09:52:33 +0000 (17:52 +0800)
In logrecord state, copy the inventoried index entries to the dest shards
and record a log for each newly written entry. In progress state, block
the writes, list the logs written during the logrecord state, then fetch
the corresponding index entries and copy them to the dest shards.

Signed-off-by: Mingyuan Liang <liangmingyuan@baidu.com>
15 files changed:
qa/workunits/rgw/test_rgw_reshard.py
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/cls/rgw/cls_rgw_ops.h
src/cls/rgw/cls_rgw_types.cc
src/rgw/driver/rados/rgw_bucket.cc
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_reshard.cc
src/rgw/driver/rados/rgw_reshard.h
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket_layout.cc
src/rgw/rgw_bucket_layout.h
src/test/cls_rgw/test_cls_rgw.cc

index 6326e7b173cf4ba429bc2c412ff61f760f350e5e..dd7f601998e3e3f8f786d93123d798cb47d8a52b 100755 (executable)
@@ -223,6 +223,18 @@ def main():
     log.debug('TEST: reshard bucket with abort at do_reshard\n')
     test_bucket_reshard(connection, 'abort-at-do-reshard', abort_at='do_reshard')
 
+    log.debug('TEST: reshard bucket with EIO injected at logrecord_writes\n')
+    test_bucket_reshard(connection, 'error-at-logrecord-writes', error_at='logrecord_writes')
+    log.debug('TEST: reshard bucket with abort at logrecord_writes\n')
+    test_bucket_reshard(connection, 'abort-at-logrecord-writes', abort_at='logrecord_writes')
+
+    log.debug('TEST: reshard bucket with EIO injected at change_reshard_state\n')
+    test_bucket_reshard(connection, 'error-at-change-reshard-state', error_at='change_reshard_state')
+    log.debug('TEST: reshard bucket with ECANCELED injected at change_reshard_state\n')
+    test_bucket_reshard(connection, 'error-at-change-reshard-state', error_at='change_reshard_state', error_code=errno.ECANCELED)
+    log.debug('TEST: reshard bucket with abort at change_reshard_state\n')
+    test_bucket_reshard(connection, 'abort-at-change-reshard-state', abort_at='change_reshard_state')
+
     # TESTCASE 'versioning reshard-','bucket', reshard','versioning reshard','succeeds'
     log.debug(' test: reshard versioned bucket')
     num_shards_expected = get_bucket_stats(VER_BUCKET_NAME).num_shards + 1
index aa5ba9ed7dbfd65060d1f940826901abca4b1d43..7bd60bf20c854d7ea7b89d35f9ed265c4a91425e 100644 (file)
@@ -118,6 +118,20 @@ static bool bi_entry_gt(const string& first, const string& second)
   return first > second;
 }
 
+/**
+ * return: Plain, Instance, OLH or Invalid
+ */
+BIIndexType bi_type(const string& s, const string& prefix)
+{
+  int ret = bi_entry_type(s.substr(prefix.size()));
+  if (ret < 0) {
+    return BIIndexType::Invalid;
+  } else if (ret == 0) {
+    return BIIndexType::Plain;
+  }
+  return (BIIndexType)ret;
+}
+
 static void get_time_key(real_time& ut, string *key)
 {
   char buf[32];
@@ -2822,15 +2836,22 @@ static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist *
 
   rgw_cls_bi_entry& entry = op.entry;
 
-  int r = cls_cxx_map_set_val(hctx, entry.idx, &entry.data);
-  if (r < 0) {
-    CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_val() returned r=%d", __func__, r);
+  if (entry.type == BIIndexType::ReshardDeleted) {
+    int r = cls_cxx_map_remove_key(hctx, entry.idx);
+    if (r < 0) {
+      CLS_LOG(0, "ERROR: %s: cls_cxx_map_remove_key() returned r=%d", __func__, r);
+      return r;
+    }
+  } else {
+    int r = cls_cxx_map_set_val(hctx, entry.idx, &entry.data);
+    if (r < 0) {
+      CLS_LOG(0, "ERROR: %s: cls_cxx_map_set_val() returned r=%d", __func__, r);
+    }
   }
 
   return 0;
 }
 
-
 /* The plain entries in the bucket index are divided into two regions
  * divided by the special entries that begin with 0x80. Those below
  * ("Low") are ascii entries. Those above ("High") bring in unicode
@@ -3176,6 +3197,57 @@ static int list_olh_entries(cls_method_context_t hctx,
   return count;
 }
 
+static int reshard_log_list_entries(cls_method_context_t hctx, const string& marker,
+                                    uint32_t max, list<rgw_cls_bi_entry>& entries, bool *truncated)
+{
+  string start_key, end_key;
+  start_key = BI_PREFIX_CHAR;
+  start_key.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX]);
+
+  string bi_type_marker = start_key;
+
+  end_key = BI_PREFIX_CHAR;
+  end_key.append(bucket_index_prefixes[BI_BUCKET_RESHARD_LOG_INDEX + 1]);
+
+  if (!marker.empty()) {
+    start_key.append(marker);
+  }
+
+  map<string, bufferlist> keys;
+  int ret = cls_cxx_map_get_vals(hctx, start_key, string(), max, &keys, truncated);
+  CLS_LOG(20, "%s(): start_key=%s keys.size()=%d", __func__, escape_str(start_key).c_str(), (int)keys.size());
+  if (ret < 0) {
+    return ret;
+  }
+
+  map<string, bufferlist>::iterator iter;
+  for (iter = keys.begin(); iter != keys.end(); ++iter) {
+    if (iter->first.compare(end_key) >= 0) {
+      if (truncated) {
+        *truncated = false;
+      }
+      return 0;
+    }
+
+    rgw_cls_bi_entry entry;
+    auto biter = iter->second.cbegin();
+    try {
+      decode(entry, biter);
+    } catch (ceph::buffer::error& err) {
+      CLS_LOG(0, "ERROR: %s: failed to decode buffer for rgw_cls_bi_entry \"%s\"",
+             __func__, escape_str(iter->first).c_str());
+      return -EIO;
+    }
+    if (entry.type != BIIndexType::ReshardDeleted)
+      entry.type = bi_type(iter->first, bi_type_marker);
+
+    CLS_LOG(20, "reshard_log_list_entries key=%s bl.length=%d\n", entry.idx.c_str(), (int)iter->second.length());
+
+    entries.push_back(entry);
+  }
+  return 0;
+}
+
 static int check_index(cls_method_context_t hctx,
                       rgw_bucket_dir_header *existing_header,
                       rgw_bucket_dir_header *calc_header)
@@ -3285,7 +3357,8 @@ int rgw_bucket_check_index(cls_method_context_t hctx, bufferlist *in, bufferlist
 }
 
 
-/* Lists all the entries that appear in a bucket index listing.
+/* Lists all the entries that appear in a bucket index listing,
+ * or list all the entries in reshardlog namespace.
  *
  * It may not be obvious why this function calls three other "segment"
  * functions (list_plain_entries (twice), list_instance_entries,
@@ -3324,15 +3397,24 @@ static int rgw_bi_list_op(cls_method_context_t hctx,
   constexpr uint32_t MAX_BI_LIST_ENTRIES = 1000;
   const uint32_t max = std::min(op.max, MAX_BI_LIST_ENTRIES);
 
-  CLS_LOG(20, "%s: op.marker=\"%s\", op.name_filter=\"%s\", op.max=%u max=%u",
+  CLS_LOG(20, "%s: op.marker=\"%s\", op.name_filter=\"%s\", op.max=%u max=%u, op.reshardlog=%d",
          __func__, escape_str(op.marker).c_str(), escape_str(op.name_filter).c_str(),
-         op.max, max);
+         op.max, max, op.reshardlog);
 
   int ret;
   uint32_t count = 0;
   bool more = false;
   rgw_cls_bi_list_ret op_ret;
 
+  if (op.reshardlog) {
+    ret = reshard_log_list_entries(hctx, op.marker, op.max, op_ret.entries, &op_ret.is_truncated);
+    if (ret < 0)
+      return ret;
+    CLS_LOG(20, "%s: returning %lu entries, is_truncated=%d", __func__, op_ret.entries.size(), op_ret.is_truncated);
+    encode(op_ret, *out);
+    return 0;
+  }
+
   ret = list_plain_entries(hctx, op.name_filter, op.marker, max,
                           &op_ret.entries, &more, PlainEntriesRegion::Low);
   if (ret < 0) {
@@ -3662,7 +3744,6 @@ static int rgw_bi_log_stop(cls_method_context_t hctx, bufferlist *in, bufferlist
   return write_bucket_header(hctx, &header);
 }
 
-
 static void usage_record_prefix_by_time(uint64_t epoch, string& key)
 {
   char buf[32];
@@ -4754,7 +4835,7 @@ static int rgw_guard_bucket_resharding(cls_method_context_t hctx, bufferlist *in
     return rc;
   }
 
-  if (header.resharding()) {
+  if (header.resharding_in_progress()) {
     return op.ret_err;
   }
 
index e65dedf14e42d45c1ab1b0720051a29f0cbc19ac..bfdac5259aa6e6fa3a1ae46c3eac51d44bd908d1 100644 (file)
@@ -443,6 +443,7 @@ int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
   return 0;
 }
 
+
 int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry)
 {
   bufferlist in, out;
@@ -470,13 +471,14 @@ void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi
  */
 int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
                    const std::string& name_filter, const std::string& marker, uint32_t max,
-                   std::list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+                   std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog)
 {
   bufferlist in, out;
   rgw_cls_bi_list_op call;
   call.name_filter = name_filter;
   call.marker = marker;
   call.max = max;
+  call.reshardlog = reshardlog;
   encode(call, in);
   int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_LIST, in, out);
   if (r < 0)
index 365a51fb5d5997c30bfdd2a3bd4d018bef032d1f..6f39858b10ba422d7b40394214a6511565f12a53 100644 (file)
@@ -372,8 +372,7 @@ int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls
 void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
 int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
                    const std::string& name, const std::string& marker, uint32_t max,
-                   std::list<rgw_cls_bi_entry> *entries, bool *is_truncated);
-
+                   std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog = false);
 
 void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op,
                             const cls_rgw_obj_key& key, const ceph::buffer::list& olh_tag,
index b824c73d3d0115c00b0efcd70871bcf4192ed0b1..dd8a7779aeddbec788ed172c4c7e3a50d9183e42 100644 (file)
@@ -760,22 +760,27 @@ struct rgw_cls_bi_list_op {
   uint32_t max;
   std::string name_filter; // limit result to one object and its instances
   std::string marker;
+  bool reshardlog;
 
-  rgw_cls_bi_list_op() : max(0) {}
+  rgw_cls_bi_list_op() : max(0), reshardlog(false) {}
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(max, bl);
     encode(name_filter, bl);
     encode(marker, bl);
+    encode(reshardlog, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(ceph::buffer::list::const_iterator& bl) {
-    DECODE_START(1, bl);
+    DECODE_START(2, bl);
     decode(max, bl);
     decode(name_filter, bl);
     decode(marker, bl);
+    if (struct_v >= 2) {
+      decode(reshardlog, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -783,6 +788,7 @@ struct rgw_cls_bi_list_op {
     f->dump_unsigned("max", max);
     f->dump_string("name_filter", name_filter);
     f->dump_string("marker", marker);
+    f->dump_bool("reshardlog", reshardlog);
   }
 
   static void generate_test_instances(std::list<rgw_cls_bi_list_op*>& o) {
@@ -791,6 +797,7 @@ struct rgw_cls_bi_list_op {
     o.back()->max = 100;
     o.back()->name_filter = "name_filter";
     o.back()->marker = "marker";
+    o.back()->reshardlog = true;
   }
 };
 WRITE_CLASS_ENCODER(rgw_cls_bi_list_op)
index b58d37691065fa0d8d1abe882de563dec12ef4ee..ad8e7f16e2fb51b985df4d77f48cdc62ed40bcdd 100644 (file)
@@ -955,6 +955,9 @@ std::ostream& operator<<(std::ostream& out, cls_rgw_reshard_status status) {
   case cls_rgw_reshard_status::NOT_RESHARDING:
     out << "NOT_RESHARDING";
     break;
+  case cls_rgw_reshard_status::IN_LOGRECORD:
+    out << "IN_LOGRECORD";
+    break;
   case cls_rgw_reshard_status::IN_PROGRESS:
     out << "IN_PROGRESS";
     break;
index 572c24162e778a6bff32fd379c4eac8027222de6..f002dd78daa1770b8dbb4e2d688e38b74e444698 100644 (file)
@@ -369,7 +369,7 @@ static int check_bad_index_multipart(rgw::sal::RadosStore* const rados_store,
   do {
     entries_read.clear();
     ret = store->bi_list(bs, "", marker, -1,
-                        &entries_read, &is_truncated, y);
+                        &entries_read, &is_truncated, false, y);
     if (ret < 0) {
       ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) <<
        dendl;
@@ -630,7 +630,7 @@ static int check_index_olh(rgw::sal::RadosStore* const rados_store,
   *count_out = 0;
   do {
     entries.clear();
-    ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, y);
+    ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, false, y);
     if (ret < 0) {
       ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
       break;
@@ -857,7 +857,7 @@ static int check_index_unlinked(rgw::sal::RadosStore* const rados_store,
   *count_out = 0;
   do {
     entries.clear();
-    ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, y);
+    ret = store->bi_list(bs, "", marker, -1, &entries, &is_truncated, false, y);
     if (ret < 0) {
       ldpp_dout(dpp, -1) << "ERROR bi_list(): " << cpp_strerror(-ret) << dendl;
       break;
index 2b408dbb369bc5bdbe644a466e76c511b6f653dc..4939438a68b4fc64994f2a97d98c56238021f830 100644 (file)
@@ -9199,7 +9199,8 @@ int RGWRados::bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj&
 
 int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket,
                      const string& obj_name_filter, const string& marker, uint32_t max,
-                     list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y)
+                     list<rgw_cls_bi_entry> *entries, bool *is_truncated,
+                     bool reshardlog, optional_yield y)
 {
   rgw_obj obj(bucket, obj_name_filter);
   BucketShard bs(this);
@@ -9210,7 +9211,7 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket,
   }
 
   auto& ref = bs.bucket_obj;
-  ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated);
+  ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog);
   if (ret == -ENOENT) {
     *is_truncated = false;
   }
@@ -9221,10 +9222,10 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket,
 }
 
 int RGWRados::bi_list(BucketShard& bs, const string& obj_name_filter, const string& marker, uint32_t max,
-                     list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y)
+                     list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y)
 {
   auto& ref = bs.bucket_obj;
-  int ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated);
+  int ret = cls_rgw_bi_list(ref.ioctx, ref.obj.oid, obj_name_filter, marker, max, entries, is_truncated, reshardlog);
   if (ret < 0)
     return ret;
 
@@ -9233,7 +9234,7 @@ int RGWRados::bi_list(BucketShard& bs, const string& obj_name_filter, const stri
 
 int RGWRados::bi_list(const DoutPrefixProvider *dpp,
                      const RGWBucketInfo& bucket_info, int shard_id, const string& obj_name_filter, const string& marker, uint32_t max,
-                     list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y)
+                     list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y)
 {
   BucketShard bs(this);
   int ret = bs.init(dpp, bucket_info,
@@ -9244,7 +9245,7 @@ int RGWRados::bi_list(const DoutPrefixProvider *dpp,
     return ret;
   }
 
-  return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, y);
+  return bi_list(bs, obj_name_filter, marker, max, entries, is_truncated, reshardlog, y);
 }
 
 int RGWRados::bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs)
index 278d1182b4dd655dd594b989ea936198f237d97e..7d6299d118dedc9bcfc0d0f2926fc0e95ed865d2 100644 (file)
@@ -1525,10 +1525,11 @@ public:
              const std::string& marker,
              uint32_t max,
              std::list<rgw_cls_bi_entry> *entries,
-             bool *is_truncated, optional_yield y);
-  int bi_list(BucketShard& bs, const std::string& filter_obj, const std::string& marker, uint32_t max, std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y);
+             bool *is_truncated, bool reshardlog, optional_yield y);
+  int bi_list(BucketShard& bs, const std::string& filter_obj, const std::string& marker, uint32_t max, std::list<rgw_cls_bi_entry> *entries,
+              bool *is_truncated, bool reshardlog, optional_yield y);
   int bi_list(const DoutPrefixProvider *dpp, rgw_bucket& bucket, const std::string& obj_name, const std::string& marker, uint32_t max,
-              std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, optional_yield y);
+              std::list<rgw_cls_bi_entry> *entries, bool *is_truncated, bool reshardlog, optional_yield y);
   int bi_remove(const DoutPrefixProvider *dpp, BucketShard& bs);
 
   int cls_obj_usage_log_add(const DoutPrefixProvider *dpp, const std::string& oid, rgw_usage_log_info& info, optional_yield y);
index a609bd86b6d8cc48f64608288f3336395edcf021..b75bb7bf8cd9de91cf0fc377301fbdf12c6a133a 100644 (file)
@@ -302,6 +302,7 @@ public:
          ": shard->wait_all_aio() returned ret=" << ret << dendl;
       }
     }
+    target_shards.clear();
   }
 
   int add_entry(int shard_index,
@@ -334,7 +335,6 @@ public:
         ret = r;
       }
     }
-    target_shards.clear();
     return ret;
   }
 }; // class BucketReshardManager
@@ -464,10 +464,12 @@ static int init_target_layout(rgw::sal::RadosStore* store,
   // retry in case of racing writes to the bucket instance metadata
   static constexpr auto max_retries = 10;
   int tries = 0;
+
   do {
+
     // update resharding state
     bucket_info.layout.target_index = target;
-    bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
+    bucket_info.layout.resharding = rgw::BucketReshardState::InLogrecord;
 
     if (ret = fault.check("set_target_layout");
         ret == 0) { // no fault injected, write the bucket instance metadata
@@ -607,6 +609,87 @@ static int init_reshard(rgw::sal::RadosStore* store,
     return ret;
   }
 
+  if (ret = fault.check("logrecord_writes");
+      ret == 0) { // no fault injected, record log with writing to the current index shards
+    ret = set_resharding_status(dpp, store, bucket_info,
+                                cls_rgw_reshard_status::IN_LOGRECORD);
+  }
+
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to pause "
+        "writes to the current index: " << cpp_strerror(ret) << dendl;
+    // clean up the target layout (ignore errors)
+    revert_target_layout(store, bucket_info, bucket_attrs, fault, dpp, y);
+    return ret;
+  }
+  return 0;
+} // init_reshard
+
+static int change_reshard_state(rgw::sal::RadosStore* store,
+                                RGWBucketInfo& bucket_info,
+                                std::map<std::string, bufferlist>& bucket_attrs,
+                                ReshardFaultInjector& fault,
+                                const DoutPrefixProvider *dpp, optional_yield y)
+{
+  auto prev = bucket_info.layout; // make a copy for cleanup
+  const auto current = prev.current_index;
+
+  // retry in case of racing writes to the bucket instance metadata
+  static constexpr auto max_retries = 10;
+  int tries = 0;
+  int ret = 0;
+  do {
+    // update resharding state
+    bucket_info.layout.resharding = rgw::BucketReshardState::InProgress;
+
+    if (ret = fault.check("change_reshard_state");
+        ret == 0) { // no fault injected, write the bucket instance metadata
+      ret = store->getRados()->put_bucket_instance_info(bucket_info, false,
+                                                        real_time(), &bucket_attrs, dpp, y);
+    } else if (ret == -ECANCELED) {
+      fault.clear(); // clear the fault so a retry can succeed
+    }
+
+    if (ret == -ECANCELED) {
+      // racing write detected, read the latest bucket info and try again
+      int ret2 = store->getRados()->get_bucket_instance_info(
+          bucket_info.bucket, bucket_info,
+          nullptr, &bucket_attrs, y, dpp);
+      if (ret2 < 0) {
+        ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to read "
+            "bucket info: " << cpp_strerror(ret2) << dendl;
+        ret = ret2;
+        break;
+      }
+
+      // check that we're still in the reshard state we started in
+      if (bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord ||
+          bucket_info.layout.current_index != current) {
+        ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " raced with "
+            "another reshard" << dendl;
+        break;
+      }
+    }
+    ++tries;
+  } while (ret == -ECANCELED && tries < max_retries);
+
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " failed to commit "
+        "target index layout: " << cpp_strerror(ret) << dendl;
+
+    bucket_info.layout = std::move(prev); // restore in-memory layout
+
+    // unblock writes to the current index shard objects
+    int ret2 = set_resharding_status(dpp, store, bucket_info,
+                                     cls_rgw_reshard_status::NOT_RESHARDING);
+    if (ret2 < 0) {
+      ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to unblock "
+          "writes to current index objects: " << cpp_strerror(ret2) << dendl;
+      // non-fatal error
+    }
+    return ret;
+  }
+
   if (ret = fault.check("block_writes");
       ret == 0) { // no fault injected, block writes to the current index shards
     ret = set_resharding_status(dpp, store, bucket_info,
@@ -622,7 +705,7 @@ static int init_reshard(rgw::sal::RadosStore* store,
   }
 
   return 0;
-} // init_reshard
+} // change_reshard_state
 
 static int cancel_reshard(rgw::sal::RadosStore* store,
                           RGWBucketInfo& bucket_info,
@@ -788,7 +871,8 @@ int RGWBucketReshard::cancel(const DoutPrefixProvider* dpp, optional_yield y)
     return ret;
   }
 
-  if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress) {
+  if (bucket_info.layout.resharding != rgw::BucketReshardState::InProgress &&
+      bucket_info.layout.resharding != rgw::BucketReshardState::InLogrecord) {
     ldpp_dout(dpp, -1) << "ERROR: bucket is not resharding" << dendl;
     ret = -EINVAL;
   } else {
@@ -885,41 +969,40 @@ int RGWBucketReshardLock::renew(const Clock::time_point& now) {
   return 0;
 }
 
-
-int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& current,
-                                 const rgw::bucket_index_layout_generation& target,
-                                 int max_op_entries, // max num to process per op
-                                bool verbose,
-                                ostream *out,
-                                Formatter *formatter,
-                                 const DoutPrefixProvider *dpp, optional_yield y)
+int RGWBucketReshard::reshard_process(const rgw::bucket_index_layout_generation& current,
+                                                int& max_op_entries,
+                                                BucketReshardManager& target_shards_mgr,
+                                                bool verbose_json_out,
+                                                ostream *out,
+                                                Formatter *formatter, rgw::BucketReshardState reshard_stage,
+                                                const DoutPrefixProvider *dpp, optional_yield y)
 {
-  if (out) {
-    (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
-    (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
-  }
-
-  /* update bucket info -- in progress*/
   list<rgw_cls_bi_entry> entries;
 
-  if (max_op_entries <= 0) {
-    ldpp_dout(dpp, 0) << __func__ <<
-      ": can't reshard, non-positive max_op_entries" << dendl;
+  string stage;
+  bool read_reshardlog;
+  switch (reshard_stage) {
+  case rgw::BucketReshardState::InLogrecord:
+    stage = "inventory";
+    read_reshardlog = false;
+    break;
+  case rgw::BucketReshardState::InProgress:
+    stage = "inc";
+    read_reshardlog = true;
+    break;
+  default:
+    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " unknown reshard stage" << dendl;
     return -EINVAL;
   }
-
-  BucketReshardManager target_shards_mgr(dpp, store, bucket_info, target);
-
-  bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
-
+  stage.append("_entries");
   if (verbose_json_out) {
-    formatter->open_array_section("entries");
+    formatter->open_array_section(stage);
   }
 
-  uint64_t total_entries = 0;
-
+  uint64_t stage_entries = 0;
+  stage.append(":");
   if (!verbose_json_out && out) {
-    (*out) << "total entries:";
+    (*out) << stage;
   }
 
   const uint32_t num_source_shards = rgw::num_shards(current.layout.normal);
@@ -930,9 +1013,10 @@ int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& curr
     const std::string null_object_filter; // empty string since we're not filtering by object
     while (is_truncated) {
       entries.clear();
-      int ret = store->getRados()->bi_list(
-       dpp, bucket_info, i, null_object_filter, marker, max_op_entries,
-       &entries, &is_truncated, y);
+
+      int ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter,
+                                           marker, max_op_entries, &entries,
+                                           &is_truncated, read_reshardlog, y);
       if (ret == -ENOENT) {
         ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to find shard "
             << i << ", skipping" << dendl;
@@ -944,74 +1028,74 @@ int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& curr
       }
 
       for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
-       rgw_cls_bi_entry& entry = *iter;
-       if (verbose_json_out) {
-         formatter->open_object_section("entry");
-
-         encode_json("shard_id", i, formatter);
-         encode_json("num_entry", total_entries, formatter);
-         encode_json("entry", entry, formatter);
-       }
-       total_entries++;
-
-       marker = entry.idx;
-
-       int target_shard_id;
-       cls_rgw_obj_key cls_key;
-       RGWObjCategory category;
-       rgw_bucket_category_stats stats;
-       bool account = entry.get_info(&cls_key, &category, &stats);
-       rgw_obj_key key(cls_key);
-       if (entry.type == BIIndexType::OLH && key.empty()) {
-         // bogus entry created by https://tracker.ceph.com/issues/46456
-         // to fix, skip so it doesn't get include in the new bucket instance
-         total_entries--;
-         ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl;
-         continue;
-       }
-       rgw_obj obj(bucket_info.bucket, key);
-       RGWMPObj mp;
-       if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
-         // place the multipart .meta object on the same shard as its head object
-         obj.index_hash_source = mp.get_key();
-       }
-       ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
-                                                    obj.get_hash_object(), &target_shard_id);
-       if (ret < 0) {
-         ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
-         return ret;
-       }
-
-       int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
-
-       ret = target_shards_mgr.add_entry(shard_index, entry, account,
-                                         category, stats);
-       if (ret < 0) {
-         return ret;
-       }
-
-       Clock::time_point now = Clock::now();
-       if (reshard_lock.should_renew(now)) {
-         // assume outer locks have timespans at least the size of ours, so
-         // can call inside conditional
-         if (outer_reshard_lock) {
-           ret = outer_reshard_lock->renew(now);
-           if (ret < 0) {
-             return ret;
-           }
-         }
-         ret = reshard_lock.renew(now);
-         if (ret < 0) {
-           ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
-           return ret;
-         }
-       }
-       if (verbose_json_out) {
-         formatter->close_section();
-         formatter->flush(*out);
-       } else if (out && !(total_entries % 1000)) {
-         (*out) << " " << total_entries;
-       }
+        rgw_cls_bi_entry& entry = *iter;
+        if (verbose_json_out) {
+          formatter->open_object_section("entry");
+
+          encode_json("shard_id", i, formatter);
+          encode_json("num_entry", stage_entries, formatter);
+          encode_json("entry", entry, formatter);
+        }
+        stage_entries++;
+
+        marker = entry.idx;
+
+        int target_shard_id;
+        cls_rgw_obj_key cls_key;
+        RGWObjCategory category;
+        rgw_bucket_category_stats stats;
+        bool account = entry.get_info(&cls_key, &category, &stats);
+        rgw_obj_key key(cls_key);
+        if (entry.type == BIIndexType::OLH && key.empty()) {
+          // bogus entry created by https://tracker.ceph.com/issues/46456
+          // to fix, skip so it doesn't get include in the new bucket instance
+          stage_entries--;
+          ldpp_dout(dpp, 10) << "Dropping entry with empty name, idx=" << marker << dendl;
+          continue;
+        }
+        rgw_obj obj(bucket_info.bucket, key);
+        RGWMPObj mp;
+        if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
+          // place the multipart .meta object on the same shard as its head object
+          obj.index_hash_source = mp.get_key();
+        }
+        ret = store->getRados()->get_target_shard_id(bucket_info.layout.target_index->layout.normal,
+                      obj.get_hash_object(), &target_shard_id);
+        if (ret < 0) {
+          ldpp_dout(dpp, -1) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
+          return ret;
+        }
+
+        int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
+
+        ret = target_shards_mgr.add_entry(shard_index, entry, account,
+                  category, stats);
+        if (ret < 0) {
+          return ret;
+        }
+
+        Clock::time_point now = Clock::now();
+        if (reshard_lock.should_renew(now)) {
+          // assume outer locks have timespans at least the size of ours, so
+          // can call inside conditional
+          if (outer_reshard_lock) {
+            ret = outer_reshard_lock->renew(now);
+            if (ret < 0) {
+              return ret;
+            }
+          }
+          ret = reshard_lock.renew(now);
+          if (ret < 0) {
+            ldpp_dout(dpp, -1) << "Error renewing bucket lock: " << ret << dendl;
+            return ret;
+          }
+        }
+        if (verbose_json_out) {
+          formatter->close_section();
+          formatter->flush(*out);
+        } else if (out && !(stage_entries % 1000)) {
+          (*out) << " " << stage_entries;
+        }
       } // entries loop
     }
   }
@@ -1020,15 +1104,64 @@ int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& curr
     formatter->close_section();
     formatter->flush(*out);
   } else if (out) {
-    (*out) << " " << total_entries << std::endl;
+    (*out) << " " << stage_entries << std::endl;
   }
 
   int ret = target_shards_mgr.finish();
   if (ret < 0) {
-    ldpp_dout(dpp, -1) << "ERROR: failed to reshard" << dendl;
+    ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl;
     return -EIO;
   }
   return 0;
+}
+
+int RGWBucketReshard::do_reshard(const rgw::bucket_index_layout_generation& current,
+                                 const rgw::bucket_index_layout_generation& target,
+                                 int max_op_entries, // max num to process per op
+                                bool verbose,
+                                ostream *out,
+                                Formatter *formatter,
+                                 ReshardFaultInjector& fault,
+                                 const DoutPrefixProvider *dpp, optional_yield y)
+{
+  if (out) {
+    (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
+    (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
+  }
+
+  if (max_op_entries <= 0) {
+    ldpp_dout(dpp, 0) << __func__ <<
+      ": can't reshard, non-positive max_op_entries" << dendl;
+    return -EINVAL;
+  }
+
+  BucketReshardManager target_shards_mgr(dpp, store, bucket_info, target);
+
+  bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
+
+  // while in this state, a reshard log entry is recorded on the shard for each client write op
+  ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InLogrecord);
+  int ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+                            formatter, bucket_info.layout.resharding, dpp, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << __func__ << ": failed in logrecord state of reshard ret = " << ret << dendl;
+    return ret;
+  }
+
+  ret = change_reshard_state(store, bucket_info, bucket_attrs, fault, dpp, y);
+  if (ret < 0) {
+    return ret;
+  }
+
+  // block the client op and complete the resharding
+  ceph_assert(bucket_info.layout.resharding == rgw::BucketReshardState::InProgress);
+  ret = reshard_process(current, max_op_entries, target_shards_mgr, verbose_json_out, out,
+                        formatter, bucket_info.layout.resharding, dpp, y);
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << __func__ << ": failed in progress state of reshard ret = " << ret << dendl;
+    return ret;
+  }
+  return 0;
 } // RGWBucketReshard::do_reshard
 
 int RGWBucketReshard::get_status(const DoutPrefixProvider *dpp, list<cls_rgw_bucket_instance_entry> *status)
@@ -1052,7 +1185,7 @@ int RGWBucketReshard::execute(int num_shards,
   if (ret < 0) {
     return ret;
   }
-  // unlock when scope exits
+  // TODO: release the lock when purging the old index shards or unsuccessful new index shards
   auto unlock = make_scope_guard([this] { reshard_lock.unlock(); });
 
   if (reshard_log) {
@@ -1072,7 +1205,7 @@ int RGWBucketReshard::execute(int num_shards,
       ret == 0) { // no fault injected, do the reshard
     ret = do_reshard(bucket_info.layout.current_index,
                      *bucket_info.layout.target_index,
-                     max_op_entries, verbose, out, formatter, dpp, y);
+                     max_op_entries, verbose, out, formatter, fault, dpp, y);
   }
 
   if (ret < 0) {
index a2097318827f3b568df5b2c1c7fe0e7e095faf8e..4101f9f06ed3c9bade8e71aecb9bd8e88e942306 100644 (file)
@@ -26,6 +26,7 @@
 class RGWReshard;
 
 
+class BucketReshardManager;
 namespace rgw { namespace sal {
   class RadosStore;
 } }
@@ -83,13 +84,20 @@ class RGWBucketReshard {
   // using an initializer_list as an array in contiguous memory
   // allocated in at once
   static const std::initializer_list<uint16_t> reshard_primes;
-
+  int reshard_process(const rgw::bucket_index_layout_generation& current,
+                      int& max_entries,
+                      BucketReshardManager& target_shards_mgr,
+                      bool verbose_json_out,
+                      std::ostream *out,
+                      Formatter *formatter, rgw::BucketReshardState reshard_stage,
+                      const DoutPrefixProvider *dpp, optional_yield y);
   int do_reshard(const rgw::bucket_index_layout_generation& current,
                  const rgw::bucket_index_layout_generation& target,
                  int max_entries,
                  bool verbose,
                  std::ostream *os,
                 Formatter *formatter,
+                 ReshardFaultInjector& fault,
                  const DoutPrefixProvider *dpp, optional_yield y);
 public:
 
index 44900e5d1afafa54f89be5e0bb294497239e6bc2..ee8f1451bfa49aab0da9d5a2c055b1e3ad99be72 100644 (file)
@@ -7917,7 +7917,7 @@ next:
       do {
         entries.clear();
        // if object is specified, we use that as a filter to only retrieve some entries
-        ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bi_list(bs, object, marker, max_entries, &entries, &is_truncated, null_yield);
+        ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->bi_list(bs, object, marker, max_entries, &entries, &is_truncated, false, null_yield);
         if (ret < 0) {
           ldpp_dout(dpp(), 0) << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
           return -ret;
index 499e8f0cd4374fc35a0a754ed1c960706aff0e1b..57d37bf381f4eac4c005e647a0c699e709bdda79 100644 (file)
@@ -299,6 +299,7 @@ std::string_view to_string(const BucketReshardState& s)
 {
   switch (s) {
   case BucketReshardState::None: return "None";
+  case BucketReshardState::InLogrecord: return "InLogrecord";
   case BucketReshardState::InProgress: return "InProgress";
   default: return "Unknown";
   }
@@ -309,6 +310,10 @@ bool parse(std::string_view str, BucketReshardState& s)
     s = BucketReshardState::None;
     return true;
   }
+  if (boost::iequals(str, "InLogrecord")) {
+    s = BucketReshardState::InLogrecord;
+    return true;
+  }
   if (boost::iequals(str, "InProgress")) {
     s = BucketReshardState::InProgress;
     return true;
index 114f1f1ff589c591c7429fbc3b784c149a479e44..25dc30c3b5f0188b95b88117a82aa35fcfe819e0 100644 (file)
@@ -219,6 +219,7 @@ inline bucket_index_layout_generation log_to_index_layout(const bucket_log_layou
 
 enum class BucketReshardState : uint8_t {
   None,
+  InLogrecord,
   InProgress,
 };
 std::string_view to_string(const BucketReshardState& s);
index a2f2fa66a7672435ede5b72c650da33cc59e19f0..0cee97e02a22829262ae84b3c00838093e90f860 100644 (file)
@@ -238,7 +238,7 @@ TEST_F(cls_rgw, index_remove_object)
   /* prepare both removal and modification on the same object, this time we'll
    * first complete modification then remove*/
   index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag_remove, obj, loc);
-  index_prepare(ioctx, bucket_oid, CLS_RGW_OP_DEL, tag_modify, obj, loc);
+  index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag_modify, obj, loc);
 
   /* complete modification */
   total_size -= meta.size;
@@ -1340,3 +1340,64 @@ TEST_F(cls_rgw, index_racing_removes)
 
   test_stats(ioctx, bucket_oid, RGWObjCategory::None, 0, 0);
 }
+
+void set_reshard_status(librados::IoCtx& ioctx, const std::string& oid,
+                const cls_rgw_bucket_instance_entry& entry)
+{
+  map<int, string> bucket_objs;
+  bucket_objs[0] = oid;
+  int r = CLSRGWIssueSetBucketResharding(ioctx, bucket_objs, entry, 1)();
+  ASSERT_EQ(0, r);
+}
+
+static int reshardlog_list(librados::IoCtx& ioctx, const std::string& oid,
+                           std::list<rgw_cls_bi_entry> *entries, bool *is_truncated)
+{
+  int ret = cls_rgw_bi_list(ioctx, oid, "", "", 100, entries, is_truncated, true);
+  if (ret < 0) {
+    return ret;
+  }
+  return 0;
+}
+
+TEST_F(cls_rgw, reshardlog_list)
+{
+  string bucket_oid = str_int("reshard", 0);
+
+  ObjectWriteOperation op;
+  cls_rgw_bucket_init_index(op);
+  ASSERT_EQ(0, ioctx.operate(bucket_oid, &op));
+
+  cls_rgw_obj_key obj1 = str_int("obj1", 0);
+  string tag = str_int("tag-prepare", 0);
+  string loc = str_int("loc", 0);
+  index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj1, loc);
+  rgw_bucket_dir_entry_meta meta;
+  index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj1, meta);
+
+  // do not record logs
+  bool is_truncated = false;
+  std::list<rgw_cls_bi_entry> entries;
+  ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
+  ASSERT_FALSE(is_truncated);
+  ASSERT_EQ(0u, entries.size());
+
+  // set reshard status to IN_LOGRECORD
+  cls_rgw_bucket_instance_entry entry;
+  entry.reshard_status = cls_rgw_reshard_status::IN_LOGRECORD;
+  set_reshard_status(ioctx, bucket_oid, entry);
+
+  // record a log in prepare
+  cls_rgw_obj_key obj2 = str_int("obj2", 0);
+  index_prepare(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, obj2, loc);
+  ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
+  ASSERT_FALSE(is_truncated);
+  ASSERT_EQ(1u, entries.size());
+
+  // overwrite the log written in prepare
+  entries.clear();
+  index_complete(ioctx, bucket_oid, CLS_RGW_OP_ADD, tag, 1, obj2, meta);
+  ASSERT_EQ(0, reshardlog_list(ioctx, bucket_oid, &entries, &is_truncated));
+  ASSERT_FALSE(is_truncated);
+  ASSERT_EQ(1u, entries.size());
+}