]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: data sync: guard against racing source object change
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 26 Nov 2019 20:14:35 +0000 (12:14 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:39 +0000 (10:20 -0800)
Source object might have changed since we tried to sync it,
and the decisions we made originally might be irrelevant.
After reading object header, check if the original dest params
are the same dest params we find now for this object, otherwise
need to retry.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync_policy.h

index 473e3c667dae63c7805babdcc8fef84379768ca3..03793eccb26349d6c077f24c439934e74d63c812 100644 (file)
@@ -2149,6 +2149,7 @@ class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
   rgw_bucket_sync_pipe sync_pipe;
 
   std::shared_ptr<RGWUserPermHandler::Bucket> bucket_perms;
+  std::optional<rgw_sync_pipe_dest_params> verify_dest_params;
 
   std::optional<ceph::real_time> mtime;
   std::optional<string> etag;
@@ -2156,10 +2157,18 @@ class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
 
   std::unique_ptr<rgw::auth::Identity> identity;
 
+  std::shared_ptr<bool> need_retry;
+
 public:
   RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe,
-                         std::shared_ptr<RGWUserPermHandler::Bucket>& _bucket_perms) : sync_pipe(_sync_pipe),
-                                                                                       bucket_perms(_bucket_perms) {}
+                         std::shared_ptr<RGWUserPermHandler::Bucket>& _bucket_perms,
+                         std::optional<rgw_sync_pipe_dest_params>&& _verify_dest_params,
+                         std::shared_ptr<bool>& _need_retry) : sync_pipe(_sync_pipe),
+                                         bucket_perms(_bucket_perms),
+                                         verify_dest_params(std::move(_verify_dest_params)),
+                                         need_retry(_need_retry) {
+    *need_retry = false;
+  }
 
   int filter(CephContext *cct,
              const rgw_obj_key& source_key,
@@ -2200,6 +2209,14 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct,
     return abort_err;
   }
 
+  if (verify_dest_params &&
+      !(*verify_dest_params == params.dest)) {
+    /* raced! original dest params were different, will need to retry */
+    ldout(cct, 0) << "WARNING: " << __func__ << ": pipe dest params are different than original params, must have raced with object rewrite, retrying" << dendl;
+    *need_retry = true;
+    return -ECANCELED;
+  }
+
   std::optional<std::map<string, bufferlist> > new_attrs;
 
   if (params.dest.acl_translation) {
@@ -2257,13 +2274,16 @@ class RGWObjFetchCR : public RGWCoroutine {
   map<string, string> src_headers;
 
   std::optional<rgw_user> param_user;
-  std::optional<rgw_user> param_acl_translation;
-  std::optional<string> param_storage_class;
   rgw_sync_pipe_params::Mode param_mode;
 
   std::optional<RGWUserPermHandler> user_perms;
   std::shared_ptr<RGWUserPermHandler::Bucket> source_bucket_perms;
   RGWUserPermHandler::Bucket dest_bucket_perms;
+
+  std::optional<rgw_sync_pipe_dest_params> dest_params;
+
+  int try_num{0};
+  std::shared_ptr<bool> need_retry;
 public:
   RGWObjFetchCR(RGWDataSyncCtx *_sc,
                 rgw_bucket_sync_pipe& _sync_pipe,
@@ -2276,129 +2296,146 @@ public:
                                               key(_key),
                                               dest_key(_dest_key),
                                               versioned_epoch(_versioned_epoch),
-                                              zones_trace(_zones_trace) {}
+                                              zones_trace(_zones_trace) {
+  }
 
 
   int operate() override {
     reenter(this) {
 
-      {
-        if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
-                                                                 &param_user,
-                                                                 &param_acl_translation,
-                                                                 &param_storage_class,
-                                                                 &param_mode,
-                                                                 &need_more_info)) {
-          if (!need_more_info) {
-            return set_cr_error(-ERR_PRECONDITION_FAILED);
+#define MAX_RACE_RETRIES_OBJ_FETCH 10
+      for (try_num = 0; try_num < MAX_RACE_RETRIES_OBJ_FETCH; ++try_num) {
+
+        {
+          std::optional<rgw_user> param_acl_translation;
+          std::optional<string> param_storage_class;
+
+          if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
+                                                                   &param_user,
+                                                                   &param_acl_translation,
+                                                                   &param_storage_class,
+                                                                   &param_mode,
+                                                                   &need_more_info)) {
+            if (!need_more_info) {
+              return set_cr_error(-ERR_PRECONDITION_FAILED);
+            }
           }
         }
-      }
 
-      if (need_more_info) {
-        ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
-        /*
-         * we need to fetch info about source object, so that we can determine
-         * the correct policy configuration. This can happen if there are multiple
-         * policy rules, and some depend on the object tagging */
-        yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
-                                          sync_env->store,
-                                          sc->source_zone,
-                                          sync_pipe.info.source_bs.bucket,
-                                          key,
-                                          &src_mtime,
-                                          &src_size,
-                                          &src_etag,
-                                          &src_attrs,
-                                          &src_headers));
-        if (retcode < 0) {
-          return set_cr_error(retcode);
-        }
-    
-        RGWObjTags obj_tags;
-
-        auto iter = src_attrs.find(RGW_ATTR_TAGS);
-        if (iter != src_attrs.end()) {
-          try {
-            auto it = iter->second.cbegin();
-            obj_tags.decode(it);
-          } catch (buffer::error &err) {
-            ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+        if (need_more_info) {
+          ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
+          /*
+           * we need to fetch info about source object, so that we can determine
+           * the correct policy configuration. This can happen if there are multiple
+           * policy rules, and some depend on the object tagging */
+          yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
+                                            sync_env->store,
+                                            sc->source_zone,
+                                            sync_pipe.info.source_bs.bucket,
+                                            key,
+                                            &src_mtime,
+                                            &src_size,
+                                            &src_etag,
+                                            &src_attrs,
+                                            &src_headers));
+          if (retcode < 0) {
+            return set_cr_error(retcode);
           }
-        }
 
-        rgw_sync_pipe_params params;
-        if (!sync_pipe.info.handler.find_obj_params(key,
-                                                    obj_tags.get_tags(),
-                                                    &params)) {
-          return set_cr_error(-ERR_PRECONDITION_FAILED);
-        }
+          RGWObjTags obj_tags;
 
-        param_user = params.user;
-        if (params.dest.acl_translation) {
-          param_acl_translation = params.dest.acl_translation->owner;
-        }
-        param_storage_class = params.dest.storage_class;
-        param_mode = params.mode;
-      }
+          auto iter = src_attrs.find(RGW_ATTR_TAGS);
+          if (iter != src_attrs.end()) {
+            try {
+              auto it = iter->second.cbegin();
+              obj_tags.decode(it);
+            } catch (buffer::error &err) {
+              ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+            }
+          }
 
-      if (param_mode == rgw_sync_pipe_params::MODE_USER) {
-        if (!param_user) {
-          ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl;
-          return set_cr_error(-EPERM);
-        }
-        user_perms.emplace(sync_env, *param_user);
+          rgw_sync_pipe_params params;
+          if (!sync_pipe.info.handler.find_obj_params(key,
+                                                      obj_tags.get_tags(),
+                                                      &params)) {
+            return set_cr_error(-ERR_PRECONDITION_FAILED);
+          }
 
-        yield call(user_perms->init_cr());
-        if (retcode < 0) {
-          ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl;
-          return set_cr_error(retcode);
-        }
+          param_user = params.user;
+          param_mode = params.mode;
 
-        /* verify that user is allowed to write at the target bucket */
-        int r = user_perms->init_bucket(sync_pipe.dest_bucket_info,
-                                        sync_pipe.dest_bucket_attrs,
-                                        &dest_bucket_perms);
-        if (r < 0) {
-          ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
-          return set_cr_error(retcode);
+          dest_params = params.dest;
         }
 
-        if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) {
-          ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bs.bucket.get_key() << ")" << dendl;
-          return -EPERM;
+        if (param_mode == rgw_sync_pipe_params::MODE_USER) {
+          if (!param_user) {
+            ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl;
+            return set_cr_error(-EPERM);
+          }
+          user_perms.emplace(sync_env, *param_user);
+
+          yield call(user_perms->init_cr());
+          if (retcode < 0) {
+            ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl;
+            return set_cr_error(retcode);
+          }
+
+          /* verify that user is allowed to write at the target bucket */
+          int r = user_perms->init_bucket(sync_pipe.dest_bucket_info,
+                                          sync_pipe.dest_bucket_attrs,
+                                          &dest_bucket_perms);
+          if (r < 0) {
+            ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
+            return set_cr_error(retcode);
+          }
+
+          if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) {
+            ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bs.bucket.get_key() << ")" << dendl;
+            return -EPERM;
+          }
+
+          /* init source bucket permission structure */
+          source_bucket_perms = make_shared<RGWUserPermHandler::Bucket>();
+          r = user_perms->init_bucket(sync_pipe.source_bucket_info,
+                                      sync_pipe.source_bucket_attrs,
+                                      source_bucket_perms.get());
+          if (r < 0) {
+            ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
+            return set_cr_error(retcode);
+          }
         }
 
-        /* init source bucket permission structure */
-        source_bucket_perms = make_shared<RGWUserPermHandler::Bucket>();
-        r = user_perms->init_bucket(sync_pipe.source_bucket_info,
-                                        sync_pipe.source_bucket_attrs,
-                                        source_bucket_perms.get());
-        if (r < 0) {
-          ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
+        yield {
+          if (!need_retry) {
+            need_retry = make_shared<bool>();
+          }
+          auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe,
+                                                            source_bucket_perms,
+                                                            std::move(dest_params),
+                                                            need_retry);
+
+          call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+                                       nullopt,
+                                       sync_pipe.info.source_bs.bucket,
+                                       std::nullopt, sync_pipe.dest_bucket_info,
+                                       key, dest_key, versioned_epoch,
+                                       true,
+                                       std::static_pointer_cast<RGWFetchObjFilter>(filter),
+                                       zones_trace, sync_env->counters, sync_env->dpp));
+        }
+        if (retcode < 0) {
+          if (*need_retry) {
+            continue;
+          }
           return set_cr_error(retcode);
         }
-      }
 
-      yield {
-
-        auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe, source_bucket_perms);
-
-#warning FIXME: race guard
-        call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
-                                     param_user,
-                                     sync_pipe.info.source_bs.bucket,
-                                     std::nullopt, sync_pipe.dest_bucket_info,
-                                     key, dest_key, versioned_epoch,
-                                     true,
-                                     std::static_pointer_cast<RGWFetchObjFilter>(filter),
-                                     zones_trace, sync_env->counters, sync_env->dpp));
-      }
-      if (retcode < 0) {
-        return set_cr_error(retcode);
+        return set_cr_done();
       }
 
-      return set_cr_done();
+      ldout(cct, 0) << "ERROR: " << __func__ << ": Too many retries trying to fetch object, possibly a bug: bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << " key=" << key << dendl;
+
+      return set_cr_error(-EIO);
     }
     return 0;
   }
index 0aff6b0f894e03955a7d15a724c4ad77737ae61f..091ce7ddce09c5d8114cc0750eabca784fe2e9f9 100644 (file)
@@ -323,6 +323,11 @@ struct rgw_sync_pipe_dest_params {
 
   void dump(ceph::Formatter *f) const;
   void decode_json(JSONObj *obj);
+
+  bool operator==(const rgw_sync_pipe_dest_params& rhs) const {
+    return (acl_translation == rhs.acl_translation &&
+           storage_class == rhs.storage_class);
+  }
 };
 WRITE_CLASS_ENCODER(rgw_sync_pipe_dest_params)