rgw_bucket_sync_pipe sync_pipe;
std::shared_ptr<RGWUserPermHandler::Bucket> bucket_perms;
+ std::optional<rgw_sync_pipe_dest_params> verify_dest_params;
std::optional<ceph::real_time> mtime;
std::optional<string> etag;
std::unique_ptr<rgw::auth::Identity> identity;
+ std::shared_ptr<bool> need_retry;
+
public:
RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe,
- std::shared_ptr<RGWUserPermHandler::Bucket>& _bucket_perms) : sync_pipe(_sync_pipe),
- bucket_perms(_bucket_perms) {}
+ std::shared_ptr<RGWUserPermHandler::Bucket>& _bucket_perms,
+ std::optional<rgw_sync_pipe_dest_params>&& _verify_dest_params,
+ std::shared_ptr<bool>& _need_retry) : sync_pipe(_sync_pipe),
+ bucket_perms(_bucket_perms),
+ verify_dest_params(std::move(_verify_dest_params)),
+ need_retry(_need_retry) {
+ *need_retry = false;
+ }
int filter(CephContext *cct,
const rgw_obj_key& source_key,
return abort_err;
}
+ if (verify_dest_params &&
+ !(*verify_dest_params == params.dest)) {
+ /* raced! original dest params were different, will need to retry */
+ ldout(cct, 0) << "WARNING: " << __func__ << ": pipe dest params are different than original params, must have raced with object rewrite, retrying" << dendl;
+ *need_retry = true;
+ return -ECANCELED;
+ }
+
std::optional<std::map<string, bufferlist> > new_attrs;
if (params.dest.acl_translation) {
map<string, string> src_headers;
std::optional<rgw_user> param_user;
- std::optional<rgw_user> param_acl_translation;
- std::optional<string> param_storage_class;
rgw_sync_pipe_params::Mode param_mode;
std::optional<RGWUserPermHandler> user_perms;
std::shared_ptr<RGWUserPermHandler::Bucket> source_bucket_perms;
RGWUserPermHandler::Bucket dest_bucket_perms;
+
+ std::optional<rgw_sync_pipe_dest_params> dest_params;
+
+ int try_num{0};
+ std::shared_ptr<bool> need_retry;
public:
RGWObjFetchCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe,
key(_key),
dest_key(_dest_key),
versioned_epoch(_versioned_epoch),
- zones_trace(_zones_trace) {}
+ zones_trace(_zones_trace) {
+ }
int operate() override {
reenter(this) {
- {
- if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
- ¶m_user,
- ¶m_acl_translation,
- ¶m_storage_class,
- ¶m_mode,
- &need_more_info)) {
- if (!need_more_info) {
- return set_cr_error(-ERR_PRECONDITION_FAILED);
+#define MAX_RACE_RETRIES_OBJ_FETCH 10
+ for (try_num = 0; try_num < MAX_RACE_RETRIES_OBJ_FETCH; ++try_num) {
+
+ {
+ std::optional<rgw_user> param_acl_translation;
+ std::optional<string> param_storage_class;
+
+ if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
+ ¶m_user,
+ ¶m_acl_translation,
+ ¶m_storage_class,
+ ¶m_mode,
+ &need_more_info)) {
+ if (!need_more_info) {
+ return set_cr_error(-ERR_PRECONDITION_FAILED);
+ }
}
}
- }
- if (need_more_info) {
- ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
- /*
- * we need to fetch info about source object, so that we can determine
- * the correct policy configuration. This can happen if there are multiple
- * policy rules, and some depend on the object tagging */
- yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
- sync_env->store,
- sc->source_zone,
- sync_pipe.info.source_bs.bucket,
- key,
- &src_mtime,
- &src_size,
- &src_etag,
- &src_attrs,
- &src_headers));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
-
- RGWObjTags obj_tags;
-
- auto iter = src_attrs.find(RGW_ATTR_TAGS);
- if (iter != src_attrs.end()) {
- try {
- auto it = iter->second.cbegin();
- obj_tags.decode(it);
- } catch (buffer::error &err) {
- ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+ if (need_more_info) {
+ ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
+ /*
+ * we need to fetch info about source object, so that we can determine
+ * the correct policy configuration. This can happen if there are multiple
+ * policy rules, and some depend on the object tagging */
+ yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
+ sync_env->store,
+ sc->source_zone,
+ sync_pipe.info.source_bs.bucket,
+ key,
+ &src_mtime,
+ &src_size,
+ &src_etag,
+ &src_attrs,
+ &src_headers));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
}
- }
- rgw_sync_pipe_params params;
- if (!sync_pipe.info.handler.find_obj_params(key,
- obj_tags.get_tags(),
- ¶ms)) {
- return set_cr_error(-ERR_PRECONDITION_FAILED);
- }
+ RGWObjTags obj_tags;
- param_user = params.user;
- if (params.dest.acl_translation) {
- param_acl_translation = params.dest.acl_translation->owner;
- }
- param_storage_class = params.dest.storage_class;
- param_mode = params.mode;
- }
+ auto iter = src_attrs.find(RGW_ATTR_TAGS);
+ if (iter != src_attrs.end()) {
+ try {
+ auto it = iter->second.cbegin();
+ obj_tags.decode(it);
+ } catch (buffer::error &err) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+ }
+ }
- if (param_mode == rgw_sync_pipe_params::MODE_USER) {
- if (!param_user) {
- ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl;
- return set_cr_error(-EPERM);
- }
- user_perms.emplace(sync_env, *param_user);
+ rgw_sync_pipe_params params;
+ if (!sync_pipe.info.handler.find_obj_params(key,
+ obj_tags.get_tags(),
+ ¶ms)) {
+ return set_cr_error(-ERR_PRECONDITION_FAILED);
+ }
- yield call(user_perms->init_cr());
- if (retcode < 0) {
- ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl;
- return set_cr_error(retcode);
- }
+ param_user = params.user;
+ param_mode = params.mode;
- /* verify that user is allowed to write at the target bucket */
- int r = user_perms->init_bucket(sync_pipe.dest_bucket_info,
- sync_pipe.dest_bucket_attrs,
- &dest_bucket_perms);
- if (r < 0) {
- ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
- return set_cr_error(retcode);
+ dest_params = params.dest;
}
- if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) {
- ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bs.bucket.get_key() << ")" << dendl;
- return -EPERM;
+ if (param_mode == rgw_sync_pipe_params::MODE_USER) {
+ if (!param_user) {
+ ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl;
+ return set_cr_error(-EPERM);
+ }
+ user_perms.emplace(sync_env, *param_user);
+
+ yield call(user_perms->init_cr());
+ if (retcode < 0) {
+ ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl;
+ return set_cr_error(retcode);
+ }
+
+ /* verify that user is allowed to write at the target bucket */
+ int r = user_perms->init_bucket(sync_pipe.dest_bucket_info,
+ sync_pipe.dest_bucket_attrs,
+ &dest_bucket_perms);
+ if (r < 0) {
+ ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bs.bucket.get_key() << ")" << dendl;
+ return -EPERM;
+ }
+
+ /* init source bucket permission structure */
+ source_bucket_perms = make_shared<RGWUserPermHandler::Bucket>();
+ r = user_perms->init_bucket(sync_pipe.source_bucket_info,
+ sync_pipe.source_bucket_attrs,
+ source_bucket_perms.get());
+ if (r < 0) {
+ ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
+ return set_cr_error(retcode);
+ }
}
- /* init source bucket permission structure */
- source_bucket_perms = make_shared<RGWUserPermHandler::Bucket>();
- r = user_perms->init_bucket(sync_pipe.source_bucket_info,
- sync_pipe.source_bucket_attrs,
- source_bucket_perms.get());
- if (r < 0) {
- ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
+ yield {
+ if (!need_retry) {
+ need_retry = make_shared<bool>();
+ }
+ auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe,
+ source_bucket_perms,
+ std::move(dest_params),
+ need_retry);
+
+ call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+ nullopt,
+ sync_pipe.info.source_bs.bucket,
+ std::nullopt, sync_pipe.dest_bucket_info,
+ key, dest_key, versioned_epoch,
+ true,
+ std::static_pointer_cast<RGWFetchObjFilter>(filter),
+ zones_trace, sync_env->counters, sync_env->dpp));
+ }
+ if (retcode < 0) {
+ if (*need_retry) {
+ continue;
+ }
return set_cr_error(retcode);
}
- }
- yield {
-
- auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe, source_bucket_perms);
-
-#warning FIXME: race guard
- call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
- param_user,
- sync_pipe.info.source_bs.bucket,
- std::nullopt, sync_pipe.dest_bucket_info,
- key, dest_key, versioned_epoch,
- true,
- std::static_pointer_cast<RGWFetchObjFilter>(filter),
- zones_trace, sync_env->counters, sync_env->dpp));
- }
- if (retcode < 0) {
- return set_cr_error(retcode);
+ return set_cr_done();
}
- return set_cr_done();
+ ldout(cct, 0) << "ERROR: " << __func__ << ": Too many retries trying to fetch object, possibly a bug: bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << " key=" << key << dendl;
+
+ return set_cr_error(-EIO);
}
return 0;
}