]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: implement x-amz-replication-status for PENDING & COMPLETED
authorAlex Wojno <awojno@bloomberg.net>
Tue, 23 Apr 2024 17:58:05 +0000 (13:58 -0400)
committerAlex Wojno <awojno@bloomberg.net>
Wed, 20 Nov 2024 20:39:36 +0000 (20:39 +0000)
Signed-off-by: Alex Wojno <awojno@bloomberg.net>
(cherry picked from commit 3161e24894afafa43cbf0bf7a3b316a6d05b0420)

Conflicts:
        src/rgw/driver/rados/rgw_sal_rados.cc
        src/test/rgw/rgw_multi/tests.py

25 files changed:
src/rgw/driver/d4n/rgw_sal_d4n.cc
src/rgw/driver/d4n/rgw_sal_d4n.h
src/rgw/driver/daos/rgw_sal_daos.cc
src/rgw/driver/daos/rgw_sal_daos.h
src/rgw/driver/motr/rgw_sal_motr.cc
src/rgw/driver/motr/rgw_sal_motr.h
src/rgw/driver/posix/rgw_sal_posix.cc
src/rgw/driver/posix/rgw_sal_posix.h
src/rgw/driver/rados/rgw_bucket_sync.cc
src/rgw/driver/rados/rgw_bucket_sync.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/rgw_op.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_dbstore.cc
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_filter.cc
src/rgw/rgw_sal_filter.h
src/rgw/rgw_sal_store.h
src/rgw/rgw_sync_policy.cc
src/rgw/rgw_sync_policy.h
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/zone_rados.py

index 6776681423326f8a560031408219d86cdb87995c..fb4aa4653208d6bd0ac93bf712177a474d7b611a 100644 (file)
@@ -151,7 +151,7 @@ int D4NFilterObject::copy_object(const ACLOwner& owner,
 }
 
 int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                            Attrs* delattrs, optional_yield y
+                            Attrs* delattrs, optional_yield y, uint32_t flags)
 {
   if (setattrs != NULL) {
     /* Ensure setattrs and delattrs do not overlap */
@@ -198,7 +198,7 @@ int D4NFilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattr
     }
   }
 
-  return next->set_obj_attrs(dpp, setattrs, delattrs, y);  
+  return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
 }
 
 int D4NFilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
index 770fa9005800bf9a315e7b5b84367d9cb2e315f5..fd973f32e85575bb2c5af0043d3c09cd9aafd773 100644 (file)
@@ -139,7 +139,7 @@ class D4NFilterObject : public FilterObject {
                const DoutPrefixProvider* dpp, optional_yield y) override;
     virtual const std::string &get_name() const override { return next->get_name(); }
     virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                            Attrs* delattrs, optional_yield y) override;
+                            Attrs* delattrs, optional_yield y, uint32_t flags) override;
     virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                             rgw_obj* target_obj = NULL) override;
     virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
index 0b169a949daeb1840bbef4ce625ef67842495fc5..6c7dfdfebccfce6658061f727dd3376c16d49fc4 100644 (file)
@@ -900,7 +900,7 @@ int DaosObject::get_obj_state(const DoutPrefixProvider* dpp,
 DaosObject::~DaosObject() { close(nullptr); }
 
 int DaosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                              Attrs* delattrs, optional_yield y) {
+                              Attrs* delattrs, optional_yield y, uint32_t flags) {
   ldpp_dout(dpp, 20) << "DEBUG: DaosObject::set_obj_attrs()" << dendl;
   // TODO handle target_obj
   // Get object's metadata (those stored in rgw_bucket_dir_entry)
@@ -959,7 +959,7 @@ int DaosObject::delete_obj_attrs(const DoutPrefixProvider* dpp,
   bufferlist bl;
 
   rmattr[attr_name] = bl;
-  return set_obj_attrs(dpp, nullptr, &rmattr, y);
+  return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 bool DaosObject::is_expired() {
index 91122e3e063245fde248b42fb8226e5cf53c96e3..9556d71d8b530b8c4fcf0cc69c9aa6d1aa87d2f7 100644 (file)
@@ -621,7 +621,7 @@ class DaosObject : public StoreObject {
   virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState** state,
                             optional_yield y, bool follow_olh = true) override;
   virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                            Attrs* delattrs, optional_yield y) override;
+                            Attrs* delattrs, optional_yield y, uint32_t flags) override;
   virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                             rgw_obj* target_obj = NULL) override;
   virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
index 6a078bdaa255f67de4b2547485cc10ccc1b4624f..263c697ee495ad09439537e5977247899d5f8baf 100644 (file)
@@ -1182,7 +1182,7 @@ MotrObject::~MotrObject() {
 //    return read_op.prepare(dpp);
 //  }
 
-int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
 {
   // TODO: implement
   ldpp_dout(dpp, 20) <<__func__<< ": MotrObject::set_obj_attrs()" << dendl;
@@ -1238,7 +1238,7 @@ int MotrObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, op
   }
   set_atomic();
   state.attrset[attr_name] = attr_val;
-  return set_obj_attrs(dpp, &state.attrset, nullptr, y);
+  return set_obj_attrs(dpp, &state.attrset, nullptr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y)
@@ -1249,7 +1249,7 @@ int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr
 
   set_atomic();
   rmattr[attr_name] = bl;
-  return set_obj_attrs(dpp, nullptr, &rmattr, y);
+  return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 bool MotrObject::is_expired() {
index 5e1e4ae271a735cba387609651c161da8270a67c..71c74696ea63b9fe22e1bcc9ea05feec66944929 100644 (file)
@@ -679,7 +679,7 @@ class MotrObject : public StoreObject {
     virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
     virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
     virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
-    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
     virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
     virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
     virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
index 534b0ec7db234d2bedff58372c76d74f98b93574..b3673a4143c470990f801718c5b07859e6a436a5 100644 (file)
@@ -1557,7 +1557,7 @@ int POSIXObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **psta
 }
 
 int POSIXObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                            Attrs* delattrs, optional_yield y)
+                            Attrs* delattrs, optional_yield y, uint32_t flags)
 {
   if (delattrs) {
     for (auto& it : *delattrs) {
@@ -2422,7 +2422,7 @@ int POSIXObject::copy(const DoutPrefixProvider *dpp, optional_yield y,
     return ret;
   }
 
-  ret = dobj->set_obj_attrs(dpp, &get_attrs(), NULL, y);
+  ret = dobj->set_obj_attrs(dpp, &get_attrs(), NULL, y, rgw::sal::FLAG_LOG_OP);
   if (ret < 0) {
     ldpp_dout(dpp, 0) << "ERROR: could not write attrs to dest object "
                       << dobj->get_name() << dendl;
@@ -2531,7 +2531,7 @@ int POSIXMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,
 
   attrs[RGW_POSIX_ATTR_MPUPLOAD] = bl;
 
-  return meta_obj->set_obj_attrs(dpp, &attrs, nullptr, y);
+  return meta_obj->set_obj_attrs(dpp, &attrs, nullptr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 int POSIXMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct,
index fc99060993d9fef5a6dcfa0e7d311db54c73b3e2..4206fe488455be7a0a96f7a2956795c608ce7729 100644 (file)
@@ -343,7 +343,7 @@ public:
   virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
   virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
   virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                           Attrs* delattrs, optional_yield y) override;
+                           Attrs* delattrs, optional_yield y, uint32_t flags) override;
   virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                            rgw_obj* target_obj = NULL) override;
   virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
index dafbb6df46f4ce107ce876372947905c9a5ff260..d59d79a008580faa46f77b6d49c0d1b2d74d061a 100644 (file)
@@ -984,6 +984,19 @@ void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sour
   }
 }
 
+bool RGWBucketSyncPolicyHandler::bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags) {
+  if (bucket_exports_data()) {
+    for (auto& entry : target_pipes.pipe_map) {
+      auto& filter = entry.second.params.source.filter;
+      if (filter.check_prefix(obj_name) && filter.check_tags(tags.get_tags())) {
+       return true;
+      }
+    }
+  }
+
+  return false;
+}
+
 bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
 {
   if (!bucket) {
index d425ecf1732f59c5d070cff107bf1ef27a0d5f34..db542833792a0c38e6021173741cf1596463c5f7 100644 (file)
@@ -402,6 +402,7 @@ public:
     return target_hints;
   }
 
+  bool bucket_exports_object(const std::string& obj_name, const RGWObjTags& tags);
   bool bucket_exports_data() const;
   bool bucket_imports_data() const;
 
index ab6e87cfdfb40d4f13d86c8d7a31e11b512794e6..14161411a0cf9134c8accd630219697a757b2cb8 100644 (file)
@@ -5479,7 +5479,7 @@ static int resync_encrypted_multipart(const DoutPrefixProvider* dpp,
   };
 
   return store->set_attrs(dpp, &obj_ctx, bucket_info, state.obj,
-                          add_attrs, nullptr, y, set_mtime);
+                          add_attrs, nullptr, y, true, set_mtime);
 }
 
 static void try_resync_encrypted_multipart(const DoutPrefixProvider* dpp,
@@ -6483,13 +6483,14 @@ int RGWRados::set_attr(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBuc
 {
   map<string, bufferlist> attrs;
   attrs[name] = bl;
-  return set_attrs(dpp, octx, bucket_info, obj, attrs, NULL, y);
+  return set_attrs(dpp, octx, bucket_info, obj, attrs, NULL, y, true);
 }
 
 int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBucketInfo& bucket_info, const rgw_obj& src_obj,
                         map<string, bufferlist>& attrs,
                         map<string, bufferlist>* rmattrs,
                         optional_yield y,
+                        bool log_op,
                         ceph::real_time set_mtime /* = zero() */)
 {
   rgw_obj obj = src_obj;
@@ -6561,7 +6562,7 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu
     string tag;
     append_rand_alpha(cct, tag, tag, 32);
     state->write_tag = tag;
-    r = index_op.prepare(dpp, CLS_RGW_OP_ADD, &state->write_tag, y);
+    r = index_op.prepare(dpp, CLS_RGW_OP_ADD, &state->write_tag, y, log_op);
 
     if (r < 0)
       return r;
@@ -6617,9 +6618,9 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, RGWObjectCtx* octx, RGWBu
       int64_t poolid = ioctx.get_id();
       r = index_op.complete(dpp, poolid, epoch, state->size, state->accounted_size,
                             mtime, etag, content_type, storage_class, owner,
-                            RGWObjCategory::Main, nullptr, y);
+                            RGWObjCategory::Main, nullptr, y, nullptr, false, log_op);
     } else {
-      int ret = index_op.cancel(dpp, nullptr, y);
+      int ret = index_op.cancel(dpp, nullptr, y, log_op);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
       }
index b61f78492840c03b042d5f38a7834caebba8dc4b..38695dd4d168ab8804a530ba2b42b96afc6caf58 100644 (file)
@@ -1287,6 +1287,7 @@ public:
                         std::map<std::string, bufferlist>& attrs,
                         std::map<std::string, bufferlist>* rmattrs,
                         optional_yield y,
+                        bool log_op,
                         ceph::real_time set_mtime = ceph::real_clock::zero());
 
   int get_obj_state(const DoutPrefixProvider *dpp, RGWObjectCtx *rctx,
index 3e9f9070296c763dbb805044cd4ee63abe39c2f3..fb38b973154404d89880c37709d8986bf96328cc 100644 (file)
@@ -59,6 +59,8 @@
 #include "services/svc_meta.h"
 #include "services/svc_meta_be_sobj.h"
 #include "services/svc_cls.h"
+#include "services/svc_bilog_rados.h"
+#include "services/svc_bi_rados.h"
 #include "services/svc_zone.h"
 #include "services/svc_tier_rados.h"
 #include "services/svc_quota.h"
@@ -2254,6 +2256,40 @@ RadosObject::~RadosObject()
     delete rados_ctx;
 }
 
+bool RadosObject::is_sync_completed(const DoutPrefixProvider* dpp,
+   const ceph::real_time& obj_mtime)
+{
+  const auto& bucket_info = get_bucket()->get_info();
+  if (bucket_info.is_indexless()) {
+    ldpp_dout(dpp, 0) << "ERROR: Trying to check object replication status for object in an indexless bucket. obj=" << get_key() << dendl;
+    return false;
+  }
+
+  const auto& log_layout = bucket_info.layout.logs.front();
+  const uint32_t shard_count = num_shards(log_to_index_layout(log_layout));
+
+  std::string marker;
+  bool truncated;
+  list<rgw_bi_log_entry> entries;
+
+  const int shard_id = RGWSI_BucketIndex_RADOS::bucket_shard_index(get_key(), shard_count);
+
+  int ret = store->svc()->bilog_rados->log_list(dpp, bucket_info, log_layout, shard_id,
+    marker, 1, entries, &truncated);
+
+  if (ret < 0) {
+    ldpp_dout(dpp, 0) << "ERROR: Failed to retrieve bilog info for obj=" << get_key() << dendl;
+    return false;
+  }
+
+  if (entries.empty()) {
+    return true;
+  }
+
+  const rgw_bi_log_entry& earliest_marker = entries.front();
+  return earliest_marker.timestamp > obj_mtime;
+}
+
 int RadosObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pstate, optional_yield y, bool follow_olh)
 {
   int ret = store->getRados()->get_obj_state(dpp, rados_ctx, bucket->get_info(), get_obj(), pstate, &manifest, follow_olh, y);
@@ -2285,18 +2321,19 @@ int RadosObject::read_attrs(const DoutPrefixProvider* dpp, RGWRados::Object::Rea
   return read_op.prepare(y, dpp);
 }
 
-int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+int RadosObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
 {
   Attrs empty;
+  const bool log_op = flags & rgw::sal::FLAG_LOG_OP;
   // make a tiny adjustment to the existing mtime so that fetch_remote_obj()
   // won't return ERR_NOT_MODIFIED when syncing the modified object
-  const auto mtime = state.mtime + std::chrono::nanoseconds(1);
+  const auto mtime = log_op ? state.mtime + std::chrono::nanoseconds(1) : state.mtime;
   return store->getRados()->set_attrs(dpp, rados_ctx,
                        bucket->get_info(),
                        get_obj(),
                        setattrs ? *setattrs : empty,
                        delattrs ? delattrs : nullptr,
-                       y, mtime);
+                       y, log_op, mtime);
 }
 
 int RadosObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj)
@@ -2320,7 +2357,7 @@ int RadosObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, o
   state.obj = target;
   set_atomic();
   state.attrset[attr_name] = attr_val;
-  r = set_obj_attrs(dpp, &state.attrset, nullptr, y);
+  r = set_obj_attrs(dpp, &state.attrset, nullptr, y, rgw::sal::FLAG_LOG_OP);
   /* Restore target */
   state.obj = save;
 
@@ -2334,7 +2371,7 @@ int RadosObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* att
 
   set_atomic();
   rmattr[attr_name] = bl;
-  return set_obj_attrs(dpp, nullptr, &rmattr, y);
+  return set_obj_attrs(dpp, nullptr, &rmattr, y, rgw::sal::FLAG_LOG_OP);
 }
 
 bool RadosObject::is_expired() {
@@ -2487,7 +2524,7 @@ int RadosObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_y
   set_atomic();
   map<string, bufferlist> attrs;
   attrs[RGW_ATTR_ACL] = bl;
-  r = set_obj_attrs(dpp, &attrs, nullptr, y);
+  r = set_obj_attrs(dpp, &attrs, nullptr, y, rgw::sal::FLAG_LOG_OP);
   if (r < 0) {
     ldpp_dout(dpp, 0) << "ERROR: modify attr failed " << cpp_strerror(-r) << dendl;
     return r;
index 0a4e03a102c920458621d00402d96197c8d1630b..e594e5bea710095a79931c929e345ac8b2bbeb42 100644 (file)
@@ -593,8 +593,10 @@ class RadosObject : public StoreObject {
       StoreObject::set_compressed();
     }
 
+    virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+      const ceph::real_time& obj_mtime) override;
     virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
-    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
     virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
     virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
     virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
index 414e1196691e992b40a2dfda00d22d976305a43e..bc901240efb9ee2dd5f4bfc94abb592903b06f9b 100644 (file)
@@ -57,6 +57,7 @@
 #include "rgw_lua_data_filter.h"
 #include "rgw_lua.h"
 #include "rgw_iam_managed_policy.h"
+#include "rgw_bucket_sync.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_quota.h"
@@ -906,6 +907,27 @@ void rgw_build_iam_environment(rgw::sal::Driver* driver,
   }
 }
 
+void handle_replication_status_header(
+    const DoutPrefixProvider *dpp,
+    rgw::sal::Attrs& attrs,
+    req_state* s,
+    const ceph::real_time &obj_mtime) {
+  auto attr_iter = attrs.find(RGW_ATTR_OBJ_REPLICATION_STATUS);
+  if (attr_iter != attrs.end() && attr_iter->second.to_str() == "PENDING") {
+    if (s->object->is_sync_completed(dpp, obj_mtime)) {
+        s->object->set_atomic();
+        rgw::sal::Attrs setattrs, rmattrs;
+        bufferlist bl;
+        bl.append("COMPLETED");
+        setattrs[RGW_ATTR_OBJ_REPLICATION_STATUS] = bl;
+       int ret = s->object->set_obj_attrs(dpp, &setattrs, &rmattrs, s->yield, 0);
+       if (ret == 0) {
+         ldpp_dout(dpp, 20) << *s->object << " has amz-replication-status header set to COMPLETED" << dendl;
+       }
+    }
+  }
+}
+
 /*
  * GET on CloudTiered objects is processed only when sent from the sync client.
  * In all other cases, fail with `ERR_INVALID_OBJECT_STATE`.
@@ -2294,6 +2316,7 @@ void RGWGetObj::execute(optional_yield y)
   }
 #endif
 
+
   op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
@@ -2312,6 +2335,8 @@ void RGWGetObj::execute(optional_yield y)
     filter = &*decompress;
   }
 
+  handle_replication_status_header(this, attrs, s, lastmod);
+
   attr_iter = attrs.find(RGW_ATTR_OBJ_REPLICATION_TRACE);
   if (attr_iter != attrs.end()) {
     try {
@@ -2329,6 +2354,7 @@ void RGWGetObj::execute(optional_yield y)
     } catch (const buffer::error&) {}
   }
 
+
   if (get_type() == RGW_OP_GET_OBJ && get_data) {
     op_ret = handle_cloudtier_obj(attrs, sync_cloudtiered);
     if (op_ret < 0) {
@@ -4449,6 +4475,19 @@ void RGWPutObj::execute(optional_yield y)
     }
   }
 
+  RGWBucketSyncPolicyHandlerRef policy_handler;
+  op_ret = driver->get_sync_policy_handler(this, std::nullopt, s->bucket->get_key(), &policy_handler, s->yield);
+
+  if (op_ret < 0) {
+    ldpp_dout(this, 0) << "failed to read sync policy for bucket: " << s->bucket << dendl;
+    return;
+  }
+  if (policy_handler && policy_handler->bucket_exports_object(s->object->get_name(), *obj_tags)) {
+    bufferlist repl_bl;
+    repl_bl.append("PENDING");
+    emplace_attr(RGW_ATTR_OBJ_REPLICATION_STATUS, std::move(repl_bl));
+  }
+
   if (slo_info) {
     bufferlist manifest_bl;
     encode(*slo_info, manifest_bl);
@@ -5006,7 +5045,7 @@ void RGWPutMetadataObject::execute(optional_yield y)
     }
   }
 
-  op_ret = s->object->set_obj_attrs(this, &attrs, &rmattrs, s->yield);
+  op_ret = s->object->set_obj_attrs(this, &attrs, &rmattrs, s->yield, rgw::sal::FLAG_LOG_OP);
 }
 
 int RGWDeleteObj::handle_slo_manifest(bufferlist& bl, optional_yield y)
@@ -7665,7 +7704,7 @@ void RGWRMAttrs::execute(optional_yield y)
 
   s->object->set_atomic();
 
-  op_ret = s->object->set_obj_attrs(this, nullptr, &attrs, y);
+  op_ret = s->object->set_obj_attrs(this, nullptr, &attrs, y, rgw::sal::FLAG_LOG_OP);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "ERROR: failed to delete obj attrs, obj=" << s->object
                       << " ret=" << op_ret << dendl;
@@ -7702,7 +7741,7 @@ void RGWSetAttrs::execute(optional_yield y)
 
   if (!rgw::sal::Object::empty(s->object.get())) {
     rgw::sal::Attrs a(attrs);
-    op_ret = s->object->set_obj_attrs(this, &a, nullptr, y);
+    op_ret = s->object->set_obj_attrs(this, &a, nullptr, y, rgw::sal::FLAG_LOG_OP);
   } else {
     op_ret = s->bucket->merge_and_store_attrs(this, attrs, y);
   }
index 6aa055bcaa4103e8a6dcc424f29e0d62caafb094..1f9768db88c0ac577654fa26d8d0c0a0e28485d7 100644 (file)
@@ -1191,6 +1191,9 @@ class Object {
     virtual void set_compressed() = 0;
     /** Check if this object is compressed */
     virtual bool is_compressed() = 0;
+    /** Check if object is synced */
+    virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+      const ceph::real_time& obj_mtime) = 0;
     /** Invalidate cached info about this object, except atomic, prefetch, and
      * compressed */
     virtual void invalidate() = 0;
@@ -1206,7 +1209,7 @@ class Object {
     virtual void set_obj_state(RGWObjState& _state) = 0;
     /** Set attributes for this object from the backing store.  Attrs can be set or
      * deleted.  @note the attribute APIs may be revisited in the future. */
-    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) = 0;
+    virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) = 0;
     /** Get attributes for this object */
     virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) = 0;
     /** Modify attributes for this object. */
index d6ee772bc2e72f84f2fef7d08acdf11b31e457a0..50a29af6705cfb2c267fc4af12dbbbf15ae65a00 100644 (file)
@@ -527,7 +527,7 @@ namespace rgw::sal {
     return read_op.prepare(dpp);
   }
 
-  int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
+  int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags)
   {
     Attrs empty;
     DB::Object op_target(store->getDB(),
@@ -552,7 +552,7 @@ namespace rgw::sal {
     }
     set_atomic();
     state.attrset[attr_name] = attr_val;
-    return set_obj_attrs(dpp, &state.attrset, nullptr, y);
+    return set_obj_attrs(dpp, &state.attrset, nullptr, y, 0);
   }
 
   int DBObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y)
@@ -562,7 +562,7 @@ namespace rgw::sal {
 
     set_atomic();
     rmattr[attr_name] = bl;
-    return set_obj_attrs(dpp, nullptr, &rmattr, y);
+    return set_obj_attrs(dpp, nullptr, &rmattr, y, 0);
   }
 
   bool DBObject::is_expired() {
index 417cc7111c681138776bfd4a756b1e4c1b9f321d..67c1521f7faca3a153d25ce8d29c60519317fb49 100644 (file)
@@ -550,7 +550,7 @@ protected:
       virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
 
       virtual int get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **state, optional_yield y, bool follow_olh = true) override;
-      virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) override;
+      virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y, uint32_t flags) override;
       virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj = NULL) override;
       virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) override;
       virtual int delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) override;
index a04eb733af28f7a35dc749463b1f3df7f5e4ef5e..30216d4701ceb1ef9487032c7345399d7f464152 100644 (file)
@@ -1053,9 +1053,9 @@ int FilterObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **pst
 }
 
 int FilterObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                               Attrs* delattrs, optional_yield y)
+                               Attrs* delattrs, optional_yield y, uint32_t flags)
 {
-  return next->set_obj_attrs(dpp, setattrs, delattrs, y);
+  return next->set_obj_attrs(dpp, setattrs, delattrs, y, flags);
 }
 
 int FilterObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
index 344aaf9d5b5af222c6fe52213671df1e0c1ad25d..8adcd0f600ae7f1b6f468aceef48231d81e97209 100644 (file)
@@ -754,6 +754,8 @@ public:
   virtual bool is_prefetch_data() override { return next->is_prefetch_data(); }
   virtual void set_compressed() override { return next->set_compressed(); }
   virtual bool is_compressed() override { return next->is_compressed(); }
+  virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+    const ceph::real_time& obj_mtime) override { return next->is_sync_completed(dpp, obj_mtime); }
   virtual void invalidate() override { return next->invalidate(); }
   virtual bool empty() const override { return next->empty(); }
   virtual const std::string &get_name() const override { return next->get_name(); }
@@ -762,7 +764,7 @@ public:
                            optional_yield y, bool follow_olh = true) override;
   virtual void set_obj_state(RGWObjState& _state) override { return next->set_obj_state(_state); }
   virtual int set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs,
-                           Attrs* delattrs, optional_yield y) override;
+                           Attrs* delattrs, optional_yield y, uint32_t flags) override;
   virtual int get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp,
                            rgw_obj* target_obj = NULL) override;
   virtual int modify_obj_attrs(const char* attr_name, bufferlist& attr_val,
index 1e3d76309847b73fc0ac6a79ae88175f7e029550..1bde38c9ac3fd7015151e3cd0e65e8017e51ff79 100644 (file)
@@ -206,6 +206,8 @@ class StoreObject : public Object {
     virtual bool is_prefetch_data() override { return state.prefetch_data; }
     virtual void set_compressed() override { state.compressed = true; }
     virtual bool is_compressed() override { return state.compressed; }
+    virtual bool is_sync_completed(const DoutPrefixProvider* dpp,
+      const ceph::real_time& obj_mtime) override { return false; }
     virtual void invalidate() override {
       rgw_obj obj = state.obj;
       bool is_atomic = state.is_atomic;
index 0568262de6756587c6481d1e3d39f6d1f34d470c..b65752959e9c93bb456d2656e2f7999030a62c00 100644 (file)
@@ -74,6 +74,14 @@ void rgw_sync_pipe_filter::set_prefix(std::optional<std::string> opt_prefix,
   }
 }
 
+bool rgw_sync_pipe_filter::check_prefix(const std::string& obj_name) const
+{
+  if (prefix.has_value()) {
+    return boost::starts_with(obj_name, prefix.value());
+  }
+  return true;
+}
+
 void rgw_sync_pipe_filter::set_tags(std::list<std::string>& tags_add,
                                     std::list<std::string>& tags_rm)
 {
index ec9d1f2c623417dc01029ab31f050c6afe76c4d3..062fb11532440d933a559ff2904954cdc022285c 100644 (file)
@@ -244,6 +244,7 @@ struct rgw_sync_pipe_filter {
   bool check_tag(const std::string& k, const std::string& v) const;
   bool check_tags(const std::vector<std::string>& tags) const;
   bool check_tags(const RGWObjTags::tag_map_t& tags) const;
+  bool check_prefix(const std::string& obj_name) const;
 };
 WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)
 
index fe3e012b4b509177b30a1c29be6138d2a85b5886..8e55a2b7fa992b1990802f807741ef4c43dfc72a 100644 (file)
@@ -1928,36 +1928,34 @@ def test_role_delete_sync():
                       zone.iam_conn.get_role, RoleName=role_name)
         log.info(f'success, zone: {zone.name} does not have role: {role_name}')
 
-def test_object_acl():
+def test_replication_status():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
-    primary = zonegroup_conns.rw_zones[0]
-    secondary = zonegroup_conns.rw_zones[1]
-
-    bucket = primary.create_bucket(gen_bucket_name())
-    log.debug('created bucket=%s', bucket.name)
+    zone = zonegroup_conns.rw_zones[0]
 
-    # upload a dummy object and wait for sync.
-    k = new_key(primary, bucket, 'dummy')
+    bucket = zone.conn.create_bucket(gen_bucket_name())
+    obj_name = "a"
+    k = new_key(zone, bucket.name, obj_name)
     k.set_contents_from_string('foo')
     zonegroup_meta_checkpoint(zonegroup)
-    zonegroup_data_checkpoint(zonegroup_conns)
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
-    #check object on secondary before setacl
-    bucket2 = get_bucket(secondary, bucket.name)
-    before_set_acl = bucket2.get_acl(k)
-    assert(len(before_set_acl.acl.grants) == 1)
+    head_res = zone.head_object(bucket.name, obj_name)
+    log.info("checking if object has PENDING ReplicationStatus")
+    assert(head_res["ReplicationStatus"] == "PENDING")
 
-    #set object acl on primary and wait for sync.
-    bucket.set_canned_acl('public-read', key_name=k)
-    log.debug('set acl=%s', bucket.name)
+    bilog_autotrim(zone.zone)
     zonegroup_data_checkpoint(zonegroup_conns)
     zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
 
-    #check object secondary after setacl
-    bucket2 = get_bucket(secondary, bucket.name)
-    after_set_acl = bucket2.get_acl(k)
-    assert(len(after_set_acl.acl.grants) == 2) # read grant added on AllUsers
+    head_res = zone.head_object(bucket.name, obj_name)
+    log.info("checking if object has COMPLETED ReplicationStatus")
+    assert(head_res["ReplicationStatus"] == "COMPLETED")
+
+    log.info("checking that ReplicationStatus update did not write a bilog")
+    bilog = bilog_list(zone.zone, bucket.name)
+    assert(len(bilog) == 0)
+
 
 @attr('fails_with_rgw')
 @attr('data_sync_init')
index 3761676a3d781368f2267b6495de9c80541d5047..ce0530543e061f7f199b741c84f709eb6347c6e5 100644 (file)
@@ -170,6 +170,9 @@ class RadosZone(Zone):
               return out['TopicConfigurations']
             return []
 
+        def head_object(self, bucket_name, obj_name):
+            return self.s3_client.head_object(Bucket=bucket_name, Key=obj_name)
+
     def get_conn(self, credentials):
         return self.Conn(self, credentials)