From: Yehuda Sadeh Date: Thu, 28 Nov 2019 01:14:28 +0000 (-0800) Subject: rgw: implement s3 put-bucket-replication api X-Git-Tag: v15.1.0~22^2~33 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=d2c70d9297d829718bfc75e4be6708c08a5665eb;p=ceph-ci.git rgw: implement s3 put-bucket-replication api Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 6ad291218a3..08860fcde06 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -397,5 +397,9 @@ public: bool bucket_exports_data() const; bool bucket_imports_data() const; + + const rgw_sync_policy_info& get_sync_policy() const { + return sync_policy; + } }; diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 497af6bc8ae..9174414340f 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -531,6 +531,9 @@ enum RGWOpType { RGW_OP_GET_BUCKET_TAGGING, RGW_OP_PUT_BUCKET_TAGGING, RGW_OP_DELETE_BUCKET_TAGGING, + RGW_OP_GET_BUCKET_REPLICATION, + RGW_OP_PUT_BUCKET_REPLICATION, + RGW_OP_DELETE_BUCKET_REPLICATION, }; class RGWAccessControlPolicy; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index f652448c86b..cbaf67d60ad 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1249,6 +1249,40 @@ void RGWDeleteBucketTags::execute() }); } +int RGWPutBucketReplication::verify_permission() { + return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketTagging); +} + +void RGWPutBucketReplication::execute() { + + op_ret = get_params(); + if (op_ret < 0) + return; + + if (!store->svc()->zone->is_meta_master()) { + op_ret = forward_request_to_master(s, nullptr, store, in_data, nullptr); + if (op_ret < 0) { + ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl; + } + if (op_ret < 0) { + return; + } + } + + op_ret = retry_raced_bucket_write(store->getRados(), s, [this] { + auto sync_policy = (s->bucket_info.sync_policy ? *s->bucket_info.sync_policy : rgw_sync_policy_info()); + + for (auto& group : sync_policy_groups) { + sync_policy.groups[group.id] = group; + } + + s->bucket_info.set_sync_policy(std::move(sync_policy)); + + return store->getRados()->put_bucket_instance_info(s->bucket_info, false, real_time(), + &s->bucket_attrs); + }); +} + int RGWOp::do_aws4_auth_completion() { ldpp_dout(this, 5) << "NOTICE: call to do_aws4_auth_completion" << dendl; diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 06caad7651c..e5ed28ea0ab 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -466,6 +466,23 @@ public: RGWOpType get_type() override { return RGW_OP_DELETE_BUCKET_TAGGING;} }; +struct rgw_sync_policy_group; + +class RGWPutBucketReplication : public RGWOp { +protected: + bufferlist in_data; + std::vector sync_policy_groups; +public: + int verify_permission() override; + void execute() override; + + virtual void send_response() override = 0; + virtual int get_params() = 0; + const char* name() const override { return "put_bucket_replication"; } + virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; } + RGWOpType get_type() override { return RGW_OP_PUT_BUCKET_REPLICATION; } +}; + class RGWBulkDelete : public RGWOp { public: struct acct_path_t { diff --git a/src/rgw/rgw_rest.h b/src/rgw/rgw_rest.h index f5dc175e134..c0aa96a86b8 100644 --- a/src/rgw/rgw_rest.h +++ b/src/rgw/rgw_rest.h @@ -194,6 +194,12 @@ public: virtual ~RGWPutBucketTags_ObjStore() = default; }; +class RGWPutBucketReplication_ObjStore: public RGWPutBucketReplication { +public: + RGWPutBucketReplication_ObjStore() = default; + virtual ~RGWPutBucketReplication_ObjStore() = default; +}; + class RGWListBuckets_ObjStore : public RGWListBuckets { public: RGWListBuckets_ObjStore() {} diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index b97ad466b49..be0d1b9cbea 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -10,6 +10,7 @@ #include "common/utf8.h" #include "common/ceph_json.h" #include "common/safe_io.h" +#include "common/errno.h" #include "auth/Crypto.h" #include #include @@ -592,6 +593,343 @@ void RGWPutBucketTags_ObjStore_S3::send_response() dump_start(s); } +namespace { + +bool is_valid_status(const string& s) { + return (s == "Enabled" || + s == "Disabled"); +} + +static string enabled_group_id = "s3-bucket-replication:enabled"; +static string disabled_group_id = "s3-bucket-replication:disabled"; + +struct ReplicationConfiguration { + string role; + + struct Rule { + struct DeleteMarkerReplication { + string status; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Status", status, obj); + } + + bool is_valid(CephContext *cct) const { + bool result = is_valid_status(status); + if (!result) { + ldout(cct, 5) << "NOTICE: bad status provided in DeleteMarkerReplication element (status=" << status << ")" << dendl; + } + return result; + } + }; + + struct Destination { + struct AccessControlTranslation { + string owner; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Owner", owner, obj); + } + }; + + std::optional acl_translation; + std::optional account; + string bucket; + std::optional storage_class; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("AccessControlTranslation", acl_translation, obj); + RGWXMLDecoder::decode_xml("Account", account, obj); + if (account && account->empty()) { + account.reset(); + } + RGWXMLDecoder::decode_xml("Bucket", bucket, obj); + RGWXMLDecoder::decode_xml("StorageClass", storage_class, obj); + if (storage_class && storage_class->empty()) { + storage_class.reset(); + } + } + }; + + struct Filter { + struct Tag { + string key; + string value; + + bool empty() const { + return key.empty() && value.empty(); + } + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Key", key, obj); + RGWXMLDecoder::decode_xml("Value", value, obj); + }; + }; + + struct AndElements { + std::optional prefix; + std::vector tags; + + bool empty() const { + return !prefix && + (tags.size() == 0); + } + + void decode_xml(XMLObj *obj) { + std::vector _tags; + RGWXMLDecoder::decode_xml("Prefix", prefix, obj); + if (prefix && prefix->empty()) { + prefix.reset(); + } + RGWXMLDecoder::decode_xml("Tag", _tags, obj); + for (auto& t : _tags) { + if (!t.empty()) { + tags.push_back(std::move(t)); + } + } + }; + }; + + std::optional prefix; + std::optional tag; + std::optional and_elements; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Prefix", prefix, obj); + if (prefix && prefix->empty()) { + prefix.reset(); + } + RGWXMLDecoder::decode_xml("Tag", tag, obj); + if (tag && tag->empty()) { + tag.reset(); + } + RGWXMLDecoder::decode_xml("And", and_elements, obj); + if (and_elements && and_elements->empty()) { + and_elements.reset(); + } + }; + + bool is_valid(CephContext *cct) const { + if (tag && prefix) { + ldout(cct, 5) << "NOTICE: both tag and prefix were provided in replication filter rule" << dendl; + return false; + } + + if (and_elements) { + if (prefix && and_elements->prefix) { + ldout(cct, 5) << "NOTICE: too many prefixes were provided in re" << dendl; + return false; + } + } + return true; + }; + + int to_sync_pipe_filter(CephContext *cct, + rgw_sync_pipe_filter *f) const { + if (!is_valid(cct)) { + return -EINVAL; + } + if (prefix) { + f->prefix = *prefix; + } + if (tag) { + f->tags.insert(rgw_sync_pipe_filter_tag(tag->key, tag->value)); + } + + if (and_elements) { + if (and_elements->prefix) { + f->prefix = *and_elements->prefix; + } + for (auto& t : and_elements->tags) { + f->tags.insert(rgw_sync_pipe_filter_tag(t.key, t.value)); + } + } + return 0; + } + }; + + std::optional delete_marker_replication; + Destination destination; + std::optional filter; + string id; + int32_t priority; + string status; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("DeleteMarkerReplication", delete_marker_replication, obj); + RGWXMLDecoder::decode_xml("Destination", destination, obj); + RGWXMLDecoder::decode_xml("ID", id, obj); + + std::optional prefix; + RGWXMLDecoder::decode_xml("Prefix", prefix, obj); + if (prefix) { + filter.emplace(); + filter->prefix = prefix; + } + + if (!filter) { + RGWXMLDecoder::decode_xml("Filter", filter, obj); + } else { + RGWXMLDecoder::decode_xml("Filter", *filter, obj); + } + + RGWXMLDecoder::decode_xml("Priority", priority, obj); + RGWXMLDecoder::decode_xml("Status", status, obj); + } + + bool is_valid(CephContext *cct) const { + if (!is_valid_status(status)) { + ldout(cct, 5) << "NOTICE: bad status provided in rule (status=" << status << ")" << dendl; + return false; + } + if ((filter && !filter->is_valid(cct)) || + (delete_marker_replication && !delete_marker_replication->is_valid(cct))) { + return false; + } + return true; + } + + int to_sync_policy_pipe(req_state *s, + rgw_sync_bucket_pipes *pipe, + bool *enabled) const { + if (!is_valid(s->cct)) { + return -EINVAL; + } + + pipe->id = id; + pipe->params.priority = priority; + + const auto& user_id = s->user->get_id(); + + rgw_bucket_key dest_bk(user_id.tenant, + destination.bucket); + + pipe->source.set_all_zones(true); /* + all zones that correspond with configured flow, + we can introduce extended api to select specific + zones + */ + pipe->dest.set_all_zones(true); + pipe->dest.bucket.emplace(dest_bk); + + if (filter) { + int r = filter->to_sync_pipe_filter(s->cct, &pipe->params.source.filter); + if (r < 0) { + return r; + } + } + if (destination.acl_translation) { + rgw_user u; + u.tenant = user_id.tenant; + u.from_str(destination.acl_translation->owner); /* explicit tenant will override tenant, + otherwise will inherit it from s->user */ + pipe->params.dest.acl_translation.emplace(); + pipe->params.dest.acl_translation->owner = u; + } + pipe->params.dest.storage_class = destination.storage_class; + + *enabled = (status == "Enabled"); + + pipe->params.mode = rgw_sync_pipe_params::Mode::MODE_USER; + pipe->params.user = user_id.to_str(); + + return 0; + } + }; + + std::vector rules; + + void decode_xml(XMLObj *obj) { + RGWXMLDecoder::decode_xml("Role", role, obj); + RGWXMLDecoder::decode_xml("Rule", rules, obj); + } + + int to_sync_policy_groups(req_state *s, + vector *result) const { + result->resize(2); + + rgw_sync_policy_group& enabled_group = (*result)[0]; + rgw_sync_policy_group& disabled_group = (*result)[1]; + + enabled_group.id = enabled_group_id; + enabled_group.status = rgw_sync_policy_group::Status::ENABLED; + disabled_group.id = disabled_group_id; + disabled_group.status = rgw_sync_policy_group::Status::ALLOWED; /* not enabled, not forbidden */ + + for (auto& rule : rules) { + rgw_sync_bucket_pipes pipe; + bool enabled; + int r = rule.to_sync_policy_pipe(s, &pipe, &enabled); + if (r < 0) { + ldout(s->cct, 5) << "NOTICE: failed to convert replication configuration into sync policy pipe (rule.id=" << rule.id << "): " << cpp_strerror(-r) << dendl; + return r; + } + + if (enabled) { + enabled_group.pipes.emplace_back(std::move(pipe)); + } else { + disabled_group.pipes.emplace_back(std::move(pipe)); + } + } + return 0; + } +}; + +} + +int RGWPutBucketReplication_ObjStore_S3::get_params() +{ + RGWXMLParser parser; + + if (!parser.init()){ + return -EINVAL; + } + + const auto max_size = s->cct->_conf->rgw_max_put_param_size; + int r = 0; + bufferlist data; + + std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false); + + if (r < 0) + return r; + + if (!parser.parse(data.c_str(), data.length(), 1)) { + return -ERR_MALFORMED_XML; + } + + ReplicationConfiguration conf; + try { + RGWXMLDecoder::decode_xml("ReplicationConfiguration", conf, &parser); + } catch (RGWXMLDecoder::err& err) { + + ldout(s->cct, 5) << "Malformed tagging request: " << err << dendl; + return -ERR_MALFORMED_XML; + } + + r = conf.to_sync_policy_groups(s, &sync_policy_groups); + if (r < 0) { + return r; + } + + // forward requests to meta master zone + if (!store->svc()->zone->is_meta_master()) { + /* only need to keep this data around if we're not meta master */ + in_data = std::move(data); + } + + return 0; +} + +void RGWPutBucketReplication_ObjStore_S3::send_response() +{ + if (op_ret) + set_req_state_err(s, op_ret); + dump_errno(s); + end_header(s, this, "application/xml"); + dump_start(s); +} + void RGWDeleteBucketTags_ObjStore_S3::send_response() { if (op_ret) @@ -3703,6 +4041,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_get() return new RGWGetBucketObjectLock_ObjStore_S3; } else if (is_notification_op()) { return RGWHandler_REST_PSNotifs_S3::create_get_op(); + } else if (is_replication_op()) { + return nullptr; // new RGWGetBucketReplication_ObjStore_S3; } return get_obj_op(true); } @@ -3746,6 +4086,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_put() return new RGWPutBucketObjectLock_ObjStore_S3; } else if (is_notification_op()) { return RGWHandler_REST_PSNotifs_S3::create_put_op(); + } else if (is_replication_op()) { + return new RGWPutBucketReplication_ObjStore_S3; } return new RGWCreateBucket_ObjStore_S3; } @@ -3762,6 +4104,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_delete() return new RGWDeleteBucketPolicy; } else if (is_notification_op()) { return RGWHandler_REST_PSNotifs_S3::create_delete_op(); + } else if (is_replication_op()) { + return nullptr; // new RGWDeleteBucketReplication_ObjStore_S3; } if (s->info.args.sub_resource_exists("website")) { @@ -4622,6 +4966,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s, case RGW_OP_PUT_BUCKET_POLICY: case RGW_OP_PUT_OBJ_TAGGING: case RGW_OP_PUT_BUCKET_TAGGING: + case RGW_OP_PUT_BUCKET_REPLICATION: case RGW_OP_PUT_LC: case RGW_OP_SET_REQUEST_PAYMENT: case RGW_OP_PUBSUB_NOTIF_CREATE: diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index ba49ab77950..104f08faafa 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -101,6 +101,13 @@ public: void send_response() override; }; +class RGWPutBucketReplication_ObjStore_S3 : public RGWPutBucketReplication_ObjStore +{ +public: + int get_params() override; + void send_response() override; +}; + class RGWListBuckets_ObjStore_S3 : public RGWListBuckets_ObjStore { public: RGWListBuckets_ObjStore_S3() {} @@ -652,6 +659,9 @@ protected: } return false; } + bool is_replication_op() const { + return s->info.args.exists("replication"); + } RGWOp *get_obj_op(bool get_data) const; diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 58e05d0f8a6..7b4d1059698 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -487,6 +487,13 @@ struct rgw_sync_bucket_entities { } static string bucket_key(std::optional b); + + void set_all_zones(bool state) { + all_zones = state; + if (all_zones) { + zones.reset(); + } + } }; WRITE_CLASS_ENCODER(rgw_sync_bucket_entities) diff --git a/src/rgw/rgw_xml.h b/src/rgw/rgw_xml.h index b73f17bcf06..7712572d6f8 100644 --- a/src/rgw/rgw_xml.h +++ b/src/rgw/rgw_xml.h @@ -192,6 +192,13 @@ void decode_xml_obj(bufferlist& val, XMLObj *obj); class utime_t; void decode_xml_obj(utime_t& val, XMLObj *obj); +template +void decode_xml_obj(std::optional& val, XMLObj *obj) +{ + val.emplace(); + decode_xml_obj(*val, obj); +} + template void do_decode_xml_obj(list& l, const string& name, XMLObj *obj) {