rgw_sync_policy_info *policy{nullptr};
+ std::optional<rgw_user> owner;
+
public:
SyncPolicyContext(const string& zonegroup_id,
const string& zonegroup_name,
return ret;
}
+ owner = bucket_info.owner;
+
if (!bucket_info.sync_policy) {
rgw_sync_policy_info new_policy;
bucket_info.set_sync_policy(std::move(new_policy));
return *policy;
}
+ std::optional<rgw_user>& get_owner() {
+ return owner;
+ }
};
void resolve_zone_id_opt(std::optional<string>& zone_name, std::optional<rgw_zone_id>& zone_id)
std::optional<string> opt_prefix_rm;
std::optional<int> opt_priority;
+ std::optional<string> opt_mode;
rgw::notify::EventTypeList event_types;
opt_prefix_rm = val;
} else if (ceph_argparse_witharg(args, i, &val, "--priority", (char*)NULL)) {
opt_priority = atoi(val.c_str());
+ } else if (ceph_argparse_witharg(args, i, &val, "--mode", (char*)NULL)) {
+ opt_mode = val;
} else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
// do nothing
} else if (strncmp(*i, "-", 1) == 0) {
if (opt_priority) {
pipe->params.priority = *opt_priority;
}
+ if (opt_mode) {
+ if (*opt_mode == "system") {
+ pipe->params.mode = rgw_sync_pipe_params::MODE_SYSTEM;
+ } else if (*opt_mode == "user") {
+ pipe->params.mode = rgw_sync_pipe_params::MODE_USER;
+ } else {
+ cerr << "ERROR: bad mode value: should be one of the following: system, user" << std::endl;
+ return EINVAL;
+ }
+ }
+
+ if (!user_id.empty()) {
+ pipe->params.user = user_id;
+ } else if (pipe->params.user.empty()) {
+ auto owner = sync_policy_ctx.get_owner();
+ if (owner) {
+ pipe->params.user = *owner;
+ }
+ }
ret = sync_policy_ctx.write_policy();
if (ret < 0) {
}
bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key,
+ std::optional<rgw_user> *user,
std::optional<rgw_user> *acl_translation_owner,
std::optional<string> *storage_class,
rgw_sync_pipe_params::Mode *mode,
bool conflict = false;
+ std::optional<rgw_user> _user;
std::optional<rgw_sync_pipe_acl_translation> _acl_translation;
std::optional<string> _storage_class;
rgw_sync_pipe_params::Mode _mode;
for (auto& iter : iters) {
auto& rule_params = iter->second->params;
if (++i == 0) {
+ _user = rule_params.user;
_acl_translation = rule_params.dest.acl_translation;
_storage_class = rule_params.dest.storage_class;
_mode = rule_params.mode;
continue;
}
- conflict = !(_acl_translation == rule_params.dest.acl_translation &&
+ conflict = !(_user == rule_params.user &&
+ _acl_translation == rule_params.dest.acl_translation &&
_storage_class == rule_params.dest.storage_class &&
_mode == rule_params.mode);
if (conflict) {
}
}
+ *user = _user;
if (_acl_translation) {
*acl_translation_owner = _acl_translation->owner;
}
bucket_info(_bucket_info) {
if (_bucket_info.sync_policy) {
sync_policy = *_bucket_info.sync_policy;
+
+ for (auto& entry : sync_policy.groups) {
+ for (auto& pipe : entry.second.pipes) {
+ if (pipe.params.mode == rgw_sync_pipe_params::MODE_USER &&
+ pipe.params.user.empty()) {
+ pipe.params.user = _bucket_info.owner;
+ }
+ }
+ }
}
bucket = _bucket_info.bucket;
zone_svc = parent->zone_svc;
void insert(const rgw_sync_bucket_pipe& pipe);
bool find_basic_info_without_tags(const rgw_obj_key& key,
+ std::optional<rgw_user> *user,
std::optional<rgw_user> *acl_translation,
std::optional<string> *storage_class,
rgw_sync_pipe_params::Mode *mode,
}
bool find_basic_info_without_tags(const rgw_obj_key& key,
+ std::optional<rgw_user> *user,
std::optional<rgw_user> *acl_translation,
std::optional<string> *storage_class,
rgw_sync_pipe_params::Mode *mode,
if (!rules) {
return false;
}
- return rules->find_basic_info_without_tags(key, acl_translation, storage_class, mode, need_more_info);
+ return rules->find_basic_info_without_tags(key, user, acl_translation, storage_class, mode, need_more_info);
}
bool find_obj_params(const rgw_obj_key& key,
{
RGWObjectCtx obj_ctx(store);
- string user_id;
char buf[16];
snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
map<string, bufferlist> attrs;
std::optional<uint64_t> bytes_transferred;
int r = store->getRados()->fetch_remote_obj(obj_ctx,
- rgw_user(user_id),
+ user_id.value_or(rgw_user()),
NULL, /* req_info */
source_zone,
dest_obj,
rgw::sal::RGWRadosStore *store;
rgw_zone_id source_zone;
+ std::optional<rgw_user> user_id;
+
rgw_bucket src_bucket;
std::optional<rgw_placement_rule> dest_placement_rule;
RGWBucketInfo dest_bucket_info;
public:
RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
const rgw_zone_id& _source_zone,
+ std::optional<rgw_user>& _user_id,
const rgw_bucket& _src_bucket,
std::optional<rgw_placement_rule> _dest_placement_rule,
const RGWBucketInfo& _dest_bucket_info,
PerfCounters* counters, const DoutPrefixProvider *dpp)
: RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
+ user_id(_user_id),
src_bucket(_src_bucket),
dest_placement_rule(_dest_placement_rule),
dest_bucket_info(_dest_bucket_info),
rgw::sal::RGWRadosStore *store;
rgw_zone_id source_zone;
+ std::optional<rgw_user> user_id;
+
rgw_bucket src_bucket;
std::optional<rgw_placement_rule> dest_placement_rule;
RGWBucketInfo dest_bucket_info;
public:
RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
const rgw_zone_id& _source_zone,
+ std::optional<rgw_user> _user_id,
const rgw_bucket& _src_bucket,
std::optional<rgw_placement_rule> _dest_placement_rule,
const RGWBucketInfo& _dest_bucket_info,
: RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
source_zone(_source_zone),
+ user_id(_user_id),
src_bucket(_src_bucket),
dest_placement_rule(_dest_placement_rule),
dest_bucket_info(_dest_bucket_info),
int send_request() override {
req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
- source_zone, src_bucket, dest_placement_rule, dest_bucket_info,
+ source_zone, user_id, src_bucket, dest_placement_rule, dest_bucket_info,
key, dest_key, versioned_epoch, copy_if_newer, filter,
zones_trace, counters, dpp);
async_rados->queue(req);
class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
rgw_bucket_sync_pipe sync_pipe;
+ std::optional<ceph::real_time> mtime;
+ std::optional<string> etag;
+ std::optional<uint64_t> obj_size;
+
public:
RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe) : sync_pipe(_sync_pipe) {
}
std::optional<std::map<string, bufferlist> > new_attrs;
if (params.mode == rgw_sync_pipe_params::MODE_USER) {
+#if 0
RGWAccessControlPolicy policy;
auto iter = obj_attrs.find(RGW_ATTR_ACL);
if (iter == obj_attrs.end()) {
ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode policy " << dendl;
return abort_err;
}
-
+#endif
}
rgw_zone_set *zones_trace;
bool need_more_info{false};
+ bool check_change{false};
ceph::real_time src_mtime;
uint64_t src_size;
map<string, bufferlist> src_attrs;
map<string, string> src_headers;
+ std::optional<rgw_user> param_user;
std::optional<rgw_user> param_acl_translation;
std::optional<string> param_storage_class;
rgw_sync_pipe_params::Mode param_mode;
{
if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
+ ¶m_user,
¶m_acl_translation,
¶m_storage_class,
¶m_mode,
¶ms)) {
return set_cr_error(-ERR_PRECONDITION_FAILED);
}
+
+ param_user = params.user;
+ if (params.dest.acl_translation) {
+ param_acl_translation = params.dest.acl_translation->owner;
+ }
+ param_storage_class = params.dest.storage_class;
+ param_mode = params.mode;
}
yield {
auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+ std::optional<rgw_user> uid;
+ if (param_mode == rgw_sync_pipe_params::MODE_USER) {
+ uid = param_user;
+ }
+#warning FIXME: race guard
call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+ uid,
sync_pipe.info.source_bs.bucket,
std::nullopt, sync_pipe.dest_bucket_info,
key, std::nullopt, versioned_epoch,
auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
- return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+ return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, nullopt,
sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
key, dest_key, versioned_epoch,
true,
s = "user";
}
encode_json("mode", s, f);
+ encode_json("user", user, f);
}
void rgw_sync_pipe_params::decode_json(JSONObj *obj)
} else {
mode = MODE_USER;
}
+ JSONDecoder::decode_json("user", user, obj);
}
MODE_USER = 1,
} mode{MODE_SYSTEM};
int32_t priority{0};
+ rgw_user user;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(source, bl);
encode(dest, bl);
encode(priority, bl);
encode((uint8_t)mode, bl);
+ encode(user, bl);
ENCODE_FINISH(bl);
}
uint8_t m;
decode(m, bl);
mode = (Mode)m;
+ if (struct_v >= 2) {
+ decode(user, bl);
+ }
DECODE_FINISH(bl);
}