From: Yehuda Sadeh Date: Tue, 26 Nov 2019 20:14:35 +0000 (-0800) Subject: rgw: data sync: guard against racing source object change X-Git-Tag: v15.1.0~22^2~42 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=009608dd50744d662b4e55b7609098cf479ec9f6;p=ceph.git rgw: data sync: guard against racing source object change Source object might have changed since we tried to sync it, and the decisions we made originally might be irrelevant. After reading object header, check if the original dest params are the same dest params we find now for this object, otherwise need to retry. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 473e3c667dae..03793eccb263 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2149,6 +2149,7 @@ class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default { rgw_bucket_sync_pipe sync_pipe; std::shared_ptr bucket_perms; + std::optional verify_dest_params; std::optional mtime; std::optional etag; @@ -2156,10 +2157,18 @@ class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default { std::unique_ptr identity; + std::shared_ptr need_retry; + public: RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe, - std::shared_ptr& _bucket_perms) : sync_pipe(_sync_pipe), - bucket_perms(_bucket_perms) {} + std::shared_ptr& _bucket_perms, + std::optional&& _verify_dest_params, + std::shared_ptr& _need_retry) : sync_pipe(_sync_pipe), + bucket_perms(_bucket_perms), + verify_dest_params(std::move(_verify_dest_params)), + need_retry(_need_retry) { + *need_retry = false; + } int filter(CephContext *cct, const rgw_obj_key& source_key, @@ -2200,6 +2209,14 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct, return abort_err; } + if (verify_dest_params && + !(*verify_dest_params == params.dest)) { + /* raced! original dest params were different, will need to retry */ + ldout(cct, 0) << "WARNING: " << __func__ << ": pipe dest params are different than original params, must have raced with object rewrite, retrying" << dendl; + *need_retry = true; + return -ECANCELED; + } + std::optional > new_attrs; if (params.dest.acl_translation) { @@ -2257,13 +2274,16 @@ class RGWObjFetchCR : public RGWCoroutine { map src_headers; std::optional param_user; - std::optional param_acl_translation; - std::optional param_storage_class; rgw_sync_pipe_params::Mode param_mode; std::optional user_perms; std::shared_ptr source_bucket_perms; RGWUserPermHandler::Bucket dest_bucket_perms; + + std::optional dest_params; + + int try_num{0}; + std::shared_ptr need_retry; public: RGWObjFetchCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, @@ -2276,129 +2296,146 @@ public: key(_key), dest_key(_dest_key), versioned_epoch(_versioned_epoch), - zones_trace(_zones_trace) {} + zones_trace(_zones_trace) { + } int operate() override { reenter(this) { - { - if (!sync_pipe.info.handler.find_basic_info_without_tags(key, - ¶m_user, - ¶m_acl_translation, - ¶m_storage_class, - ¶m_mode, - &need_more_info)) { - if (!need_more_info) { - return set_cr_error(-ERR_PRECONDITION_FAILED); +#define MAX_RACE_RETRIES_OBJ_FETCH 10 + for (try_num = 0; try_num < MAX_RACE_RETRIES_OBJ_FETCH; ++try_num) { + + { + std::optional param_acl_translation; + std::optional param_storage_class; + + if (!sync_pipe.info.handler.find_basic_info_without_tags(key, + ¶m_user, + ¶m_acl_translation, + ¶m_storage_class, + ¶m_mode, + &need_more_info)) { + if (!need_more_info) { + return set_cr_error(-ERR_PRECONDITION_FAILED); + } } } - } - if (need_more_info) { - ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl; - /* - * we need to fetch info about source object, so that we can determine - * the correct policy configuration. This can happen if there are multiple - * policy rules, and some depend on the object tagging */ - yield call(new RGWStatRemoteObjCR(sync_env->async_rados, - sync_env->store, - sc->source_zone, - sync_pipe.info.source_bs.bucket, - key, - &src_mtime, - &src_size, - &src_etag, - &src_attrs, - &src_headers)); - if (retcode < 0) { - return set_cr_error(retcode); - } - - RGWObjTags obj_tags; - - auto iter = src_attrs.find(RGW_ATTR_TAGS); - if (iter != src_attrs.end()) { - try { - auto it = iter->second.cbegin(); - obj_tags.decode(it); - } catch (buffer::error &err) { - ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl; + if (need_more_info) { + ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl; + /* + * we need to fetch info about source object, so that we can determine + * the correct policy configuration. This can happen if there are multiple + * policy rules, and some depend on the object tagging */ + yield call(new RGWStatRemoteObjCR(sync_env->async_rados, + sync_env->store, + sc->source_zone, + sync_pipe.info.source_bs.bucket, + key, + &src_mtime, + &src_size, + &src_etag, + &src_attrs, + &src_headers)); + if (retcode < 0) { + return set_cr_error(retcode); } - } - rgw_sync_pipe_params params; - if (!sync_pipe.info.handler.find_obj_params(key, - obj_tags.get_tags(), - ¶ms)) { - return set_cr_error(-ERR_PRECONDITION_FAILED); - } + RGWObjTags obj_tags; - param_user = params.user; - if (params.dest.acl_translation) { - param_acl_translation = params.dest.acl_translation->owner; - } - param_storage_class = params.dest.storage_class; - param_mode = params.mode; - } + auto iter = src_attrs.find(RGW_ATTR_TAGS); + if (iter != src_attrs.end()) { + try { + auto it = iter->second.cbegin(); + obj_tags.decode(it); + } catch (buffer::error &err) { + ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl; + } + } - if (param_mode == rgw_sync_pipe_params::MODE_USER) { - if (!param_user) { - ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl; - return set_cr_error(-EPERM); - } - user_perms.emplace(sync_env, *param_user); + rgw_sync_pipe_params params; + if (!sync_pipe.info.handler.find_obj_params(key, + obj_tags.get_tags(), + ¶ms)) { + return set_cr_error(-ERR_PRECONDITION_FAILED); + } - yield call(user_perms->init_cr()); - if (retcode < 0) { - ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl; - return set_cr_error(retcode); - } + param_user = params.user; + param_mode = params.mode; - /* verify that user is allowed to write at the target bucket */ - int r = user_perms->init_bucket(sync_pipe.dest_bucket_info, - sync_pipe.dest_bucket_attrs, - &dest_bucket_perms); - if (r < 0) { - ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl; - return set_cr_error(retcode); + dest_params = params.dest; } - if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) { - ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bs.bucket.get_key() << ")" << dendl; - return -EPERM; + if (param_mode == rgw_sync_pipe_params::MODE_USER) { + if (!param_user) { + ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl; + return set_cr_error(-EPERM); + } + user_perms.emplace(sync_env, *param_user); + + yield call(user_perms->init_cr()); + if (retcode < 0) { + ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl; + return set_cr_error(retcode); + } + + /* verify that user is allowed to write at the target bucket */ + int r = user_perms->init_bucket(sync_pipe.dest_bucket_info, + sync_pipe.dest_bucket_attrs, + &dest_bucket_perms); + if (r < 0) { + ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl; + return set_cr_error(retcode); + } + + if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) { + ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bs.bucket.get_key() << ")" << dendl; + return -EPERM; + } + + /* init source bucket permission structure */ + source_bucket_perms = make_shared(); + r = user_perms->init_bucket(sync_pipe.source_bucket_info, + sync_pipe.source_bucket_attrs, + source_bucket_perms.get()); + if (r < 0) { + ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl; + return set_cr_error(retcode); + } } - /* init source bucket permission structure */ - source_bucket_perms = make_shared(); - r = user_perms->init_bucket(sync_pipe.source_bucket_info, - sync_pipe.source_bucket_attrs, - source_bucket_perms.get()); - if (r < 0) { - ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl; + yield { + if (!need_retry) { + need_retry = make_shared(); + } + auto filter = make_shared(sync_pipe, + source_bucket_perms, + std::move(dest_params), + need_retry); + + call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, + nullopt, + sync_pipe.info.source_bs.bucket, + std::nullopt, sync_pipe.dest_bucket_info, + key, dest_key, versioned_epoch, + true, + std::static_pointer_cast(filter), + zones_trace, sync_env->counters, sync_env->dpp)); + } + if (retcode < 0) { + if (*need_retry) { + continue; + } return set_cr_error(retcode); } - } - yield { - - auto filter = make_shared(sync_pipe, source_bucket_perms); - -#warning FIXME: race guard - call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, - param_user, - sync_pipe.info.source_bs.bucket, - std::nullopt, sync_pipe.dest_bucket_info, - key, dest_key, versioned_epoch, - true, - std::static_pointer_cast(filter), - zones_trace, sync_env->counters, sync_env->dpp)); - } - if (retcode < 0) { - return set_cr_error(retcode); + return set_cr_done(); } - return set_cr_done(); + ldout(cct, 0) << "ERROR: " << __func__ << ": Too many retries trying to fetch object, possibly a bug: bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << " key=" << key << dendl; + + return set_cr_error(-EIO); } return 0; } diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 0aff6b0f894e..091ce7ddce09 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -323,6 +323,11 @@ struct rgw_sync_pipe_dest_params { void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj); + + bool operator==(const rgw_sync_pipe_dest_params& rhs) const { + return (acl_translation == rhs.acl_translation && + storage_class == rhs.storage_class); + } }; WRITE_CLASS_ENCODER(rgw_sync_pipe_dest_params)