]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: break the coupling of olh epoch and epochs of olh ops, and make the epochs of...
authorJane Zhu <jzhu116@bloomberg.net>
Wed, 4 Mar 2026 23:39:06 +0000 (23:39 +0000)
committerJane Zhu <jzhu116@bloomberg.net>
Sat, 4 Apr 2026 17:02:17 +0000 (17:02 +0000)
Signed-off-by: Jane Zhu <jzhu116@bloomberg.net>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_types.cc
src/cls/rgw/cls_rgw_types.h
src/rgw/driver/rados/rgw_rados.cc
src/test/rgw/rgw_multi/tests.py

index fbe5525a40cb9b84547936509f482d25078d68ec..0630c01922b81c67130321f84ca233485f76df48 100644 (file)
@@ -1399,7 +1399,7 @@ static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, rgw_buck
 static void update_olh_log(rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, const string& op_tag,
                            cls_rgw_obj_key& key, bool delete_marker, uint64_t epoch)
 {
-  vector<rgw_bucket_olh_log_entry>& log = olh_data_entry.pending_log[olh_data_entry.epoch];
+  vector<rgw_bucket_olh_log_entry>& log = olh_data_entry.pending_log[epoch];
   rgw_bucket_olh_log_entry log_entry;
   log_entry.epoch = epoch;
   log_entry.op = op;
@@ -1544,7 +1544,7 @@ public:
 
   int write(uint64_t epoch, bool current, rgw_bucket_dir_header& header) {
     if (instance_entry.versioned_epoch > 0) {
-      CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch);
+      CLS_LOG(20, "%s: instance_entry.versioned_epoch=%lu epoch=%lu", __func__, instance_entry.versioned_epoch, epoch);
       /* this instance has a previous list entry, remove that entry */
       int ret = unlink_list_entry(header);
       if (ret < 0) {
@@ -1641,14 +1641,17 @@ public:
    * This timestamp is then used later on to guard against OLH updates for add/remove instance ops that happened *before*
    * the latest op that updated the OLH entry.
    * @param candidate_epoch - this is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
+   * @param replace - true to replace the epoch if larger than the old one;
    */
-  bool start_modify (uint64_t candidate_epoch) {
+  bool start_modify (uint64_t candidate_epoch, bool replace = true) {
     // only update the olh.epoch if it is newer than the current one.
     if (candidate_epoch < olh_data_entry.epoch) {
       return false; /* olh cannot be modified, old epoch */
     }
 
-    olh_data_entry.epoch = candidate_epoch;
+    if (replace) {
+      olh_data_entry.epoch = candidate_epoch;
+    }
     return true;
   }
 
@@ -1656,6 +1659,10 @@ public:
     return olh_data_entry.epoch;
   }
 
+  void set_epoch(uint64_t epoch) {
+    olh_data_entry.epoch = epoch;
+  }
+
   rgw_bucket_olh_entry& get_entry() {
     return olh_data_entry;
   }
@@ -1725,7 +1732,8 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx,
                                             cls_rgw_obj_key& key,
                                             bool demote_current,
                                             bool instance_only,
-                                            rgw_bucket_dir_header& header)
+                                            rgw_bucket_dir_header& header,
+                                            uint64_t& versioned_epoch)
 {
   if (!key.instance.empty()) {
     return -EINVAL;
@@ -1741,7 +1749,8 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx,
       return ret;
     }
 
-    entry.versioned_epoch = 1; /* converted entries are always 1 */
+    entry.versioned_epoch = versioned_epoch =
+          duration_cast<std::chrono::nanoseconds>(entry.meta.mtime.time_since_epoch()).count();
     entry.flags |= rgw_bucket_dir_entry::FLAG_VER;
 
     if (demote_current) {
@@ -1899,8 +1908,8 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
   const uint64_t prev_epoch = olh.get_epoch();
 
   // op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
-  uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
-    duration_cast<std::chrono::nanoseconds>(obj.mtime().time_since_epoch()).count();
+  uint64_t now_epoch = duration_cast<std::chrono::nanoseconds>(real_clock::now().time_since_epoch()).count();
+  uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch : now_epoch;
   if (olh.start_modify(candidate_epoch)) {
     // promote this version to current if it's a newer epoch, or if it matches the
     // current epoch and sorts after the current instance
@@ -1940,21 +1949,22 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
     } else {
       bool instance_only = (op.key.instance.empty() && op.delete_marker);
       cls_rgw_obj_key key(op.key.name);
-      ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header);
+      uint64_t versioned_epoch = candidate_epoch;
+      ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header, versioned_epoch);
       if (ret < 0) {
         CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
         return ret;
       }
       olh.set_tag(op.olh_tag);
       if (op.key.instance.empty()) {
-        obj.set_epoch(1);
+        obj.set_epoch(versioned_epoch);
       }
     }
 
     /* update the olh log */
-    olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker);
+    olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker, now_epoch);
     if (removing) {
-      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
+      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch);
     }
 
     if (promote) {
@@ -1969,8 +1979,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
     }
 
     ret = olh.write(header);
-  }
-  else {
+  } else {
     ret = obj.write(candidate_epoch, false, header);
     if (ret < 0) {
       return ret;
@@ -1980,9 +1989,11 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
     // the epoch is already stale compared to the current - so no point in applying it;
 
     if (removing) {
-      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
-      ret = olh.write(header);
+      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch);
+    } else {
+      olh.update_log(CLS_RGW_OLH_OP_STALE, op.op_tag, op.key, false, now_epoch);
     }
+    ret = olh.write(header);
   }
 
   if (ret < 0) {
@@ -2070,10 +2081,14 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
     return ret;
   }
 
+  uint64_t now_epoch = duration_cast<std::chrono::nanoseconds>(real_clock::now().time_since_epoch()).count();
+  uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch : now_epoch;
+
   if (!olh_found) {
     bool instance_only = false;
     cls_rgw_obj_key key(dest_key.name);
-    ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, header);
+    uint64_t versioned_epoch = candidate_epoch - 1;
+    ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, header, versioned_epoch);
     if (ret < 0) {
       CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
       return ret;
@@ -2081,13 +2096,11 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
     olh.update(dest_key, false);
     olh.set_tag(op.olh_tag);
 
-    obj.set_epoch(1);
+    obj.set_epoch(versioned_epoch);
   }
 
   // op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
-  uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
-    duration_cast<std::chrono::nanoseconds>(real_clock::now().time_since_epoch()).count();
-  if (olh.start_modify(candidate_epoch)) {
+  if (olh.start_modify(candidate_epoch, false)) {
     rgw_bucket_olh_entry &olh_entry = olh.get_entry();
     cls_rgw_obj_key &olh_key = olh_entry.key;
     CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__,
@@ -2105,7 +2118,14 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
 
       if (found) {
         BIVerObjEntry next(hctx, next_key);
-        ret = next.write(olh.get_epoch(), true, header);
+        ret = next.init();
+        if (ret < 0) {
+          CLS_LOG(0, "ERROR: next.init() returned ret=%d", ret);
+          return ret;
+        }
+
+        uint64_t next_epoch = next.get_dir_entry().versioned_epoch;
+        ret = next.write(next_epoch, true, header);
         if (ret < 0) {
           CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
           return ret;
@@ -2115,21 +2135,28 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
                 next_key.name.c_str(), next_key.instance.c_str(), (int) next.is_delete_marker());
 
         olh.update(next_key, next.is_delete_marker());
-        olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker());
+        olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker(), now_epoch);
+        // use the next entry's versioned_epoch in the olh entry since it's the new head now
+        olh.set_epoch(next_epoch);
       } else {
         // next_key is empty, but we need to preserve its name in case this entry
         // gets resharded, because this key is used for hash placement
         next_key.name = dest_key.name;
         olh.update(next_key, false);
-        olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false);
+        if (olh.get_epoch() == 0) {
+          olh.set_epoch(candidate_epoch);
+        }
+        olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false, now_epoch);
         olh.set_exists(false);
         olh.set_pending_removal(true);
       }
     }
 
     if (!obj.is_delete_marker()) {
-      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
+      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch);
     } else {
+      olh.update_log(CLS_RGW_OLH_OP_STALE, op.op_tag, op.key, false, now_epoch);
+
       /* this is a delete marker, it's our responsibility to remove its
        * instance entry */
       ret = obj.unlink(header, op.key);
@@ -2150,10 +2177,17 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
     }
 
     if (obj.is_delete_marker()) {
-      return 0;
+      olh.update_log(CLS_RGW_OLH_OP_STALE, op.op_tag, op.key, false, now_epoch);
+
+      ret = olh.write(header);
+      if (ret < 0) {
+        CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
+      }
+
+      return ret;
     }
 
-    olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
+    olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch);
   }
 
   ret = olh.write(header);
index ac89cf51e39a76c6c2d6f668a6de1d01108f422e..036a63dfe030de60a734d85b76983eef020d236f 100644 (file)
@@ -558,6 +558,9 @@ void rgw_bucket_olh_log_entry::dump(Formatter *f) const
     case CLS_RGW_OLH_OP_REMOVE_INSTANCE:
       op_str = "remove_instance";
       break;
+    case CLS_RGW_OLH_OP_STALE:
+      op_str = "stale_olh_op";
+      break;
     default:
       op_str = "unknown";
   }
@@ -578,6 +581,8 @@ void rgw_bucket_olh_log_entry::decode_json(JSONObj *obj)
     op = CLS_RGW_OLH_OP_UNLINK_OLH;
   } else if (op_str == "remove_instance") {
     op = CLS_RGW_OLH_OP_REMOVE_INSTANCE;
+  } else if (op_str == "stale_olh_op") {
+    op = CLS_RGW_OLH_OP_STALE;
   } else {
     op = CLS_RGW_OLH_OP_UNKNOWN;
   }
index 6fe5e4e251d1c36380eb4cdf6c9cde17b0e9c8f7..069f5e3361f37448b0132838da77bc7a4cdd9b96 100644 (file)
@@ -517,6 +517,8 @@ enum OLHLogOp {
   CLS_RGW_OLH_OP_UNLINK_OLH      = 2, /* object does not exist */
   // remove a specific instance of an object, such as <obj_name>.<obj_version>
   CLS_RGW_OLH_OP_REMOVE_INSTANCE = 3,
+  // a stale op to be used to cleanup olh.pending attribute of the olh object
+  CLS_RGW_OLH_OP_STALE           = 4,
 };
 
 struct rgw_bucket_olh_log_entry {
index 505a43a7940d2f819fa9d14c60d634b02e333b3b..26b983b8d28491aa5336ac8ed2f7a9c345f4e34f 100644 (file)
@@ -8656,6 +8656,7 @@ int RGWRados::olh_init_modification_impl(const DoutPrefixProvider *dpp, const RG
   string attr_name = RGW_ATTR_OLH_PENDING_PREFIX;
   attr_name.append(*op_tag);
 
+  ldpp_dout(dpp, 20) << __func__ << " adding olh pending attr: " << attr_name << dendl;
   op.setxattr(attr_name.c_str(), bl);
 
   int ret = obj_operate(dpp, bucket_info, olh_obj, std::move(op), y);
@@ -9305,7 +9306,7 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,
   uint64_t link_epoch = 0;
   cls_rgw_obj_key key;
   bool delete_marker = false;
-  list<cls_rgw_obj_key> remove_instances;
+  set<cls_rgw_obj_key> remove_instances;
   bool need_to_remove = false;
 
   // decode current epoch and instance
@@ -9335,20 +9336,26 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,
                      << " key=" << entry.key.name << "[" << entry.key.instance << "] "
                      << (entry.delete_marker ? "(delete)" : "") << dendl;
 
-      if (link_epoch == iter->first)
+      if (link_epoch == entry.epoch)
         ldpp_dout(dpp, 1) << "apply_olh_log epoch collision detected for " << entry.key
                           << "; incoming op: " << entry.op << "(" << entry.op_tag << ")" << dendl;
 
       switch (entry.op) {
       case CLS_RGW_OLH_OP_REMOVE_INSTANCE:
-        remove_instances.push_back(entry.key);
+        remove_instances.insert(entry.key);
         break;
       case CLS_RGW_OLH_OP_LINK_OLH:
         // only overwrite a link of the same epoch if its key sorts before
-        if (link_epoch < iter->first || key.instance.empty() ||
+        // or there is a CLS_RGW_OLH_OP_UNLINK_OLH before this op in this batch
+        if (need_to_remove || link_epoch < entry.epoch || key.instance.empty() ||
             key.instance > entry.key.instance) {
           ldpp_dout(dpp, 20) << "apply_olh_log applying key=" << entry.key << " epoch=" << iter->first << " delete_marker=" << entry.delete_marker
               << " over current=" << key << " epoch=" << link_epoch << " delete_marker=" << delete_marker << dendl;
+
+          if (need_to_remove) {
+            // cancel the instance removal if it's linked again -- e.g. coming from a multisite remote zone
+            remove_instances.erase(entry.key);
+          }
           need_to_link = true;
           need_to_remove = false;
           key = entry.key;
@@ -9362,6 +9369,8 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,
         need_to_remove = true;
         need_to_link = false;
         break;
+      case CLS_RGW_OLH_OP_STALE:
+        break;
       default:
         ldpp_dout(dpp, 0) << "ERROR: apply_olh_log: invalid op: " << (int)entry.op << dendl;
         return -EIO;
@@ -9391,9 +9400,9 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,
   }
 
   /* first remove object instances */
-  for (list<cls_rgw_obj_key>::iterator liter = remove_instances.begin();
+  for (set<cls_rgw_obj_key>::iterator liter = remove_instances.begin();
        liter != remove_instances.end(); ++liter) {
-    cls_rgw_obj_key& key = *liter;
+    const cls_rgw_obj_key& key = *liter;
     rgw_obj obj_instance(bucket, key);
     int ret = delete_obj(dpp, obj_ctx, bucket_info, obj_instance, 0, y,
                         null_verid, RGW_BILOG_FLAG_VERSIONED_OP,
index f1893547d7ab35ff9a49f528afa178fbafa975fe..fbeae3a7142832d4d9238f92d33a67bfe656cc85 100644 (file)
@@ -1695,6 +1695,103 @@ def test_bucket_sync_disable_enable():
 
     zonegroup_data_checkpoint(zonegroup_conns)
 
+@attr('bucket_sync_disable')
+def test_versioned_bucket_sync_disable_enable_object_delete():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+
+    primary = zonegroup_conns.rw_zones[0]
+    secondary = zonegroup_conns.rw_zones[1]
+
+    # create a bucket
+    bucket = primary.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # enable versioning
+    primary.s3_client.put_bucket_versioning(
+        Bucket=bucket.name,
+        VersioningConfiguration={'Status': 'Enabled'}
+    )
+    zonegroup_meta_checkpoint(zonegroup)
+
+    obj = 'obj'
+
+    # upload an initial object
+    resp1 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+    version_id_1 = resp1.get('VersionId', 'null')
+    log.debug('created initial version id=%s', version_id_1)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    # upload the second version
+    resp2 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+    version_id_2 = resp2['VersionId']
+    log.debug('created new version id=%s', version_id_2)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    # upload the third version
+    resp3 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+    version_id_3 = resp3['VersionId']
+    log.debug('created new version id=%s', version_id_3)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    # test deleting the non-head from the secondary
+
+    log.debug(f"Disabling bucket sync for bucket:{bucket.name}")
+    disable_bucket_sync(realm.meta_master_zone(), bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # upload the fourth version - do this before the following object delete
+    # so it has a slightly smaller epoch
+    resp4 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+    version_id_4 = resp4['VersionId']
+    log.debug('created new version id=%s', version_id_4)
+
+    # Delete the second object version
+    cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_2]
+    secondary.zone.cluster.admin(cmd)
+
+    log.debug(f"Enabling bucket sync for bucket:{bucket.name}")
+    enable_bucket_sync(realm.meta_master_zone(), bucket.name)
+
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    # test deleting the head from the secondary
+
+    log.debug(f"Disabling bucket sync for bucket:{bucket.name}")
+    disable_bucket_sync(realm.meta_master_zone(), bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # Delete the fourth object version
+    cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_4]
+    secondary.zone.cluster.admin(cmd)
+
+    log.debug(f"Enabling bucket sync for bucket:{bucket.name}")
+    enable_bucket_sync(realm.meta_master_zone(), bucket.name)
+
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    # test deleting everything from the secondary
+
+    log.debug(f"Disabling bucket sync for bucket:{bucket.name}")
+    disable_bucket_sync(realm.meta_master_zone(), bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # Delete all object versions
+    cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_1]
+    secondary.zone.cluster.admin(cmd)
+    cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_2]
+    secondary.zone.cluster.admin(cmd)
+    cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_3]
+    secondary.zone.cluster.admin(cmd)
+    cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_4]
+    secondary.zone.cluster.admin(cmd)
+
+    log.debug(f"Enabling bucket sync for bucket:{bucket.name}")
+    enable_bucket_sync(realm.meta_master_zone(), bucket.name)
+
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
 def test_multipart_object_sync():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)