From: Jane Zhu Date: Wed, 4 Mar 2026 23:39:06 +0000 (+0000) Subject: rgw: break the coupling of olh epoch and epochs of olh ops, and make the epochs of... X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=75c7b8ece796b813ce734a14b7354593d03d3cfc;p=ceph.git rgw: break the coupling of olh epoch and epochs of olh ops, and make the epochs of olh ops ever-increasing Signed-off-by: Jane Zhu --- diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index fbe5525a40cb..0630c01922b8 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -1399,7 +1399,7 @@ static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, rgw_buck static void update_olh_log(rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, const string& op_tag, cls_rgw_obj_key& key, bool delete_marker, uint64_t epoch) { - vector<rgw_bucket_olh_log_entry>& log = olh_data_entry.pending_log[olh_data_entry.epoch]; + vector<rgw_bucket_olh_log_entry>& log = olh_data_entry.pending_log[epoch]; rgw_bucket_olh_log_entry log_entry; log_entry.epoch = epoch; log_entry.op = op; @@ -1544,7 +1544,7 @@ public: int write(uint64_t epoch, bool current, rgw_bucket_dir_header& header) { if (instance_entry.versioned_epoch > 0) { - CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch); + CLS_LOG(20, "%s: instance_entry.versioned_epoch=%lu epoch=%lu", __func__, instance_entry.versioned_epoch, epoch); /* this instance has a previous list entry, remove that entry */ int ret = unlink_list_entry(header); if (ret < 0) { @@ -1641,14 +1641,17 @@ public: * This timestamp is then used later on to guard against OLH updates for add/remove instance ops that happened *before* * the latest op that updated the OLH entry.
* @param candidate_epoch - this is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync; + * @param replace - true to replace the epoch if larger than the old one; */ - bool start_modify (uint64_t candidate_epoch) { + bool start_modify (uint64_t candidate_epoch, bool replace = true) { // only update the olh.epoch if it is newer than the current one. if (candidate_epoch < olh_data_entry.epoch) { return false; /* olh cannot be modified, old epoch */ } - olh_data_entry.epoch = candidate_epoch; + if (replace) { + olh_data_entry.epoch = candidate_epoch; + } return true; } @@ -1656,6 +1659,10 @@ public: return olh_data_entry.epoch; } + void set_epoch(uint64_t epoch) { + olh_data_entry.epoch = epoch; + } + rgw_bucket_olh_entry& get_entry() { return olh_data_entry; } @@ -1725,7 +1732,8 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, cls_rgw_obj_key& key, bool demote_current, bool instance_only, - rgw_bucket_dir_header& header) + rgw_bucket_dir_header& header, + uint64_t& versioned_epoch) { if (!key.instance.empty()) { return -EINVAL; @@ -1741,7 +1749,8 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, return ret; } - entry.versioned_epoch = 1; /* converted entries are always 1 */ + entry.versioned_epoch = versioned_epoch = + duration_cast(entry.meta.mtime.time_since_epoch()).count(); entry.flags |= rgw_bucket_dir_entry::FLAG_VER; if (demote_current) { @@ -1899,8 +1908,8 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer const uint64_t prev_epoch = olh.get_epoch(); // op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync; - uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch : - duration_cast(obj.mtime().time_since_epoch()).count(); + uint64_t now_epoch = duration_cast(real_clock::now().time_since_epoch()).count(); + uint64_t candidate_epoch = op.olh_epoch ? 
op.olh_epoch : now_epoch; if (olh.start_modify(candidate_epoch)) { // promote this version to current if it's a newer epoch, or if it matches the // current epoch and sorts after the current instance @@ -1940,21 +1949,22 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer } else { bool instance_only = (op.key.instance.empty() && op.delete_marker); cls_rgw_obj_key key(op.key.name); - ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header); + uint64_t versioned_epoch = candidate_epoch; + ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header, versioned_epoch); if (ret < 0) { CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); return ret; } olh.set_tag(op.olh_tag); if (op.key.instance.empty()) { - obj.set_epoch(1); + obj.set_epoch(versioned_epoch); } } /* update the olh log */ - olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker); + olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker, now_epoch); if (removing) { - olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false); + olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch); } if (promote) { @@ -1969,8 +1979,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer } ret = olh.write(header); - } - else { + } else { ret = obj.write(candidate_epoch, false, header); if (ret < 0) { return ret; @@ -1980,9 +1989,11 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer // the epoch is already stale compared to the current - so no point in applying it; if (removing) { - olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch); - ret = olh.write(header); + olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch); + } else { + olh.update_log(CLS_RGW_OLH_OP_STALE, op.op_tag, op.key, false, now_epoch); } + ret = 
olh.write(header); } if (ret < 0) { @@ -2070,10 +2081,14 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, return ret; } + uint64_t now_epoch = duration_cast(real_clock::now().time_since_epoch()).count(); + uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch : now_epoch; + if (!olh_found) { bool instance_only = false; cls_rgw_obj_key key(dest_key.name); - ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, header); + uint64_t versioned_epoch = candidate_epoch - 1; + ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, header, versioned_epoch); if (ret < 0) { CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); return ret; @@ -2081,13 +2096,11 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, olh.update(dest_key, false); olh.set_tag(op.olh_tag); - obj.set_epoch(1); + obj.set_epoch(versioned_epoch); } // op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync; - uint64_t candidate_epoch = op.olh_epoch ? 
op.olh_epoch : - duration_cast(real_clock::now().time_since_epoch()).count(); - if (olh.start_modify(candidate_epoch)) { + if (olh.start_modify(candidate_epoch, false)) { rgw_bucket_olh_entry &olh_entry = olh.get_entry(); cls_rgw_obj_key &olh_key = olh_entry.key; CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__, @@ -2105,7 +2118,14 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, if (found) { BIVerObjEntry next(hctx, next_key); - ret = next.write(olh.get_epoch(), true, header); + ret = next.init(); + if (ret < 0) { + CLS_LOG(0, "ERROR: next.init() returned ret=%d", ret); + return ret; + } + + uint64_t next_epoch = next.get_dir_entry().versioned_epoch; + ret = next.write(next_epoch, true, header); if (ret < 0) { CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret); return ret; @@ -2115,21 +2135,28 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, next_key.name.c_str(), next_key.instance.c_str(), (int) next.is_delete_marker()); olh.update(next_key, next.is_delete_marker()); - olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker()); + olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker(), now_epoch); + // use the next entry's versioned_epoch in the olh entry since it's the new head now + olh.set_epoch(next_epoch); } else { // next_key is empty, but we need to preserve its name in case this entry // gets resharded, because this key is used for hash placement next_key.name = dest_key.name; olh.update(next_key, false); - olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false); + if (olh.get_epoch() == 0) { + olh.set_epoch(candidate_epoch); + } + olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false, now_epoch); olh.set_exists(false); olh.set_pending_removal(true); } } if (!obj.is_delete_marker()) { - olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, 
false); + olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch); } else { + olh.update_log(CLS_RGW_OLH_OP_STALE, op.op_tag, op.key, false, now_epoch); + /* this is a delete marker, it's our responsibility to remove its * instance entry */ ret = obj.unlink(header, op.key); @@ -2150,10 +2177,17 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, } if (obj.is_delete_marker()) { - return 0; + olh.update_log(CLS_RGW_OLH_OP_STALE, op.op_tag, op.key, false, now_epoch); + + ret = olh.write(header); + if (ret < 0) { + CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret); + } + + return ret; } - olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch); + olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch); } ret = olh.write(header); diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc index ac89cf51e39a..036a63dfe030 100644 --- a/src/cls/rgw/cls_rgw_types.cc +++ b/src/cls/rgw/cls_rgw_types.cc @@ -558,6 +558,9 @@ void rgw_bucket_olh_log_entry::dump(Formatter *f) const case CLS_RGW_OLH_OP_REMOVE_INSTANCE: op_str = "remove_instance"; break; + case CLS_RGW_OLH_OP_STALE: + op_str = "stale_olh_op"; + break; default: op_str = "unknown"; } @@ -578,6 +581,8 @@ void rgw_bucket_olh_log_entry::decode_json(JSONObj *obj) op = CLS_RGW_OLH_OP_UNLINK_OLH; } else if (op_str == "remove_instance") { op = CLS_RGW_OLH_OP_REMOVE_INSTANCE; + } else if (op_str == "stale_olh_op") { + op = CLS_RGW_OLH_OP_STALE; } else { op = CLS_RGW_OLH_OP_UNKNOWN; } diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index 6fe5e4e251d1..069f5e3361f3 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -517,6 +517,8 @@ enum OLHLogOp { CLS_RGW_OLH_OP_UNLINK_OLH = 2, /* object does not exist */ // remove a specific instance of an object, such as . 
CLS_RGW_OLH_OP_REMOVE_INSTANCE = 3, + // a stale op to be used to cleanup olh.pending attribute of the olh object + CLS_RGW_OLH_OP_STALE = 4, }; struct rgw_bucket_olh_log_entry { diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 505a43a7940d..26b983b8d284 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -8656,6 +8656,7 @@ int RGWRados::olh_init_modification_impl(const DoutPrefixProvider *dpp, const RG string attr_name = RGW_ATTR_OLH_PENDING_PREFIX; attr_name.append(*op_tag); + ldpp_dout(dpp, 20) << __func__ << " adding olh pending attr: " << attr_name << dendl; op.setxattr(attr_name.c_str(), bl); int ret = obj_operate(dpp, bucket_info, olh_obj, std::move(op), y); @@ -9305,7 +9306,7 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp, uint64_t link_epoch = 0; cls_rgw_obj_key key; bool delete_marker = false; - list<cls_rgw_obj_key> remove_instances; + set<cls_rgw_obj_key> remove_instances; bool need_to_remove = false; // decode current epoch and instance @@ -9335,20 +9336,26 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp, << " key=" << entry.key.name << "[" << entry.key.instance << "] " << (entry.delete_marker ?
"(delete)" : "") << dendl; - if (link_epoch == iter->first) + if (link_epoch == entry.epoch) ldpp_dout(dpp, 1) << "apply_olh_log epoch collision detected for " << entry.key << "; incoming op: " << entry.op << "(" << entry.op_tag << ")" << dendl; switch (entry.op) { case CLS_RGW_OLH_OP_REMOVE_INSTANCE: - remove_instances.push_back(entry.key); + remove_instances.insert(entry.key); break; case CLS_RGW_OLH_OP_LINK_OLH: // only overwrite a link of the same epoch if its key sorts before - if (link_epoch < iter->first || key.instance.empty() || + // or there is a CLS_RGW_OLH_OP_UNLINK_OLH before this op in this batch + if (need_to_remove || link_epoch < entry.epoch || key.instance.empty() || key.instance > entry.key.instance) { ldpp_dout(dpp, 20) << "apply_olh_log applying key=" << entry.key << " epoch=" << iter->first << " delete_marker=" << entry.delete_marker << " over current=" << key << " epoch=" << link_epoch << " delete_marker=" << delete_marker << dendl; + + if (need_to_remove) { + // cancel the instance removal if it's linked again -- e.g. 
coming from a multisite remote zone + remove_instances.erase(entry.key); + } need_to_link = true; need_to_remove = false; key = entry.key; @@ -9362,6 +9369,8 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp, need_to_remove = true; need_to_link = false; break; + case CLS_RGW_OLH_OP_STALE: + break; default: ldpp_dout(dpp, 0) << "ERROR: apply_olh_log: invalid op: " << (int)entry.op << dendl; return -EIO; @@ -9391,9 +9400,9 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp, } /* first remove object instances */ - for (list<cls_rgw_obj_key>::iterator liter = remove_instances.begin(); + for (set<cls_rgw_obj_key>::iterator liter = remove_instances.begin(); liter != remove_instances.end(); ++liter) { - cls_rgw_obj_key& key = *liter; + const cls_rgw_obj_key& key = *liter; rgw_obj obj_instance(bucket, key); int ret = delete_obj(dpp, obj_ctx, bucket_info, obj_instance, 0, y, null_verid, RGW_BILOG_FLAG_VERSIONED_OP, diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index f1893547d7ab..fbeae3a71428 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -1695,6 +1695,103 @@ def test_bucket_sync_disable_enable(): zonegroup_data_checkpoint(zonegroup_conns) +@attr('bucket_sync_disable') +def test_versioned_bucket_sync_disable_enable_object_delete(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + primary = zonegroup_conns.rw_zones[0] + secondary = zonegroup_conns.rw_zones[1] + + # create a bucket + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # enable versioning + primary.s3_client.put_bucket_versioning( + Bucket=bucket.name, + VersioningConfiguration={'Status': 'Enabled'} + ) + zonegroup_meta_checkpoint(zonegroup) + + obj = 'obj' + + # upload an initial object + resp1 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='') + version_id_1 = resp1.get('VersionId', 'null') +
log.debug('created initial version id=%s', version_id_1) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # upload the second version + resp2 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='') + version_id_2 = resp2['VersionId'] + log.debug('created new version id=%s', version_id_2) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # upload the third version + resp3 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='') + version_id_3 = resp3['VersionId'] + log.debug('created new version id=%s', version_id_3) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # test deleting the non-head from the secondary + + log.debug(f"Disabling bucket sync for bucket:{bucket.name}") + disable_bucket_sync(realm.meta_master_zone(), bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # upload the fourth version - do this before the following object delete + # so it has a slightly smaller epoch + resp4 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='') + version_id_4 = resp4['VersionId'] + log.debug('created new version id=%s', version_id_4) + + # Delete the second object version + cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_2] + secondary.zone.cluster.admin(cmd) + + log.debug(f"Enabling bucket sync for bucket:{bucket.name}") + enable_bucket_sync(realm.meta_master_zone(), bucket.name) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # test deleting the head from the secondary + + log.debug(f"Disabling bucket sync for bucket:{bucket.name}") + disable_bucket_sync(realm.meta_master_zone(), bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # Delete the fourth object version + cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_4] + secondary.zone.cluster.admin(cmd) + + log.debug(f"Enabling bucket sync for bucket:{bucket.name}") + enable_bucket_sync(realm.meta_master_zone(), 
bucket.name) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # test deleting everything from the secondary + + log.debug(f"Disabling bucket sync for bucket:{bucket.name}") + disable_bucket_sync(realm.meta_master_zone(), bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # Delete all object versions + cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_1] + secondary.zone.cluster.admin(cmd) + cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_2] + secondary.zone.cluster.admin(cmd) + cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_3] + secondary.zone.cluster.admin(cmd) + cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_4] + secondary.zone.cluster.admin(cmd) + + log.debug(f"Enabling bucket sync for bucket:{bucket.name}") + enable_bucket_sync(realm.meta_master_zone(), bucket.name) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + def test_multipart_object_sync(): zonegroup = realm.master_zonegroup() zonegroup_conns = ZonegroupConns(zonegroup)