]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement x-amz-replication-status for PENDING & COMPLETED
authorAlex Wojno <awojno@bloomberg.net>
Tue, 23 Apr 2024 17:58:05 +0000 (13:58 -0400)
committerAlex Wojno <awojno@bloomberg.net>
Mon, 13 May 2024 18:59:24 +0000 (14:59 -0400)
Signed-off-by: Alex Wojno <awojno@bloomberg.net>
25 files changed:
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/driver/daos/rgw_sal_daos.cc
src/rgw/driver/daos/rgw_sal_daos.h
src/rgw/driver/motr/rgw_sal_motr.cc
src/rgw/driver/motr/rgw_sal_motr.h
src/rgw/driver/posix/rgw_sal_posix.cc
src/rgw/driver/posix/rgw_sal_posix.h
src/rgw/driver/rados/rgw_bucket_sync.cc
src/rgw/driver/rados/rgw_bucket_sync.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/rgw_op.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_dbstore.cc
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_filter.cc
src/rgw/rgw_sal_filter.h
src/rgw/rgw_sal_store.h
src/rgw/rgw_sync_policy.cc
src/rgw/rgw_sync_policy.h
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/zone_rados.py

index 1afd6822fe2fc321b531fd0a96b3ec0d9761a827..7a5d6402067574162cb17f66a50e4a7443464b9e 100644 (file)
@@ -210,7 +210,7 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
 }
 
 int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                            Attrs* delattrs, optional_yield y
+                            Attrs* delattrs, optional_yield y, uint32_t flags)
 {
   if (setattrs != NULL) {
     /* Ensure setattrs and delattrs do not overlap */
@@ -241,7 +241,7 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr
       ldpp_dout(dpp, 10) << "D4NFilterObject::" << __func__ << "(): CacheDriver delete_attrs method failed." << dendl;
   }
 
-  return next->set_obj_attrs(dpp, setattrs, delattrs, y);  
+  return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
 }
 
 int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
index 42436b92d1d51766ec48de7de9b811797008c4a8..3d3a1125e02790ad72811775d858974e86606329 100644 (file)
@@ -195,7 +195,7 @@ class D4NFilterObject : public FilterObject {
                const DoutPrefixProvider* dpp, optional_yield y) override;
     virtual const std::string &get_name() const override { return next->get_name(); }
     virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                            Attrs* delattrs, optional_yield y) override;
+                            Attrs* delattrs, optional_yield y, uint32_t flags) override;
     virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                             rgw_obj* target_obj = NULL) override;
     virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
index f8f60d82d0294ea49ab6558be5fbc420e64473bf..d39422e4da8a4cc1fe0275a073689cbd99714ad8 100644 (file)
@@ -900,7 +900,7 @@ int DaosObject::get_obj_state(const DoutPrefixProvider* dpp,
 DaosObject::~DaosObject() { close(nullptr); }
 
 int DaosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                              Attrs* delattrs, optional_yield y) {
+                              Attrs* delattrs, optional_yield y, uint32_t flags) {
   ldpp_dout(dpp, 20) << "DEBUG: DaosObject::set_obj_attrs()" << dendl;
   // TODO handle target_obj
   // Get object's metadata (those stored in rgw_bucket_dir_entry)
@@ -959,7 +959,7 @@ int DaosObject::delete_obj_attrs(const DoutPrefixProvider* dpp,
   bufferlist bl;
 
   rmattr[attr_name] = bl;
-  return set_obj_attrs(dpp, nullptr, &rmattr, y);
+  return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 bool DaosObject::is_expired() {
index cf1583fc174713c06d4b0f9607d99f972468ffb5..8c72fa68fa6723247bd5377e753081beeabf0a94 100644 (file)
@@ -620,7 +620,7 @@ class DaosObject : public StoreObject {
   virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState** state,
                             optional_yield y, bool follow_olh = true) override;
   virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                            Attrs* delattrs, optional_yield y) override;
+                            Attrs* delattrs, optional_yield y, uint32_t flags) override;
   virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                             rgw_obj* target_obj = NULL) override;
   virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
index ae86ec9e7d352641b2b8ded929cab16e140d3c92..419e613d0f831c56d0faa8ef7c48cad2ced42055 100644 (file)
@@ -1182,7 +1182,7 @@ MotrObject::~MotrObject() {
 //    return read_op.prepare(dpp);
 //  }
 
-int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
 {
   // TODO: implement
   ldpp_dout(dpp, 20) <<__func__<< ": MotrObject::set_obj_attrs()" << dendl;
@@ -1238,7 +1238,7 @@ int MotrObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, op
   }
   set_atomic();
   state.attrset[attr_name] = attr_val;
-  return set_obj_attrs(dpp, &state.attrset, nullptr, y);
+  return set_obj_attrs(dpp, &state.attrset, nullptr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y)
@@ -1249,7 +1249,7 @@ int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr
 
   set_atomic();
   rmattr[attr_name] = bl;
-  return set_obj_attrs(dpp, nullptr, &rmattr, y);
+  return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 bool MotrObject::is_expired() {
index e278728c7e7fd9962f156cf191f4b6b2f7a1da50..233e99e8f59ed0a5088459bfa8b1b76fe36d708c 100644 (file)
@@ -677,7 +677,7 @@ class MotrObject : public StoreObject {
     virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
     virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
     virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
-    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
     virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
     virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
     virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
index 9b1b34fa9e4e0dcda73663175129241e2debc30e..8bb0df5dca1501bc48aed259df754009d28bba56 100644 (file)
@@ -1555,7 +1555,7 @@ int POSIXObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **psta
 }
 
 int POSIXObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                            Attrs* delattrs, optional_yield y)
+                            Attrs* delattrs, optional_yield y, uint32_t flags)
 {
   if (delattrs) {
     for (auto& it : *delattrs) {
@@ -2420,7 +2420,7 @@ int POSIXObject::copy(const DoutPrefixProvider *dpp, optional_yield y,
     return ret;
   }
 
-  ret = dobj->set_obj_attrs(dpp, &get_attrs(), NULL, y);
+  ret = dobj->set_obj_attrs(dpp, &get_attrs(), NULL, y, rgw::sal::FLAG_LOG_OP);
   if (ret < 0) {
     ldpp_dout(dpp, 0) << "ERROR: could not write attrs to dest object "
                       << dobj->get_name() << dendl;
@@ -2529,7 +2529,7 @@ int POSIXMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,
 
   attrs[RGW_POSIX_ATTR_MPUPLOAD] = bl;
 
-  return meta_obj->set_obj_attrs(dpp, &attrs, nullptr, y);
+  return meta_obj->set_obj_attrs(dpp, &attrs, nullptr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 int POSIXMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct,
index 587bf783e9018d163b09f22f884479fef22f41b9..22445d066672af64ed5954984459c1d1e447aa83 100644 (file)
@@ -341,7 +341,7 @@ public:
   virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
   virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
   virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                           Attrs* delattrs, optional_yield y) override;
+                           Attrs* delattrs, optional_yield y, uint32_t flags) override;
   virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                            rgw_obj* target_obj = NULL) override;
   virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
index dafbb6df46f4ce107ce876372947905c9a5ff260..d59d79a008580faa46f77b6d49c0d1b2d74d061a 100644 (file)
@@ -984,6 +984,19 @@ void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sour
   }
 }
 
+bool RGWBucketSyncPolicyHandler::bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags) {
+  if (bucket_exports_data()) {
+    for (auto& entry : target_pipes.pipe_map) {
+      auto& filter = entry.second.params.source.filter;
+      if (filter.check_prefix(obj_name) && filter.check_tags(tags.get_tags())) {
+       return true;
+      }
+    }
+  }
+
+  return false;
+}
+
 bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
 {
   if (!bucket) {
index d425ecf1732f59c5d070cff107bf1ef27a0d5f34..db542833792a0c38e6021173741cf1596463c5f7 100644 (file)
@@ -402,6 +402,7 @@ public:
     return target_hints;
   }
 
+  bool bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags);
   bool bucket_exports_data() const;
   bool bucket_imports_data() const;
 
index 95f05f149a062029901f6fe7c146830d329b5b32..3ff6ff2201ec669b93232372f59ceb7ecae05a99 100644 (file)
@@ -5470,7 +5470,7 @@ static int resync_encrypted_multipart(const DoutPrefixProvider* dpp,
   };
 
   return store->set_attrs(dpp, &obj_ctx, bucket_info, state.obj,
-                          add_attrs, nullptr, y, set_mtime);
+                          add_attrs, nullptr, y, true, set_mtime);
 }
 
 static void try_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
@@ -6469,13 +6469,14 @@ int RGWRados::set_attr(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBuc
 {
   map<string, bufferlist> attrs;
   attrs[name] = bl;
-  return set_attrs(dpp, octx, bucket_info, obj, attrs, NULL, y);
+  return set_attrs(dpp, octx, bucket_info, obj, attrs, NULL, y, true);
 }
 
 int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBucketInfo& bucket_info, const rgw_obj& src_obj,
                         map<string, bufferlist>& attrs,
                         map<string, bufferlist>* rmattrs,
                         optional_yield y,
+                        bool log_op,
                         ceph::real_time set_mtime /* = zero() */)
 {
   rgw_obj obj = src_obj;
@@ -6547,7 +6548,7 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu
     string tag;
     append_rand_alpha(cct, tag, tag, 32);
     state->write_tag = tag;
-    r = index_op.prepare(dpp, CLS_RGW_OP_ADD, &state->write_tag, y);
+    r = index_op.prepare(dpp, CLS_RGW_OP_ADD, &state->write_tag, y, log_op);
 
     if (r < 0)
       return r;
@@ -6591,9 +6592,9 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu
       int64_t poolid = ioctx.get_id();
       r = index_op.complete(dpp, poolid, epoch, state->size, state->accounted_size,
                             mtime, etag, content_type, storage_class, owner,
-                            RGWObjCategory::Main, nullptr, y);
+                            RGWObjCategory::Main, nullptr, y, nullptr, false, log_op);
     } else {
-      int ret = index_op.cancel(dpp, nullptr, y);
+      int ret = index_op.cancel(dpp, nullptr, y, log_op);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
       }
index 481a94a140dc91cf75bbcc9a02e9d55c7f7a15f0..b182b8885823dd2f5f482f56578442cc34f114e6 100644 (file)
@@ -1284,6 +1284,7 @@ public:
                         std::map<std::string, bufferlist>& attrs,
                         std::map<std::string, bufferlist>* rmattrs,
                         optional_yield y,
+                        bool log_op,
                         ceph::real_time set_mtime = ceph::real_clock::zero());
 
   int get_obj_state(const DoutPrefixProvider *dpp, RGWObjectCtx *rctx,
index 78f76218e84d55e9e56869e3174cea877a8e66c8..aa1e5c902b68d7eb2e8ae79a09c6e4f6824df9a0 100644 (file)
@@ -59,6 +59,8 @@
 #include "services/svc_meta.h"
 #include "services/svc_meta_be_sobj.h"
 #include "services/svc_cls.h"
+#include "services/svc_bilog_rados.h"
+#include "services/svc_bi_rados.h"
 #include "services/svc_zone.h"
 #include "services/svc_tier_rados.h"
 #include "services/svc_quota.h"
@@ -2238,6 +2240,40 @@ RadosObject::~RadosObject()
     delete rados_ctx;
 }
 
+bool RadosObject::is_sync_completed(const DoutPrefixProvider* dpp,
+   const ceph::real_time& obj_mtime)
+{
+  const auto& bucket_info = get_bucket()->get_info();
+  if (bucket_info.is_indexless()) {
+    ldpp_dout(dpp, 0) << "ERROR: Trying to check object replication status for object in an indexless bucket. obj=" << get_key() << dendl;
+    return false;
+  }
+
+  const auto& log_layout = bucket_info.layout.logs.front();
+  const uint32_t shard_count = num_shards(log_to_index_layout(log_layout));
+
+  std::string marker;
+  bool truncated;
+  list<rgw_bi_log_entry> entries;
+
+  const int shard_id = RGWSI_BucketIndex_RADOS::bucket_shard_index(get_key(), shard_count);
+
+  int ret = store->svc()->bilog_rados->log_list(dpp, bucket_info, log_layout, shard_id,
+    marker, 1, entries, &truncated);
+
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: Failed to retrieve bilog info for obj=" << get_key() << dendl;
+    return false;
+  }
+
+  if (entries.empty()) {
+    return true;
+  }
+
+  const rgw_bi_log_entry& earliest_marker = entries.front();
+  return earliest_marker.timestamp > obj_mtime;
+}
+
 int RadosObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pstate, optional_yield y, bool follow_olh)
 {
   int ret = store->getRados()->get_obj_state(dpp, rados_ctx, bucket->get_info(), get_obj(), pstate, &manifest, follow_olh, y);
@@ -2268,7 +2304,7 @@ int RadosObject::read_attrs(const DoutPrefixProvider* dpp, RGWRados::Object::Rea
   return read_op.prepare(y, dpp);
 }
 
-int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
 {
   Attrs empty;
   return store->getRados()->set_attrs(dpp, rados_ctx,
@@ -2276,7 +2312,7 @@ int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, A
                        get_obj(),
                        setattrs ? *setattrs : empty,
                        delattrs ? delattrs : nullptr,
-                       y);
+                       y, flags & rgw::sal::FLAG_LOG_OP);
 }
 
 int RadosObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj)
@@ -2300,7 +2336,7 @@ int RadosObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, o
   state.obj = target;
   set_atomic();
   state.attrset[attr_name] = attr_val;
-  r = set_obj_attrs(dpp, &state.attrset, nullptr, y);
+  r = set_obj_attrs(dpp, &state.attrset, nullptr, y, rgw::sal::FLAG_LOG_OP);
   /* Restore target */
   state.obj = save;
 
@@ -2314,7 +2350,7 @@ int RadosObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* att
 
   set_atomic();
   rmattr[attr_name] = bl;
-  return set_obj_attrs(dpp, nullptr, &rmattr, y);
+  return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 bool RadosObject::is_expired() {
@@ -2467,7 +2503,7 @@ int RadosObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_y
   set_atomic();
   map<string, bufferlist> attrs;
   attrs[RGW_ATTR_ACL] = bl;
-  r = set_obj_attrs(dpp, &attrs, nullptr, y);
+  r = set_obj_attrs(dpp, &attrs, nullptr, y, rgw::sal::FLAG_LOG_OP);
   if (r < 0) {
     ldpp_dout(dpp, 0) << "ERROR: modify attr failed " << cpp_strerror(-r) << dendl;
     return r;
index 78bd849717f605434aab8fac7257292d6a084d5d..a056b08a832bc0463c32a5de8c0c10581867d113 100644 (file)
@@ -585,8 +585,10 @@ class RadosObject : public StoreObject {
       StoreObject::set_compressed();
     }
 
+    virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+      const ceph::real_time& obj_mtime) override;
     virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
-    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
     virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
     virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
     virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
index 602280182d7a7f956538131f136cd0500277c29e..6045a9a60f3cd9cad1e91bc88b472ac08f848346 100644 (file)
@@ -57,6 +57,7 @@
 #include "rgw_lua_data_filter.h"
 #include "rgw_lua.h"
 #include "rgw_iam_managed_policy.h"
+#include "rgw_bucket_sync.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_quota.h"
@@ -908,6 +909,27 @@ void rgw_build_iam_environment(rgw::sal::Driver* driver,
   }
 }
 
+void handle_replication_status_header(
+    const DoutPrefixProvider *dpp,
+    rgw::sal::Attrs& attrs,
+    req_state* s,
+    const ceph::real_time &obj_mtime) {
+  auto attr_iter = attrs.find(RGW_ATTR_OBJ_REPLICATION_STATUS);
+  if (attr_iter != attrs.end() && attr_iter->second.to_str() == "PENDING") {
+    if (s->object->is_sync_completed(dpp, obj_mtime)) {
+        s->object->set_atomic();
+        rgw::sal::Attrs setattrs, rmattrs;
+        bufferlist bl;
+        bl.append("COMPLETED");
+        setattrs[RGW_ATTR_OBJ_REPLICATION_STATUS] = bl;
+       int ret = s->object->set_obj_attrs(dpp, &setattrs, &rmattrs, s->yield, 0);
+       if (ret == 0) {
+         ldpp_dout(dpp, 20) << *s->object << " has amz-replication-status header set to COMPLETED" << dendl;
+       }
+    }
+  }
+}
+
 /*
  * GET on CloudTiered objects is processed only when sent from the sync client.
  * In all other cases, fail with `ERR_INVALID_OBJECT_STATE`.
@@ -2326,6 +2348,7 @@ void RGWGetObj::execute(optional_yield y)
   }
 #endif
 
+
   op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
@@ -2344,6 +2367,8 @@ void RGWGetObj::execute(optional_yield y)
     filter = &*decompress;
   }
 
+  handle_replication_status_header(this, attrs, s, lastmod);
+
   attr_iter = attrs.find(RGW_ATTR_OBJ_REPLICATION_TRACE);
   if (attr_iter != attrs.end()) {
     try {
@@ -2361,6 +2386,7 @@ void RGWGetObj::execute(optional_yield y)
     } catch (const buffer::error&) {}
   }
 
+
   if (get_type() == RGW_OP_GET_OBJ && get_data) {
     op_ret = handle_cloudtier_obj(attrs, sync_cloudtiered);
     if (op_ret < 0) {
@@ -4480,6 +4506,19 @@ void RGWPutObj::execute(optional_yield y)
     }
   }
 
+  RGWBucketSyncPolicyHandlerRef policy_handler;
+  op_ret = driver->get_sync_policy_handler(this, std::nullopt, s->bucket->get_key(), &policy_handler, s->yield);
+
+  if (op_ret < 0) {
+    ldpp_dout(this, 0) << "failed to read sync policy for bucket: " << s->bucket << dendl;
+    return;
+  }
+  if (policy_handler && policy_handler->bucket_exports_object(s->object->get_name(), *obj_tags)) {
+    bufferlist repl_bl;
+    repl_bl.append("PENDING");
+    emplace_attr(RGW_ATTR_OBJ_REPLICATION_STATUS, std::move(repl_bl));
+  }
+
   if (slo_info) {
     bufferlist manifest_bl;
     encode(*slo_info, manifest_bl);
@@ -5037,7 +5076,7 @@ void RGWPutMetadataObject::execute(optional_yield y)
     }
   }
 
-  op_ret = s->object->set_obj_attrs(this, &attrs, &rmattrs, s->yield);
+  op_ret = s->object->set_obj_attrs(this, &attrs, &rmattrs, s->yield, rgw::sal::FLAG_LOG_OP);
 }
 
 int RGWDeleteObj::handle_slo_manifest(bufferlist& bl, optional_yield y)
@@ -7678,7 +7717,7 @@ void RGWRMAttrs::execute(optional_yield y)
 
   s->object->set_atomic();
 
-  op_ret = s->object->set_obj_attrs(this, nullptr, &attrs, y);
+  op_ret = s->object->set_obj_attrs(this, nullptr, &attrs, y, rgw::sal::FLAG_LOG_OP);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "ERROR: failed to delete obj attrs, obj=" << s->object
                       << " ret=" << op_ret << dendl;
@@ -7715,7 +7754,7 @@ void RGWSetAttrs::execute(optional_yield y)
 
   if (!rgw::sal::Object::empty(s->object.get())) {
     rgw::sal::Attrs a(attrs);
-    op_ret = s->object->set_obj_attrs(this, &a, nullptr, y);
+    op_ret = s->object->set_obj_attrs(this, &a, nullptr, y, rgw::sal::FLAG_LOG_OP);
   } else {
     op_ret = s->bucket->merge_and_store_attrs(this, attrs, y);
   }
index e3a892b6992ceba12c336345763f17c7564ca568..eab0ad13e08e5991af63f79d264fab367bb524ea 100644 (file)
@@ -1178,6 +1178,9 @@ class Object {
     virtual void set_compressed() = 0;
     /** Check if this object is compressed */
     virtual bool is_compressed() = 0;
+    /** Check if object is synced */
+    virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+      const ceph::real_time& obj_mtime) = 0;
     /** Invalidate cached info about this object, except atomic, prefetch, and
      * compressed */
     virtual void invalidate() = 0;
@@ -1193,7 +1196,7 @@ class Object {
     virtual void set_obj_state(RGWObjState& _state) = 0;
     /** Set attributes for this object from the backing store.  Attrs can be set or
      * deleted.  @note the attribute APIs may be revisited in the future. */
-    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) = 0;
+    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) = 0;
     /** Get attributes for this object */
     virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) = 0;
     /** Modify attributes for this object. */
index 2ce6304646eac590a4b876a73022ca1855d69652..b6bbcf7a0300e86b20e9fc4163fecfb51f1fb74c 100644 (file)
@@ -527,7 +527,7 @@ namespace rgw::sal {
     return read_op.prepare(dpp);
   }
 
-  int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+  int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
   {
     Attrs empty;
     DB::Object op_target(store->getDB(),
@@ -552,7 +552,7 @@ namespace rgw::sal {
     }
     set_atomic();
     state.attrset[attr_name] = attr_val;
-    return set_obj_attrs(dpp, &state.attrset, nullptr, y);
+    return set_obj_attrs(dpp, &state.attrset, nullptr, y, 0);
   }
 
   int DBObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y)
@@ -562,7 +562,7 @@ namespace rgw::sal {
 
     set_atomic();
     rmattr[attr_name] = bl;
-    return set_obj_attrs(dpp, nullptr, &rmattr, y);
+    return set_obj_attrs(dpp, nullptr, &rmattr, y, 0);
   }
 
   bool DBObject::is_expired() {
index 4770713e762a6fef597887b680a23057bb35c861..516ab47393a1a72bff157f2f4e9c94b844a0c508 100644 (file)
@@ -542,7 +542,7 @@ protected:
       virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
 
       virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
-      virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+      virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
       virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
       virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
       virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
index 66a2466040ba56f1c51893fa52f291a9fbbb3e3a..0dd76b333fa0f05544bee9ffb843651760f0a1a2 100644 (file)
@@ -1037,9 +1037,9 @@ int FilterObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pst
 }
 
 int FilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                               Attrs* delattrs, optional_yield y)
+                               Attrs* delattrs, optional_yield y, uint32_t flags)
 {
-  return next->set_obj_attrs(dpp, setattrs, delattrs, y);
+  return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
 }
 
 int FilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
index 95d00960cbd2d4df9ea6af9defc86f8bd6a80502..2e9a08ed0b55e4fcfdde75d3f2d508409a6b7f55 100644 (file)
@@ -746,6 +746,8 @@ public:
   virtual bool is_prefetch_data() override { return next->is_prefetch_data(); }
   virtual void set_compressed() override { return next->set_compressed(); }
   virtual bool is_compressed() override { return next->is_compressed(); }
+  virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+    const ceph::real_time& obj_mtime) override { return next->is_sync_completed(dpp, obj_mtime); }
   virtual void invalidate() override { return next->invalidate(); }
   virtual bool empty() const override { return next->empty(); }
   virtual const std::string &get_name() const override { return next->get_name(); }
@@ -754,7 +756,7 @@ public:
                            optional_yield y, bool follow_olh = true) override;
   virtual void set_obj_state(RGWObjState& _state) override { return next->set_obj_state(_state); }
   virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                           Attrs* delattrs, optional_yield y) override;
+                           Attrs* delattrs, optional_yield y, uint32_t flags) override;
   virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                            rgw_obj* target_obj = NULL) override;
   virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
index 8bfff8025260e4e1b4c0ccf1dcc1027c99d985d4..bf192c52dc9ea13c0ea66a62ce3a73a8e4d15fcb 100644 (file)
@@ -206,6 +206,8 @@ class StoreObject : public Object {
     virtual bool is_prefetch_data() override { return state.prefetch_data; }
     virtual void set_compressed() override { state.compressed = true; }
     virtual bool is_compressed() override { return state.compressed; }
+    virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+      const ceph::real_time& obj_mtime) override { return false; }
     virtual void invalidate() override {
       rgw_obj obj = state.obj;
       bool is_atomic = state.is_atomic;
index 0568262de6756587c6481d1e3d39f6d1f34d470c..b65752959e9c93bb456d2656e2f7999030a62c00 100644 (file)
@@ -74,6 +74,14 @@ void rgw_sync_pipe_filter::set_prefix(std::optional<std::string> opt_prefix,
   }
 }
 
+bool rgw_sync_pipe_filter::check_prefix(const std::string& obj_name) const
+{
+  if (prefix.has_value()) {
+    return boost::starts_with(obj_name, prefix.value());
+  }
+  return true;
+}
+
 void rgw_sync_pipe_filter::set_tags(std::list<std::string>& tags_add,
                                     std::list<std::string>& tags_rm)
 {
index ec9d1f2c623417dc01029ab31f050c6afe76c4d3..062fb11532440d933a559ff2904954cdc022285c 100644 (file)
@@ -244,6 +244,7 @@ struct rgw_sync_pipe_filter {
   bool check_tag(const std::string& k, const std::string& v) const;
   bool check_tags(const std::vector<std::string>& tags) const;
   bool check_tags(const RGWObjTags::tag_map_t& tags) const;
+  bool check_prefix(const std::string& obj_name) const;
 };
 WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)
 
index f0b36865ed1c5afbd7c22bcb1c70fce85d04b0e2..cb3f2bf8ed437a5eced19aa253c1db2acf3b1095 100644 (file)
@@ -1921,6 +1921,33 @@ def test_role_delete_sync():
                       zone.iam_conn.get_role, RoleName=role_name)
         log.info(f'success, zone: {zone.name} does not have role: {role_name}')
 
+def test_replication_status():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    zone = zonegroup_conns.rw_zones[0]
+
+    bucket = zone.conn.create_bucket(gen_bucket_name())
+    obj_name = "a"
+    k = new_key(zone, bucket.name, obj_name)
+    k.set_contents_from_string('foo')
+    zonegroup_meta_checkpoint(zonegroup)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    head_res = zone.head_object(bucket.name, obj_name)
+    log.info("checking if object has PENDING ReplicationStatus")
+    assert(head_res["ReplicationStatus"] == "PENDING")
+
+    bilog_autotrim(zone.zone)
+    zonegroup_data_checkpoint(zonegroup_conns)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    head_res = zone.head_object(bucket.name, obj_name)
+    log.info("checking if object has COMPLETED ReplicationStatus")
+    assert(head_res["ReplicationStatus"] == "COMPLETED")
+
+    log.info("checking that ReplicationStatus update did not write a bilog")
+    bilog = bilog_list(zone.zone, bucket.name)
+    assert(len(bilog) == 0)
 
 @attr('data_sync_init')
 def test_bucket_full_sync_after_data_sync_init():
index 3761676a3d781368f2267b6495de9c80541d5047..ce0530543e061f7f199b741c84f709eb6347c6e5 100644 (file)
@@ -170,6 +170,9 @@ class RadosZone(Zone):
               return out['TopicConfigurations']
             return []
 
+        def head_object(self, bucket_name, obj_name):
+            return self.s3_client.head_object(Bucket=bucket_name, Key=obj_name)
+
     def get_conn(self, credentials):
         return self.Conn(self, credentials)