int operate() override;
};
+struct all_bucket_info {
+ RGWBucketInfo bucket_info;
+ map<string, bufferlist> attrs;
+};
+
struct rgw_sync_pipe_info_entity
{
private:
RGWBucketInfo bucket_info;
+ map<string, bufferlist> bucket_attrs;
bool _has_bucket_info{false};
public:
rgw_sync_pipe_info_entity() {}
rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
- std::optional<RGWBucketInfo>& binfo) {
+ std::optional<all_bucket_info>& binfo) {
if (e.zone) {
zone = *e.zone;
}
return;
}
if (!binfo ||
- binfo->bucket != *e.bucket) {
+ binfo->bucket_info.bucket != *e.bucket) {
bucket_info.bucket = *e.bucket;
} else {
set_bucket_info(*binfo);
}
}
- void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+ void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
if (_has_bucket_info) {
return;
}
return _has_bucket_info;
}
- void set_bucket_info(const RGWBucketInfo& _bucket_info) {
- bucket_info = _bucket_info;
+ void set_bucket_info(const all_bucket_info& all_info) {
+ bucket_info = all_info.bucket_info;
+ bucket_attrs = all_info.attrs;
_has_bucket_info = true;
}
rgw_sync_pipe_handler_info() {}
rgw_sync_pipe_handler_info(const RGWBucketSyncFlowManager::pipe_handler& _handler,
- std::optional<RGWBucketInfo> source_bucket_info,
- std::optional<RGWBucketInfo> target_bucket_info) : handler(_handler),
- source(handler.source, source_bucket_info),
- target(handler.dest, target_bucket_info) {
+ std::optional<all_bucket_info> source_bucket_info,
+ std::optional<all_bucket_info> target_bucket_info) : handler(_handler),
+ source(handler.source, source_bucket_info),
+ target(handler.dest, target_bucket_info) {
}
bool operator<(const rgw_sync_pipe_handler_info& p) const {
return (target < p.target);
}
- void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+ void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
source.update_empty_bucket_info(buckets_info);
target.update_empty_bucket_info(buckets_info);
}
}
void insert(const RGWBucketSyncFlowManager::pipe_handler& handler,
- std::optional<RGWBucketInfo>& source_bucket_info,
- std::optional<RGWBucketInfo>& target_bucket_info) {
+ std::optional<all_bucket_info>& source_bucket_info,
+ std::optional<all_bucket_info>& target_bucket_info) {
rgw_sync_pipe_handler_info p(handler, source_bucket_info, target_bucket_info);
handlers.insert(p);
}
return handlers.empty();
}
- void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
+ void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
if (buckets_info.empty()) {
return;
}
return 0;
}
+class RGWUserPermHandler {
+ friend struct Init;
+ friend class Bucket;
+
+ RGWDataSyncEnv *sync_env;
+ rgw_user uid;
+
+ struct _info {
+ RGWUserInfo user_info;
+ rgw::IAM::Environment env;
+ std::unique_ptr<rgw::auth::Identity> identity;
+ RGWAccessControlPolicy user_acl;
+ };
+
+ std::shared_ptr<_info> info;
+
+ struct Init;
+
+ std::shared_ptr<Init> init_action;
+
+ struct Init : public RGWGenericAsyncCR::Action {
+ RGWDataSyncEnv *sync_env;
+
+ rgw_user uid;
+ std::shared_ptr<RGWUserPermHandler::_info> info;
+
+ int ret{0};
+
+ Init(RGWUserPermHandler *handler) : sync_env(handler->sync_env),
+ uid(handler->uid),
+ info(handler->info) {}
+ int operate() override {
+ auto user_ctl = sync_env->store->getRados()->ctl.user;
+
+ ret = user_ctl->get_info_by_uid(uid, &info->user_info, null_yield);
+ if (ret < 0) {
+ return ret;
+ }
+
+ info->identity = rgw::auth::transform_old_authinfo(sync_env->cct,
+ uid,
+ RGW_PERM_FULL_CONTROL,
+ false, /* system_request? */
+ TYPE_RGW);
+
+ map<string, bufferlist> uattrs;
+
+ ret = user_ctl->get_attrs_by_uid(uid, &uattrs, null_yield);
+ if (ret == 0) {
+ ret = RGWUserPermHandler::policy_from_attrs(sync_env->cct, uattrs, &info->user_acl);
+ }
+ if (ret == -ENOENT) {
+ info->user_acl.create_default(uid, info->user_info.display_name);
+ }
+
+ return 0;
+ }
+ };
+
+public:
+ RGWUserPermHandler(RGWDataSyncEnv *_sync_env,
+ const rgw_user& _uid) : sync_env(_sync_env),
+ uid(_uid) {}
+
+ RGWCoroutine *init_cr() {
+ info = make_shared<_info>();
+ init_action = make_shared<Init>(this);
+
+ return new RGWGenericAsyncCR(sync_env->cct,
+ sync_env->async_rados,
+ init_action);
+ }
+
+ class Bucket {
+ RGWDataSyncEnv *sync_env;
+ std::shared_ptr<_info> info;
+ RGWAccessControlPolicy bucket_acl;
+ std::optional<perm_state> ps;
+ public:
+ Bucket() {}
+
+ int init(RGWUserPermHandler *handler,
+ const RGWBucketInfo& bucket_info,
+ const map<string, bufferlist>& bucket_attrs);
+
+ bool verify_bucket_permission(int perm);
+ bool verify_object_permission(const map<string, bufferlist>& obj_attrs,
+ int perm);
+ };
+
+ static int policy_from_attrs(CephContext *cct,
+ const map<string, bufferlist>& attrs,
+ RGWAccessControlPolicy *acl) {
+ acl->set_ctx(cct);
+
+ auto aiter = attrs.find(RGW_ATTR_ACL);
+ if (aiter == attrs.end()) {
+ return -ENOENT;
+ }
+ auto iter = aiter->second.begin();
+ try {
+ acl->decode(iter);
+ } catch (buffer::error& err) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): could not decode policy, caught buffer::error" << dendl;
+ return -EIO;
+ }
+
+ return 0;
+ }
+
+ int init_bucket(const RGWBucketInfo& bucket_info,
+ const map<string, bufferlist>& bucket_attrs,
+ Bucket *bs) {
+ return bs->init(this, bucket_info, bucket_attrs);
+ }
+};
+
+int RGWUserPermHandler::Bucket::init(RGWUserPermHandler *handler,
+ const RGWBucketInfo& bucket_info,
+ const map<string, bufferlist>& bucket_attrs)
+{
+ sync_env = handler->sync_env;
+ info = handler->info;
+
+ int r = RGWUserPermHandler::policy_from_attrs(sync_env->cct, bucket_attrs, &bucket_acl);
+ if (r < 0) {
+ return r;
+ }
+
+ ps.emplace(sync_env->cct,
+ info->env,
+ info->identity.get(),
+ bucket_info,
+ info->identity->get_perm_mask(),
+ false, /* defer to bucket acls */
+ nullptr, /* referer */
+ false); /* request_payer */
+
+ return 0;
+}
+
+bool RGWUserPermHandler::Bucket::verify_bucket_permission(int perm)
+{
+ return verify_bucket_permission_no_policy(sync_env->dpp,
+ &(*ps),
+ &info->user_acl,
+ &bucket_acl,
+ perm);
+}
+
+bool RGWUserPermHandler::Bucket::verify_object_permission(const map<string, bufferlist>& obj_attrs,
+ int perm)
+{
+ RGWAccessControlPolicy obj_acl;
+
+ int r = policy_from_attrs(sync_env->cct, obj_attrs, &obj_acl);
+ if (r < 0) {
+ return r;
+ }
+
+ return verify_bucket_permission_no_policy(sync_env->dpp,
+ &(*ps),
+ &bucket_acl,
+ &obj_acl,
+ perm);
+}
+
class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
rgw_bucket_sync_pipe sync_pipe;
+ std::shared_ptr<RGWUserPermHandler::Bucket> bucket_perms;
+
std::optional<ceph::real_time> mtime;
std::optional<string> etag;
std::optional<uint64_t> obj_size;
+ std::unique_ptr<rgw::auth::Identity> identity;
+
public:
- RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe) : sync_pipe(_sync_pipe) {
- }
+ RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe,
+ std::shared_ptr<RGWUserPermHandler::Bucket>& _bucket_perms) : sync_pipe(_sync_pipe),
+ bucket_perms(_bucket_perms) {}
int filter(CephContext *cct,
const rgw_obj_key& source_key,
std::optional<std::map<string, bufferlist> > new_attrs;
if (params.mode == rgw_sync_pipe_params::MODE_USER) {
-#if 0
- RGWAccessControlPolicy policy;
- auto iter = obj_attrs.find(RGW_ATTR_ACL);
- if (iter == obj_attrs.end()) {
- ldout(cct, 0) << "ERROR: " << __func__ << ": No policy header to object: aborting sync" << dendl;
- return abort_err;
- }
- try {
- auto it = iter->second.cbegin();
- decode(policy, it);
- } catch (buffer::error &err) {
- ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode policy " << dendl;
- return abort_err;
+ if (!bucket_perms->verify_object_permission(obj_attrs, RGW_PERM_READ)) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to fetch object" << dendl;
+ return -EPERM;
}
-#endif
-
}
if (!dest_placement_rule &&
prule);
}
-
class RGWObjFetchCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket_sync_pipe& sync_pipe;
rgw_obj_key& key;
+ std::optional<rgw_obj_key> dest_key;
std::optional<uint64_t> versioned_epoch;
rgw_zone_set *zones_trace;
std::optional<rgw_user> param_acl_translation;
std::optional<string> param_storage_class;
rgw_sync_pipe_params::Mode param_mode;
+
+ std::optional<RGWUserPermHandler> user_perms;
+ std::shared_ptr<RGWUserPermHandler::Bucket> source_bucket_perms;
+ RGWUserPermHandler::Bucket dest_bucket_perms;
public:
RGWObjFetchCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe,
rgw_obj_key& _key,
+ std::optional<rgw_obj_key> _dest_key,
std::optional<uint64_t> _versioned_epoch,
rgw_zone_set *_zones_trace) : RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe),
key(_key),
+ dest_key(_dest_key),
versioned_epoch(_versioned_epoch),
zones_trace(_zones_trace) {}
param_mode = params.mode;
}
- yield {
- auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+ if (param_mode == rgw_sync_pipe_params::MODE_USER) {
+ if (!param_user) {
+ ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl;
+ return set_cr_error(-EPERM);
+ }
+ user_perms.emplace(sync_env, *param_user);
- std::optional<rgw_user> uid;
- if (param_mode == rgw_sync_pipe_params::MODE_USER) {
- uid = param_user;
+ yield call(user_perms->init_cr());
+ if (retcode < 0) {
+ ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl;
+ return set_cr_error(retcode);
+ }
+
+ /* verify that user is allowed to write at the target bucket */
+ int r = user_perms->init_bucket(sync_pipe.dest_bucket_info,
+ sync_pipe.dest_bucket_attrs,
+ &dest_bucket_perms);
+ if (r < 0) {
+ ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bs.bucket.get_key() << ")" << dendl;
+ return -EPERM;
}
+
+ /* init source bucket permission structure */
+ source_bucket_perms = make_shared<RGWUserPermHandler::Bucket>();
+ r = user_perms->init_bucket(sync_pipe.source_bucket_info,
+ sync_pipe.source_bucket_attrs,
+ source_bucket_perms.get());
+ if (r < 0) {
+ ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
+ return set_cr_error(retcode);
+ }
+ }
+
+ yield {
+
+ auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe, source_bucket_perms);
+
#warning FIXME: race guard
call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
- uid,
+ param_user,
sync_pipe.info.source_bs.bucket,
std::nullopt, sync_pipe.dest_bucket_info,
- key, std::nullopt, versioned_epoch,
+ key, dest_key, versioned_epoch,
true,
std::static_pointer_cast<RGWFetchObjFilter>(filter),
zones_trace, sync_env->counters, sync_env->dpp));
{
auto sync_env = sc->env;
- auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
-
- return new RGWObjFetchCR(sc, sync_pipe, key, versioned_epoch, zones_trace);
+ return new RGWObjFetchCR(sc, sync_pipe, key, std::nullopt, versioned_epoch, zones_trace);
}
RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
}
}
- auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
-
- return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, nullopt,
- sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
- key, dest_key, versioned_epoch,
- true,
- std::static_pointer_cast<RGWFetchObjFilter>(filter),
- zones_trace, nullptr, sync_env->dpp);
+ return new RGWObjFetchCR(sc, sync_pipe, key, dest_key, versioned_epoch, zones_trace);
}
RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
}
};
+static bool ignore_sync_error(int err) {
+ switch (err) {
+ case -ENOENT:
+ case -EPERM:
+ return true;
+ default:
+ break;
+ }
+ return false;
+}
+
template <class T, class K>
class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
set_status() << "failed to sync obj; retcode=" << retcode;
tn->log(0, SSTR("ERROR: failed to sync object: "
<< bucket_shard_str{bs} << "/" << key.name));
- error_ss << bucket_shard_str{bs} << "/" << key.name;
- sync_status = retcode;
+ if (!ignore_sync_error(retcode)) {
+ error_ss << bucket_shard_str{bs} << "/" << key.name;
+ sync_status = retcode;
+ }
}
if (!error_ss.str().empty()) {
yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
std::optional<rgw_bucket> source_bucket;
rgw_sync_pipe_info_set *pipes;
- map<rgw_bucket, RGWBucketInfo> buckets_info;
- map<rgw_bucket, RGWBucketInfo>::iterator siiter;
- std::optional<RGWBucketInfo> target_bucket_info;
- std::optional<RGWBucketInfo> source_bucket_info;
+ map<rgw_bucket, all_bucket_info> buckets_info;
+ map<rgw_bucket, all_bucket_info>::iterator siiter;
+ std::optional<all_bucket_info> target_bucket_info;
+ std::optional<all_bucket_info> source_bucket_info;
rgw_sync_pipe_info_set::iterator siter;
RGWDataSyncEnv *sync_env;
rgw_bucket bucket;
RGWBucketInfo *pbucket_info;
+ map<string, bufferlist> *pattrs;
RGWMetaSyncEnv meta_sync_env;
RGWSyncTraceNodeRef tn;
RGWSyncGetBucketInfoCR(RGWDataSyncEnv *_sync_env,
const rgw_bucket& _bucket,
RGWBucketInfo *_pbucket_info,
+ map<string, bufferlist> *_pattrs,
const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
bucket(_bucket),
pbucket_info(_pbucket_info),
+ pattrs(_pattrs),
tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_info",
SSTR(bucket))) {
}
int RGWSyncGetBucketInfoCR::operate()
{
reenter(this) {
- yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info));
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info, pattrs));
if (retcode == -ENOENT) {
/* bucket instance info has not been synced in yet, fetch it now */
yield {
return set_cr_error(retcode);
}
- yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info));
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info, pattrs));
}
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
if (!siter->source.has_bucket_info()) {
- buckets_info.emplace(siter->source.get_bucket(), RGWBucketInfo());
+ buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
}
if (!siter->target.has_bucket_info()) {
- buckets_info.emplace(siter->target.get_bucket(), RGWBucketInfo());
+ buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
}
}
}
for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
if (!siter->source.has_bucket_info()) {
- buckets_info.emplace(siter->source.get_bucket(), RGWBucketInfo());
+ buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
}
if (!siter->target.has_bucket_info()) {
- buckets_info.emplace(siter->target.get_bucket(), RGWBucketInfo());
+ buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
}
}
}
return set_cr_error(retcode);
}
- source_bucket_info = source_policy->policy_handler->get_bucket_info();
+ if (source_policy->policy_handler) {
+ auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
+ auto& opt_attrs = source_policy->policy_handler->get_bucket_attrs();
+ if (opt_bucket_info && opt_attrs) {
+ source_bucket_info.emplace();
+ source_bucket_info->bucket_info = *opt_bucket_info;
+ source_bucket_info->attrs = *opt_attrs;
+ }
+ }
if (!target_bucket) {
get_hint_targets_action = make_shared<GetHintTargets>(sync_env, *source_bucket);
update_from_source_bucket_policy();
for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) {
- if (siiter->second.bucket.name.empty()) {
- yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn));
+ if (siiter->second.bucket_info.bucket.name.empty()) {
+ yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first,
+ &siiter->second.bucket_info,
+ &siiter->second.attrs,
+ tn));
}
}
tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));
- yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info, tn));
+ yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info,
+ &sync_pipe.source_bucket_attrs, tn));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
lease_cr->go_down();
return set_cr_error(retcode);
}
- yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, tn));
+ yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info,
+ &sync_pipe.dest_bucket_attrs, tn));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
lease_cr->go_down();