From: Alex Wojno Date: Tue, 23 Apr 2024 17:58:05 +0000 (-0400) Subject: rgw: implement x-amz-replication-status for PENDING & COMPLETED X-Git-Tag: v20.0.0~1605^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3161e24894afafa43cbf0bf7a3b316a6d05b0420;p=ceph.git rgw: implement x-amz-replication-status for PENDING & COMPLETED Signed-off-by: Alex Wojno --- diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.cc b/src/rgw/driver/d4n/rgw_sal_d4n.cc index 1afd6822fe2..7a5d6402067 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.cc +++ b/src/rgw/driver/d4n/rgw_sal_d4n.cc @@ -210,7 +210,7 @@ int D4NFilterObject::copy_object(const ACLOwner& owner, } int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, - Attrs* delattrs, optional_yield y) + Attrs* delattrs, optional_yield y, uint32_t flags) { if (setattrs != NULL) { /* Ensure setattrs and delattrs do not overlap */ @@ -241,7 +241,7 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl; } - return next->set_obj_attrs(dpp, setattrs, delattrs, y); + return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags); } int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, diff --git a/src/rgw/driver/d4n/rgw_sal_d4n.h b/src/rgw/driver/d4n/rgw_sal_d4n.h index 42436b92d1d..3d3a1125e02 100644 --- a/src/rgw/driver/d4n/rgw_sal_d4n.h +++ b/src/rgw/driver/d4n/rgw_sal_d4n.h @@ -195,7 +195,7 @@ class D4NFilterObject : public FilterObject { const DoutPrefixProvider* dpp, optional_yield y) override; virtual const std::string &get_name() const override { return next->get_name(); } virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, - Attrs* delattrs, optional_yield y) override; + Attrs* delattrs, optional_yield y, uint32_t flags) override; virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override; virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, diff --git a/src/rgw/driver/daos/rgw_sal_daos.cc b/src/rgw/driver/daos/rgw_sal_daos.cc index f8f60d82d02..d39422e4da8 100644 --- a/src/rgw/driver/daos/rgw_sal_daos.cc +++ b/src/rgw/driver/daos/rgw_sal_daos.cc @@ -900,7 +900,7 @@ int DaosObject::get_obj_state(const DoutPrefixProvider* dpp, DaosObject::~DaosObject() { close(nullptr); } int DaosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, - Attrs* delattrs, optional_yield y) { + Attrs* delattrs, optional_yield y, uint32_t flags) { ldpp_dout(dpp, 20) << "DEBUG: DaosObject::set_obj_attrs()" << dendl; // TODO handle target_obj // Get object's metadata (those stored in rgw_bucket_dir_entry) @@ -959,7 +959,7 @@ int DaosObject::delete_obj_attrs(const DoutPrefixProvider* dpp, bufferlist bl; rmattr[attr_name] = bl; - return set_obj_attrs(dpp, nullptr, &rmattr, y); + return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP); } bool DaosObject::is_expired() { diff --git a/src/rgw/driver/daos/rgw_sal_daos.h b/src/rgw/driver/daos/rgw_sal_daos.h index cf1583fc174..8c72fa68fa6 100644 --- a/src/rgw/driver/daos/rgw_sal_daos.h +++ b/src/rgw/driver/daos/rgw_sal_daos.h @@ -620,7 +620,7 @@ class DaosObject : public StoreObject { virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState** state, optional_yield y, bool follow_olh = true) override; virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, - Attrs* delattrs, optional_yield y) override; + Attrs* delattrs, optional_yield y, uint32_t flags) override; virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override; virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, diff --git a/src/rgw/driver/motr/rgw_sal_motr.cc b/src/rgw/driver/motr/rgw_sal_motr.cc index ae86ec9e7d3..419e613d0f8 100644 --- a/src/rgw/driver/motr/rgw_sal_motr.cc +++ b/src/rgw/driver/motr/rgw_sal_motr.cc @@ -1182,7 +1182,7 @@ MotrObject::~MotrObject() { // return read_op.prepare(dpp); // } -int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) +int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) { // TODO: implement ldpp_dout(dpp, 20) <<__func__<< ": MotrObject::set_obj_attrs()" << dendl; @@ -1238,7 +1238,7 @@ int MotrObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, op } set_atomic(); state.attrset[attr_name] = attr_val; - return set_obj_attrs(dpp, &state.attrset, nullptr, y); + return set_obj_attrs(dpp, &state.attrset, nullptr, y, rgw::sal::FLAG_LOG_OP); } int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) @@ -1249,7 +1249,7 @@ int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr set_atomic(); rmattr[attr_name] = bl; - return set_obj_attrs(dpp, nullptr, &rmattr, y); + return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP); } bool MotrObject::is_expired() { diff --git a/src/rgw/driver/motr/rgw_sal_motr.h b/src/rgw/driver/motr/rgw_sal_motr.h index e278728c7e7..233e99e8f59 100644 --- a/src/rgw/driver/motr/rgw_sal_motr.h +++ b/src/rgw/driver/motr/rgw_sal_motr.h @@ -677,7 +677,7 @@ class MotrObject : public StoreObject { virtual RGWAccessControlPolicy& get_acl(void) override { return acls; } virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; } virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override; - virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override; + virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override; virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override; virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override; virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override; diff --git a/src/rgw/driver/posix/rgw_sal_posix.cc b/src/rgw/driver/posix/rgw_sal_posix.cc index 9b1b34fa9e4..8bb0df5dca1 100644 --- a/src/rgw/driver/posix/rgw_sal_posix.cc +++ b/src/rgw/driver/posix/rgw_sal_posix.cc @@ -1555,7 +1555,7 @@ int POSIXObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **psta } int POSIXObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, - Attrs* delattrs, optional_yield y) + Attrs* delattrs, optional_yield y, uint32_t flags) { if (delattrs) { for (auto& it : *delattrs) { @@ -2420,7 +2420,7 @@ int POSIXObject::copy(const DoutPrefixProvider *dpp, optional_yield y, return ret; } - ret = dobj->set_obj_attrs(dpp, &get_attrs(), NULL, y); + ret = dobj->set_obj_attrs(dpp, &get_attrs(), NULL, y, rgw::sal::FLAG_LOG_OP); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: could not write attrs to dest object " << dobj->get_name() << dendl; @@ -2529,7 +2529,7 @@ int POSIXMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, attrs[RGW_POSIX_ATTR_MPUPLOAD] = bl; - return meta_obj->set_obj_attrs(dpp, &attrs, nullptr, y); + return meta_obj->set_obj_attrs(dpp, &attrs, nullptr, y, rgw::sal::FLAG_LOG_OP); } int POSIXMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct, diff --git a/src/rgw/driver/posix/rgw_sal_posix.h b/src/rgw/driver/posix/rgw_sal_posix.h index 587bf783e90..22445d06667 100644 --- a/src/rgw/driver/posix/rgw_sal_posix.h +++ b/src/rgw/driver/posix/rgw_sal_posix.h @@ -341,7 +341,7 @@ public: virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; } virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override; virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, - Attrs* delattrs, optional_yield y) override; + Attrs* delattrs, optional_yield y, uint32_t flags) override; virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override; virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, diff --git a/src/rgw/driver/rados/rgw_bucket_sync.cc b/src/rgw/driver/rados/rgw_bucket_sync.cc index dafbb6df46f..d59d79a0085 100644 --- a/src/rgw/driver/rados/rgw_bucket_sync.cc +++ b/src/rgw/driver/rados/rgw_bucket_sync.cc @@ -984,6 +984,19 @@ void RGWBucketSyncPolicyHandler::get_pipes(std::set *_sour } } +bool RGWBucketSyncPolicyHandler::bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags) { + if (bucket_exports_data()) { + for (auto& entry : target_pipes.pipe_map) { + auto& filter = entry.second.params.source.filter; + if (filter.check_prefix(obj_name) && filter.check_tags(tags.get_tags())) { + return true; + } + } + } + + return false; +} + bool RGWBucketSyncPolicyHandler::bucket_exports_data() const { if (!bucket) { diff --git a/src/rgw/driver/rados/rgw_bucket_sync.h b/src/rgw/driver/rados/rgw_bucket_sync.h index d425ecf1732..db542833792 100644 --- a/src/rgw/driver/rados/rgw_bucket_sync.h +++ b/src/rgw/driver/rados/rgw_bucket_sync.h @@ -402,6 +402,7 @@ public: return target_hints; } + bool bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags); bool bucket_exports_data() const; bool bucket_imports_data() const; diff --git a/src/rgw/driver/rados/rgw_rados.cc b/src/rgw/driver/rados/rgw_rados.cc index 95f05f149a0..3ff6ff2201e 100644 --- a/src/rgw/driver/rados/rgw_rados.cc +++ b/src/rgw/driver/rados/rgw_rados.cc @@ -5470,7 +5470,7 @@ static int resync_encrypted_multipart(const DoutPrefixProvider* dpp, }; return store->set_attrs(dpp, &obj_ctx, bucket_info, state.obj, - add_attrs, nullptr, y, set_mtime); + add_attrs, nullptr, y, true, set_mtime); } static void try_resync_encrypted_multipart(const DoutPrefixProvider* dpp, @@ -6469,13 +6469,14 @@ int RGWRados::set_attr(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBuc { map attrs; attrs[name] = bl; - return set_attrs(dpp, octx, bucket_info, obj, attrs, NULL, y); + return set_attrs(dpp, octx, bucket_info, obj, attrs, NULL, y, true); } int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBucketInfo& bucket_info, const rgw_obj& src_obj, map& attrs, map* rmattrs, optional_yield y, + bool log_op, ceph::real_time set_mtime /* = zero() */) { rgw_obj obj = src_obj; @@ -6547,7 +6548,7 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu string tag; append_rand_alpha(cct, tag, tag, 32); state->write_tag = tag; - r = index_op.prepare(dpp, CLS_RGW_OP_ADD, &state->write_tag, y); + r = index_op.prepare(dpp, CLS_RGW_OP_ADD, &state->write_tag, y, log_op); if (r < 0) return r; @@ -6591,9 +6592,9 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu int64_t poolid = ioctx.get_id(); r = index_op.complete(dpp, poolid, epoch, state->size, state->accounted_size, mtime, etag, content_type, storage_class, owner, - RGWObjCategory::Main, nullptr, y); + RGWObjCategory::Main, nullptr, y, nullptr, false, log_op); } else { - int ret = index_op.cancel(dpp, nullptr, y); + int ret = index_op.cancel(dpp, nullptr, y, log_op); if (ret < 0) { ldpp_dout(dpp, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl; } diff --git a/src/rgw/driver/rados/rgw_rados.h b/src/rgw/driver/rados/rgw_rados.h index 481a94a140d..b182b888582 100644 --- a/src/rgw/driver/rados/rgw_rados.h +++ b/src/rgw/driver/rados/rgw_rados.h @@ -1284,6 +1284,7 @@ public: std::map& attrs, std::map* rmattrs, optional_yield y, + bool log_op, ceph::real_time set_mtime = ceph::real_clock::zero()); int get_obj_state(const DoutPrefixProvider *dpp, RGWObjectCtx *rctx, diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 78f76218e84..aa1e5c902b6 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -59,6 +59,8 @@ #include "services/svc_meta.h" #include "services/svc_meta_be_sobj.h" #include "services/svc_cls.h" +#include "services/svc_bilog_rados.h" +#include "services/svc_bi_rados.h" #include "services/svc_zone.h" #include "services/svc_tier_rados.h" #include "services/svc_quota.h" @@ -2238,6 +2240,40 @@ RadosObject::~RadosObject() delete rados_ctx; } +bool RadosObject::is_sync_completed(const DoutPrefixProvider* dpp, + const ceph::real_time& obj_mtime) +{ + const auto& bucket_info = get_bucket()->get_info(); + if (bucket_info.is_indexless()) { + ldpp_dout(dpp, 0) << "ERROR: Trying to check object replication status for object in an indexless bucket. obj=" << get_key() << dendl; + return false; + } + + const auto& log_layout = bucket_info.layout.logs.front(); + const uint32_t shard_count = num_shards(log_to_index_layout(log_layout)); + + std::string marker; + bool truncated; + list entries; + + const int shard_id = RGWSI_BucketIndex_RADOS::bucket_shard_index(get_key(), shard_count); + + int ret = store->svc()->bilog_rados->log_list(dpp, bucket_info, log_layout, shard_id, + marker, 1, entries, &truncated); + + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: Failed to retrieve bilog info for obj=" << get_key() << dendl; + return false; + } + + if (entries.empty()) { + return true; + } + + const rgw_bi_log_entry& earliest_marker = entries.front(); + return earliest_marker.timestamp > obj_mtime; +} + int RadosObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pstate, optional_yield y, bool follow_olh) { int ret = store->getRados()->get_obj_state(dpp, rados_ctx, bucket->get_info(), get_obj(), pstate, &manifest, follow_olh, y); @@ -2268,7 +2304,7 @@ int RadosObject::read_attrs(const DoutPrefixProvider* dpp, RGWRados::Object::Rea return read_op.prepare(y, dpp); } -int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) +int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) { Attrs empty; return store->getRados()->set_attrs(dpp, rados_ctx, @@ -2276,7 +2312,7 @@ int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, A get_obj(), setattrs ? *setattrs : empty, delattrs ? delattrs : nullptr, - y); + y, flags & rgw::sal::FLAG_LOG_OP); } int RadosObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj) @@ -2300,7 +2336,7 @@ int RadosObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, o state.obj = target; set_atomic(); state.attrset[attr_name] = attr_val; - r = set_obj_attrs(dpp, &state.attrset, nullptr, y); + r = set_obj_attrs(dpp, &state.attrset, nullptr, y, rgw::sal::FLAG_LOG_OP); /* Restore target */ state.obj = save; @@ -2314,7 +2350,7 @@ int RadosObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* att set_atomic(); rmattr[attr_name] = bl; - return set_obj_attrs(dpp, nullptr, &rmattr, y); + return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP); } bool RadosObject::is_expired() { @@ -2467,7 +2503,7 @@ int RadosObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_y set_atomic(); map attrs; attrs[RGW_ATTR_ACL] = bl; - r = set_obj_attrs(dpp, &attrs, nullptr, y); + r = set_obj_attrs(dpp, &attrs, nullptr, y, rgw::sal::FLAG_LOG_OP); if (r < 0) { ldpp_dout(dpp, 0) << "ERROR: modify attr failed " << cpp_strerror(-r) << dendl; return r; diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 78bd849717f..a056b08a832 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -585,8 +585,10 @@ class RadosObject : public StoreObject { StoreObject::set_compressed(); } + virtual bool is_sync_completed(const DoutPrefixProvider* dpp, + const ceph::real_time& obj_mtime) override; virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override; - virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override; + virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override; virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override; virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override; virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 602280182d7..6045a9a60f3 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -57,6 +57,7 @@ #include "rgw_lua_data_filter.h" #include "rgw_lua.h" #include "rgw_iam_managed_policy.h" +#include "rgw_bucket_sync.h" #include "services/svc_zone.h" #include "services/svc_quota.h" @@ -908,6 +909,27 @@ void rgw_build_iam_environment(rgw::sal::Driver* driver, } } +void handle_replication_status_header( + const DoutPrefixProvider *dpp, + rgw::sal::Attrs& attrs, + req_state* s, + const ceph::real_time &obj_mtime) { + auto attr_iter = attrs.find(RGW_ATTR_OBJ_REPLICATION_STATUS); + if (attr_iter != attrs.end() && attr_iter->second.to_str() == "PENDING") { + if (s->object->is_sync_completed(dpp, obj_mtime)) { + s->object->set_atomic(); + rgw::sal::Attrs setattrs, rmattrs; + bufferlist bl; + bl.append("COMPLETED"); + setattrs[RGW_ATTR_OBJ_REPLICATION_STATUS] = bl; + int ret = s->object->set_obj_attrs(dpp, &setattrs, &rmattrs, s->yield, 0); + if (ret == 0) { + ldpp_dout(dpp, 20) << *s->object << " has amz-replication-status header set to COMPLETED" << dendl; + } + } + } +} + /* * GET on CloudTiered objects is processed only when sent from the sync client. * In all other cases, fail with `ERR_INVALID_OBJECT_STATE`. @@ -2326,6 +2348,7 @@ void RGWGetObj::execute(optional_yield y) } #endif + op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info); if (op_ret < 0) { ldpp_dout(this, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl; @@ -2344,6 +2367,8 @@ void RGWGetObj::execute(optional_yield y) filter = &*decompress; } + handle_replication_status_header(this, attrs, s, lastmod); + attr_iter = attrs.find(RGW_ATTR_OBJ_REPLICATION_TRACE); if (attr_iter != attrs.end()) { try { @@ -2361,6 +2386,7 @@ void RGWGetObj::execute(optional_yield y) } catch (const buffer::error&) {} } + if (get_type() == RGW_OP_GET_OBJ && get_data) { op_ret = handle_cloudtier_obj(attrs, sync_cloudtiered); if (op_ret < 0) { @@ -4480,6 +4506,19 @@ void RGWPutObj::execute(optional_yield y) } } + RGWBucketSyncPolicyHandlerRef policy_handler; + op_ret = driver->get_sync_policy_handler(this, std::nullopt, s->bucket->get_key(), &policy_handler, s->yield); + + if (op_ret < 0) { + ldpp_dout(this, 0) << "failed to read sync policy for bucket: " << s->bucket << dendl; + return; + } + if (policy_handler && policy_handler->bucket_exports_object(s->object->get_name(), *obj_tags)) { + bufferlist repl_bl; + repl_bl.append("PENDING"); + emplace_attr(RGW_ATTR_OBJ_REPLICATION_STATUS, std::move(repl_bl)); + } + if (slo_info) { bufferlist manifest_bl; encode(*slo_info, manifest_bl); @@ -5037,7 +5076,7 @@ void RGWPutMetadataObject::execute(optional_yield y) } } - op_ret = s->object->set_obj_attrs(this, &attrs, &rmattrs, s->yield); + op_ret = s->object->set_obj_attrs(this, &attrs, &rmattrs, s->yield, rgw::sal::FLAG_LOG_OP); } int RGWDeleteObj::handle_slo_manifest(bufferlist& bl, optional_yield y) @@ -7678,7 +7717,7 @@ void RGWRMAttrs::execute(optional_yield y) s->object->set_atomic(); - op_ret = s->object->set_obj_attrs(this, nullptr, &attrs, y); + op_ret = s->object->set_obj_attrs(this, nullptr, &attrs, y, rgw::sal::FLAG_LOG_OP); if (op_ret < 0) { ldpp_dout(this, 0) << "ERROR: failed to delete obj attrs, obj=" << s->object << " ret=" << op_ret << dendl; @@ -7715,7 +7754,7 @@ void RGWSetAttrs::execute(optional_yield y) if (!rgw::sal::Object::empty(s->object.get())) { rgw::sal::Attrs a(attrs); - op_ret = s->object->set_obj_attrs(this, &a, nullptr, y); + op_ret = s->object->set_obj_attrs(this, &a, nullptr, y, rgw::sal::FLAG_LOG_OP); } else { op_ret = s->bucket->merge_and_store_attrs(this, attrs, y); } diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index e3a892b6992..eab0ad13e08 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -1178,6 +1178,9 @@ class Object { virtual void set_compressed() = 0; /** Check if this object is compressed */ virtual bool is_compressed() = 0; + /** Check if object is synced */ + virtual bool is_sync_completed(const DoutPrefixProvider* dpp, + const ceph::real_time& obj_mtime) = 0; /** Invalidate cached info about this object, except atomic, prefetch, and * compressed */ virtual void invalidate() = 0; @@ -1193,7 +1196,7 @@ class Object { virtual void set_obj_state(RGWObjState& _state) = 0; /** Set attributes for this object from the backing store. Attrs can be set or * deleted. @note the attribute APIs may be revisited in the future. */ - virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) = 0; + virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) = 0; /** Get attributes for this object */ virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) = 0; /** Modify attributes for this object. */ diff --git a/src/rgw/rgw_sal_dbstore.cc b/src/rgw/rgw_sal_dbstore.cc index 2ce6304646e..b6bbcf7a030 100644 --- a/src/rgw/rgw_sal_dbstore.cc +++ b/src/rgw/rgw_sal_dbstore.cc @@ -527,7 +527,7 @@ namespace rgw::sal { return read_op.prepare(dpp); } - int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) + int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) { Attrs empty; DB::Object op_target(store->getDB(), @@ -552,7 +552,7 @@ namespace rgw::sal { } set_atomic(); state.attrset[attr_name] = attr_val; - return set_obj_attrs(dpp, &state.attrset, nullptr, y); + return set_obj_attrs(dpp, &state.attrset, nullptr, y, 0); } int DBObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) @@ -562,7 +562,7 @@ namespace rgw::sal { set_atomic(); rmattr[attr_name] = bl; - return set_obj_attrs(dpp, nullptr, &rmattr, y); + return set_obj_attrs(dpp, nullptr, &rmattr, y, 0); } bool DBObject::is_expired() { diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h index 4770713e762..516ab47393a 100644 --- a/src/rgw/rgw_sal_dbstore.h +++ b/src/rgw/rgw_sal_dbstore.h @@ -542,7 +542,7 @@ protected: virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; } virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override; - virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override; + virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override; virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override; virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override; virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override; diff --git a/src/rgw/rgw_sal_filter.cc b/src/rgw/rgw_sal_filter.cc index 66a2466040b..0dd76b333fa 100644 --- a/src/rgw/rgw_sal_filter.cc +++ b/src/rgw/rgw_sal_filter.cc @@ -1037,9 +1037,9 @@ int FilterObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pst } int FilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, - Attrs* delattrs, optional_yield y) + Attrs* delattrs, optional_yield y, uint32_t flags) { - return next->set_obj_attrs(dpp, setattrs, delattrs, y); + return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags); } int FilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index 95d00960cbd..2e9a08ed0b5 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -746,6 +746,8 @@ public: virtual bool is_prefetch_data() override { return next->is_prefetch_data(); } virtual void set_compressed() override { return next->set_compressed(); } virtual bool is_compressed() override { return next->is_compressed(); } + virtual bool is_sync_completed(const DoutPrefixProvider* dpp, + const ceph::real_time& obj_mtime) override { return next->is_sync_completed(dpp, obj_mtime); } virtual void invalidate() override { return next->invalidate(); } virtual bool empty() const override { return next->empty(); } virtual const std::string &get_name() const override { return next->get_name(); } @@ -754,7 +756,7 @@ public: optional_yield y, bool follow_olh = true) override; virtual void set_obj_state(RGWObjState& _state) override { return next->set_obj_state(_state); } virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, - Attrs* delattrs, optional_yield y) override; + Attrs* delattrs, optional_yield y, uint32_t flags) override; virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override; virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h index 8bfff802526..bf192c52dc9 100644 --- a/src/rgw/rgw_sal_store.h +++ b/src/rgw/rgw_sal_store.h @@ -206,6 +206,8 @@ class StoreObject : public Object { virtual bool is_prefetch_data() override { return state.prefetch_data; } virtual void set_compressed() override { state.compressed = true; } virtual bool is_compressed() override { return state.compressed; } + virtual bool is_sync_completed(const DoutPrefixProvider* dpp, + const ceph::real_time& obj_mtime) override { return false; } virtual void invalidate() override { rgw_obj obj = state.obj; bool is_atomic = state.is_atomic; diff --git a/src/rgw/rgw_sync_policy.cc b/src/rgw/rgw_sync_policy.cc index 0568262de67..b65752959e9 100644 --- a/src/rgw/rgw_sync_policy.cc +++ b/src/rgw/rgw_sync_policy.cc @@ -74,6 +74,14 @@ void rgw_sync_pipe_filter::set_prefix(std::optional opt_prefix, } } +bool rgw_sync_pipe_filter::check_prefix(const std::string& obj_name) const +{ + if (prefix.has_value()) { + return boost::starts_with(obj_name, prefix.value()); + } + return true; +} + void rgw_sync_pipe_filter::set_tags(std::list& tags_add, std::list& tags_rm) { diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index ec9d1f2c623..062fb115324 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -244,6 +244,7 @@ struct rgw_sync_pipe_filter { bool check_tag(const std::string& k, const std::string& v) const; bool check_tags(const std::vector& tags) const; bool check_tags(const RGWObjTags::tag_map_t& tags) const; + bool check_prefix(const std::string& obj_name) const; }; WRITE_CLASS_ENCODER(rgw_sync_pipe_filter) diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index f0b36865ed1..cb3f2bf8ed4 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -1921,6 +1921,33 @@ def test_role_delete_sync(): zone.iam_conn.get_role, RoleName=role_name) log.info(f'success, zone: {zone.name} does not have role: {role_name}') +def test_replication_status(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + zone = zonegroup_conns.rw_zones[0] + + bucket = zone.conn.create_bucket(gen_bucket_name()) + obj_name = "a" + k = new_key(zone, bucket.name, obj_name) + k.set_contents_from_string('foo') + zonegroup_meta_checkpoint(zonegroup) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + head_res = zone.head_object(bucket.name, obj_name) + log.info("checking if object has PENDING ReplicationStatus") + assert(head_res["ReplicationStatus"] == "PENDING") + + bilog_autotrim(zone.zone) + zonegroup_data_checkpoint(zonegroup_conns) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + head_res = zone.head_object(bucket.name, obj_name) + log.info("checking if object has COMPLETED ReplicationStatus") + assert(head_res["ReplicationStatus"] == "COMPLETED") + + log.info("checking that ReplicationStatus update did not write a bilog") + bilog = bilog_list(zone.zone, bucket.name) + assert(len(bilog) == 0) @attr('data_sync_init') def test_bucket_full_sync_after_data_sync_init(): diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py index 3761676a3d7..ce0530543e0 100644 --- a/src/test/rgw/rgw_multi/zone_rados.py +++ b/src/test/rgw/rgw_multi/zone_rados.py @@ -170,6 +170,9 @@ class RadosZone(Zone): return out['TopicConfigurations'] return [] + def head_object(self, bucket_name, obj_name): + return self.s3_client.head_object(Bucket=bucket_name, Key=obj_name) + def get_conn(self, credentials): return self.Conn(self, credentials)