]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/reshard: record a duplicated index entry copy together with
authorliangmingyuan <liangmingyuan@baidu.com>
Wed, 21 Feb 2024 02:49:49 +0000 (10:49 +0800)
committerliangmingyuan <liangmingyuan@baidu.com>
Sat, 20 Jul 2024 14:55:56 +0000 (22:55 +0800)
version bucket writting operations.

Signed-off-by: Mingyuan Liang <liangmingyuan@baidu.com>
src/cls/rgw/cls_rgw.cc

index f3c69098f62d26682d524bcc162118e330adef70..aa5ba9ed7dbfd65060d1f940826901abca4b1d43 100644 (file)
@@ -142,13 +142,13 @@ static void bi_reshard_log_prefix(string& key)
 }
 
 // 0x802001_idx
-static void bi_reshard_log_key(cls_method_context_t hctx, string& key, string& idx)
+static void bi_reshard_log_key(cls_method_context_t hctx, string& key, const string& idx)
 {
   bi_reshard_log_prefix(key);
   key.append(idx);
 }
 
-static int reshard_log_index_operation(cls_method_context_t hctx, string& idx,
+static int reshard_log_index_operation(cls_method_context_t hctx, const string& idx,
                                        const cls_rgw_obj_key& key, bufferlist* log_bl)
 {
   string reshard_log_idx;
@@ -1308,11 +1308,19 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
 } // rgw_bucket_complete_op
 
 template <class T>
-static int write_entry(cls_method_context_t hctx, T& entry, const string& key)
+static int write_entry(cls_method_context_t hctx, T& entry, const string& key,
+                       const bool is_resharding = false)
 {
   bufferlist bl;
   encode(entry, bl);
-  return cls_cxx_map_set_val(hctx, key, &bl);
+  int ret = cls_cxx_map_set_val(hctx, key, &bl);
+  if (ret < 0) {
+    return ret;
+  }
+  if (is_resharding) {
+    ret = reshard_log_index_operation(hctx, key, entry.key, &bl);
+  }
+  return ret;
 }
 
 static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, rgw_bucket_olh_entry *olh_data_entry, string *index_key, bool *found)
@@ -1345,11 +1353,13 @@ static void update_olh_log(rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, co
   log.push_back(log_entry);
 }
 
-static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, const string& instance_idx)
+static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
+                                    const string& instance_idx, bool is_resharding)
 {
-  CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(), instance_idx.c_str(), instance_entry.flags);
+  CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(),
+          instance_idx.c_str(), instance_entry.flags);
   /* write the instance entry */
-  int ret = write_entry(hctx, instance_entry, instance_idx);
+  int ret = write_entry(hctx, instance_entry, instance_idx, is_resharding);
   if (ret < 0) {
     CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", escape_str(instance_idx).c_str(), ret);
     return ret;
@@ -1360,9 +1370,10 @@ static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_en
 /*
  * write object instance entry, and if needed also the list entry
  */
-static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, const string& instance_idx)
+static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry,
+                             const string& instance_idx, bool is_resharding)
 {
-  int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx);
+  int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, is_resharding);
   if (ret < 0) {
     return ret;
   }
@@ -1372,7 +1383,7 @@ static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& in
   if (instance_idx != instance_list_idx) {
     CLS_LOG(20, "write_entry() idx=%s flags=%d", escape_str(instance_list_idx).c_str(), instance_entry.flags);
     /* write a new list entry for the object instance */
-    ret = write_entry(hctx, instance_entry, instance_list_idx);
+    ret = write_entry(hctx, instance_entry, instance_list_idx, is_resharding);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_entry() instance=%s instance_list_idx=%s ret=%d", instance_entry.key.instance.c_str(), instance_list_idx.c_str(), ret);
       return ret;
@@ -1428,8 +1439,8 @@ public:
     instance_entry.versioned_epoch = epoch;
   }
 
-  int unlink_list_entry() {
-    string list_idx;
+  int unlink_list_entry(bool is_resharding) {
+    string list_idx, list_sub_ver;
     /* this instance has a previous list entry, remove that entry */
     get_list_index_key(instance_entry, &list_idx);
     CLS_LOG(20, "unlink_list_entry() list_idx=%s", escape_str(list_idx).c_str());
@@ -1438,10 +1449,14 @@ public:
       CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() list_idx=%s ret=%d", list_idx.c_str(), ret);
       return ret;
     }
+    if (is_resharding) {
+      bufferlist empty;
+      return reshard_log_index_operation(hctx, list_idx, instance_entry.key, &empty);
+    }
     return 0;
   }
 
-  int unlink() {
+  int unlink(bool is_resharding, const cls_rgw_obj_key& key) {
     /* remove the instance entry */
     CLS_LOG(20, "unlink() idx=%s", escape_str(instance_idx).c_str());
     int ret = cls_cxx_map_remove_key(hctx, instance_idx);
@@ -1449,10 +1464,14 @@ public:
       CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
       return ret;
     }
+    if (is_resharding) {
+      bufferlist empty;
+      return reshard_log_index_operation(hctx, instance_idx, key, &empty);
+    }
     return 0;
   }
 
-  int write_entries(uint64_t flags_set, uint64_t flags_reset) {
+  int write_entries(uint64_t flags_set, uint64_t flags_reset, bool is_resharding) {
     if (!initialized) {
       int ret = init();
       if (ret < 0) {
@@ -1465,7 +1484,7 @@ public:
     /* write the instance and list entries */
     bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty());
     encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key);
-    int ret = write_obj_entries(hctx, instance_entry, instance_idx);
+    int ret = write_obj_entries(hctx, instance_entry, instance_idx, is_resharding);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret);
       return ret;
@@ -1474,11 +1493,11 @@ public:
     return 0;
   }
 
-  int write(uint64_t epoch, bool current) {
+  int write(uint64_t epoch, bool current, bool is_resharding) {
     if (instance_entry.versioned_epoch > 0) {
       CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch);
       /* this instance has a previous list entry, remove that entry */
-      int ret = unlink_list_entry();
+      int ret = unlink_list_entry(is_resharding);
       if (ret < 0) {
         return ret;
       }
@@ -1490,11 +1509,11 @@ public:
     }
 
     instance_entry.versioned_epoch = epoch;
-    return write_entries(flags, 0);
+    return write_entries(flags, 0, is_resharding);
   }
 
-  int demote_current() {
-    return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT);
+  int demote_current(bool is_resharding) {
+    return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, is_resharding);
   }
 
   bool is_delete_marker() {
@@ -1596,9 +1615,9 @@ public:
     olh_data_entry.key = key;
   }
 
-  int write() {
+  int write(bool is_resharding) {
     /* write the olh data entry */
-    int ret = write_entry(hctx, olh_data_entry, olh_data_idx);
+    int ret = write_entry(hctx, olh_data_entry, olh_data_idx, is_resharding);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_idx.c_str(), ret);
       return ret;
@@ -1632,12 +1651,13 @@ public:
   }
 };
 
-static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key)
+static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key,
+                                bool is_resharding)
 {
   rgw_bucket_dir_entry entry;
   entry.key = key;
   entry.flags = rgw_bucket_dir_entry::FLAG_VER_MARKER;
-  int ret = write_entry(hctx, entry, key.name);
+  int ret = write_entry(hctx, entry, key.name, is_resharding);
   if (ret < 0) {
     CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret);
     return ret;
@@ -1652,9 +1672,10 @@ static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key)
  * key. Their version is going to be empty though
  */
 static int convert_plain_entry_to_versioned(cls_method_context_t hctx,
-                                           cls_rgw_obj_key& key,
-                                           bool demote_current,
-                                           bool instance_only)
+                                            cls_rgw_obj_key& key,
+                                            bool demote_current,
+                                            bool instance_only,
+                                            bool is_resharding)
 {
   if (!key.instance.empty()) {
     return -EINVAL;
@@ -1681,9 +1702,9 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx,
     encode_obj_versioned_data_key(key, &new_idx);
 
     if (instance_only) {
-      ret = write_obj_instance_entry(hctx, entry, new_idx);
+      ret = write_obj_instance_entry(hctx, entry, new_idx, is_resharding);
     } else {
-      ret = write_obj_entries(hctx, entry, new_idx);
+      ret = write_obj_entries(hctx, entry, new_idx, is_resharding);
     }
     if (ret < 0) {
       CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d",
@@ -1692,7 +1713,7 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx,
     }
   }
 
-  ret = write_version_marker(hctx, key);
+  ret = write_version_marker(hctx, key, is_resharding);
   if (ret < 0) {
     return ret;
   }
@@ -1732,6 +1753,13 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
     return -EINVAL;
   }
 
+  struct rgw_bucket_dir_header header;
+  int rc = read_bucket_header(hctx, &header);
+  if (rc < 0) {
+    CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__);
+    return rc;
+  }
+
   /* read instance entry */
   BIVerObjEntry obj(hctx, op.key);
   int ret = obj.init(op.delete_marker);
@@ -1805,7 +1833,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
                                              * entry */
     existed = (ret >= 0 && !other_obj.is_delete_marker());
     if (ret >= 0 && other_obj.is_delete_marker() != op.delete_marker) {
-      ret = other_obj.unlink_list_entry();
+      ret = other_obj.unlink_list_entry(header.resharding_in_logrecord());
       if (ret < 0) {
         return ret;
       }
@@ -1813,7 +1841,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
 
     removing = existed && op.delete_marker;
     if (!removing) {
-      ret = other_obj.unlink();
+      ret = other_obj.unlink(header.resharding_in_logrecord(), op.key);
       if (ret < 0) {
         return ret;
       }
@@ -1839,7 +1867,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
   const uint64_t prev_epoch = olh.get_epoch();
 
   if (!olh.start_modify(op.olh_epoch)) {
-    ret = obj.write(op.olh_epoch, false);
+    ret = obj.write(op.olh_epoch, false, header.resharding_in_logrecord());
     if (ret < 0) {
       return ret;
     }
@@ -1871,7 +1899,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
       if (!(olh_entry.key == op.key)) {
         BIVerObjEntry old_obj(hctx, olh_entry.key);
 
-        ret = old_obj.demote_current();
+        ret = old_obj.demote_current(header.resharding_in_logrecord());
         if (ret < 0) {
           CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
           return ret;
@@ -1882,7 +1910,8 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
   } else {
     bool instance_only = (op.key.instance.empty() && op.delete_marker);
     cls_rgw_obj_key key(op.key.name);
-    ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only);
+    ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only,
+                                           header.resharding_in_logrecord());
     if (ret < 0) {
       CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
       return ret;
@@ -1904,14 +1933,14 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
   }
   olh.set_exists(true);
 
-  ret = olh.write();
+  ret = olh.write(header.resharding_in_logrecord());
   if (ret < 0) {
     CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
     return ret;
   }
 
   /* write the instance and list entries */
-  ret = obj.write(olh.get_epoch(), promote);
+  ret = obj.write(olh.get_epoch(), promote, header.resharding_in_logrecord());
   if (ret < 0) {
     return ret;
   }
@@ -1920,12 +1949,6 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
    return 0;
   }
 
-  rgw_bucket_dir_header header;
-  ret = read_bucket_header(hctx, &header);
-  if (ret < 0) {
-    CLS_LOG(1, "ERROR: rgw_bucket_link_olh(): failed to read header\n");
-    return ret;
-  }
   if (header.syncstopped) {
     return 0;
   }
@@ -1972,10 +1995,17 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
 
   cls_rgw_obj_key dest_key = op.key;
 
+  struct rgw_bucket_dir_header header;
+  int ret = read_bucket_header(hctx, &header);
+  if (ret < 0) {
+    CLS_LOG(1, "ERROR: rgw_bucket_unlink_instance(): failed to read header\n");
+    return ret;
+  }
+
   BIVerObjEntry obj(hctx, dest_key);
   BIOLHEntry olh(hctx, dest_key);
 
-  int ret = obj.init();
+  ret = obj.init();
   if (ret < 0) {
     if (ret != -ENOENT) {
       CLS_LOG(0, "ERROR: obj.init() returned ret=%d", ret);
@@ -1993,7 +2023,8 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
   if (!olh_found) {
     bool instance_only = false;
     cls_rgw_obj_key key(dest_key.name);
-    ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only);
+    ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only,
+                                           header.resharding_in_logrecord());
     if (ret < 0) {
       CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
       return ret;
@@ -2005,7 +2036,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
   }
 
   if (!olh.start_modify(op.olh_epoch)) {
-    ret = obj.unlink_list_entry();
+    ret = obj.unlink_list_entry(header.resharding_in_logrecord());
     if (ret < 0) {
       return ret;
     }
@@ -2015,7 +2046,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
     }
 
     olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
-    return olh.write();
+    return olh.write(header.resharding_in_logrecord());
   }
 
   rgw_bucket_olh_entry& olh_entry = olh.get_entry();
@@ -2035,7 +2066,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
 
     if (found) {
       BIVerObjEntry next(hctx, next_key);
-      ret = next.write(olh.get_epoch(), true);
+      ret = next.write(olh.get_epoch(), true, header.resharding_in_logrecord());
       if (ret < 0) {
         CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
         return ret;
@@ -2062,18 +2093,18 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
   } else {
     /* this is a delete marker, it's our responsibility to remove its
      * instance entry */
-    ret = obj.unlink();
+    ret = obj.unlink(header.resharding_in_logrecord(), op.key);
     if (ret < 0) {
       return ret;
     }
   }
 
-  ret = obj.unlink_list_entry();
+  ret = obj.unlink_list_entry(header.resharding_in_logrecord());
   if (ret < 0) {
     return ret;
   }
 
-  ret = olh.write();
+  ret = olh.write(header.resharding_in_logrecord());
   if (ret < 0) {
     return ret;
   }
@@ -2082,12 +2113,6 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
     return 0;
   }
 
-  rgw_bucket_dir_header header;
-  ret = read_bucket_header(hctx, &header);
-  if (ret < 0) {
-    CLS_LOG(1, "ERROR: rgw_bucket_unlink_instance(): failed to read header\n");
-    return ret;
-  }
   if (header.syncstopped) {
     return 0;
   }
@@ -2203,8 +2228,15 @@ static int rgw_bucket_trim_olh_log(cls_method_context_t hctx, bufferlist *in, bu
     log.erase(rm_iter);
   }
 
+  struct rgw_bucket_dir_header header;
+  int rc = read_bucket_header(hctx, &header);
+  if (rc < 0) {
+    CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__);
+    return rc;
+  }
+
   /* write the olh data entry */
-  ret = write_entry(hctx, olh_data_entry, olh_data_key);
+  ret = write_entry(hctx, olh_data_entry, olh_data_key, header.resharding_in_logrecord());
   if (ret < 0) {
     CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_key.c_str(), ret);
     return ret;
@@ -2231,9 +2263,16 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe
     return -EINVAL;
   }
 
+  struct rgw_bucket_dir_header header;
+  int rc = read_bucket_header(hctx, &header);
+  if (rc < 0) {
+    CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__);
+    return rc;
+  }
+
   /* read olh entry */
   rgw_bucket_olh_entry olh_data_entry;
-  string olh_data_key;
+  string olh_data_key, olh_sub_ver;
   encode_olh_data_key(op.key, &olh_data_key);
   int ret = read_index_entry(hctx, olh_data_key, &olh_data_entry);
   if (ret < 0 && ret != -ENOENT) {
@@ -2251,6 +2290,10 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe
     CLS_LOG(1, "NOTICE: %s: can't remove key %s ret=%d", __func__, olh_data_key.c_str(), ret);
     return ret;
   }
+  bufferlist empty;
+  ret = record_duplicate_entry(hctx, olh_data_key, olh_data_entry.key, &empty, header.resharding_in_logrecord());
+  if (ret < 0)
+    return ret;
 
   rgw_bucket_dir_entry plain_entry;
 
@@ -2276,6 +2319,10 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe
     return ret;
   }
 
+  ret = record_duplicate_entry(hctx, op.key.name, plain_entry.key, &empty, header.resharding_in_logrecord());
+  if (ret < 0)
+    return ret;
+
   return 0;
 }
 
@@ -2442,6 +2489,10 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
             return ret;
           }
         }
+        if (header.resharding_in_logrecord()) {
+          bufferlist empty;
+          return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &empty);
+        }
         break;
       case CEPH_RGW_UPDATE:
        CLS_LOG_BITX(bitx_inst, 10,
@@ -2475,6 +2526,9 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx,
             return ret;
           }
         }
+        if (header.resharding_in_logrecord()) {
+          return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &cur_state_bl);
+        }
         break;
       } // switch(op)
     } // if (cur_disk.pending_map.empty())