#include "common/utf8.h"
#include "common/ceph_json.h"
#include "common/safe_io.h"
+#include "common/errno.h"
#include "auth/Crypto.h"
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
dump_start(s);
}
+namespace {
+
+bool is_valid_status(const string& s) {
+ return (s == "Enabled" ||
+ s == "Disabled");
+}
+
+static string enabled_group_id = "s3-bucket-replication:enabled";
+static string disabled_group_id = "s3-bucket-replication:disabled";
+
+struct ReplicationConfiguration {
+ string role;
+
+ struct Rule {
+ struct DeleteMarkerReplication {
+ string status;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Status", status, obj);
+ }
+
+ bool is_valid(CephContext *cct) const {
+ bool result = is_valid_status(status);
+ if (!result) {
+ ldout(cct, 5) << "NOTICE: bad status provided in DeleteMarkerReplication element (status=" << status << ")" << dendl;
+ }
+ return result;
+ }
+ };
+
+ struct Destination {
+ struct AccessControlTranslation {
+ string owner;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Owner", owner, obj);
+ }
+ };
+
+ std::optional<AccessControlTranslation> acl_translation;
+ std::optional<string> account;
+ string bucket;
+ std::optional<string> storage_class;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("AccessControlTranslation", acl_translation, obj);
+ RGWXMLDecoder::decode_xml("Account", account, obj);
+ if (account && account->empty()) {
+ account.reset();
+ }
+ RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+ RGWXMLDecoder::decode_xml("StorageClass", storage_class, obj);
+ if (storage_class && storage_class->empty()) {
+ storage_class.reset();
+ }
+ }
+ };
+
+ struct Filter {
+ struct Tag {
+ string key;
+ string value;
+
+ bool empty() const {
+ return key.empty() && value.empty();
+ }
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Key", key, obj);
+ RGWXMLDecoder::decode_xml("Value", value, obj);
+ };
+ };
+
+ struct AndElements {
+ std::optional<string> prefix;
+ std::vector<Tag> tags;
+
+ bool empty() const {
+ return !prefix &&
+ (tags.size() == 0);
+ }
+
+ void decode_xml(XMLObj *obj) {
+ std::vector<Tag> _tags;
+ RGWXMLDecoder::decode_xml("Prefix", prefix, obj);
+ if (prefix && prefix->empty()) {
+ prefix.reset();
+ }
+ RGWXMLDecoder::decode_xml("Tag", _tags, obj);
+ for (auto& t : _tags) {
+ if (!t.empty()) {
+ tags.push_back(std::move(t));
+ }
+ }
+ };
+ };
+
+ std::optional<string> prefix;
+ std::optional<Tag> tag;
+ std::optional<AndElements> and_elements;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Prefix", prefix, obj);
+ if (prefix && prefix->empty()) {
+ prefix.reset();
+ }
+ RGWXMLDecoder::decode_xml("Tag", tag, obj);
+ if (tag && tag->empty()) {
+ tag.reset();
+ }
+ RGWXMLDecoder::decode_xml("And", and_elements, obj);
+ if (and_elements && and_elements->empty()) {
+ and_elements.reset();
+ }
+ };
+
+ bool is_valid(CephContext *cct) const {
+ if (tag && prefix) {
+ ldout(cct, 5) << "NOTICE: both tag and prefix were provided in replication filter rule" << dendl;
+ return false;
+ }
+
+ if (and_elements) {
+ if (prefix && and_elements->prefix) {
+ ldout(cct, 5) << "NOTICE: too many prefixes were provided in re" << dendl;
+ return false;
+ }
+ }
+ return true;
+ };
+
+ int to_sync_pipe_filter(CephContext *cct,
+ rgw_sync_pipe_filter *f) const {
+ if (!is_valid(cct)) {
+ return -EINVAL;
+ }
+ if (prefix) {
+ f->prefix = *prefix;
+ }
+ if (tag) {
+ f->tags.insert(rgw_sync_pipe_filter_tag(tag->key, tag->value));
+ }
+
+ if (and_elements) {
+ if (and_elements->prefix) {
+ f->prefix = *and_elements->prefix;
+ }
+ for (auto& t : and_elements->tags) {
+ f->tags.insert(rgw_sync_pipe_filter_tag(t.key, t.value));
+ }
+ }
+ return 0;
+ }
+ };
+
+ std::optional<DeleteMarkerReplication> delete_marker_replication;
+ Destination destination;
+ std::optional<Filter> filter;
+ string id;
+ int32_t priority;
+ string status;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("DeleteMarkerReplication", delete_marker_replication, obj);
+ RGWXMLDecoder::decode_xml("Destination", destination, obj);
+ RGWXMLDecoder::decode_xml("ID", id, obj);
+
+ std::optional<string> prefix;
+ RGWXMLDecoder::decode_xml("Prefix", prefix, obj);
+ if (prefix) {
+ filter.emplace();
+ filter->prefix = prefix;
+ }
+
+ if (!filter) {
+ RGWXMLDecoder::decode_xml("Filter", filter, obj);
+ } else {
+ RGWXMLDecoder::decode_xml("Filter", *filter, obj);
+ }
+
+ RGWXMLDecoder::decode_xml("Priority", priority, obj);
+ RGWXMLDecoder::decode_xml("Status", status, obj);
+ }
+
+ bool is_valid(CephContext *cct) const {
+ if (!is_valid_status(status)) {
+ ldout(cct, 5) << "NOTICE: bad status provided in rule (status=" << status << ")" << dendl;
+ return false;
+ }
+ if ((filter && !filter->is_valid(cct)) ||
+ (delete_marker_replication && !delete_marker_replication->is_valid(cct))) {
+ return false;
+ }
+ return true;
+ }
+
+ int to_sync_policy_pipe(req_state *s,
+ rgw_sync_bucket_pipes *pipe,
+ bool *enabled) const {
+ if (!is_valid(s->cct)) {
+ return -EINVAL;
+ }
+
+ pipe->id = id;
+ pipe->params.priority = priority;
+
+ const auto& user_id = s->user->get_id();
+
+ rgw_bucket_key dest_bk(user_id.tenant,
+ destination.bucket);
+
+ pipe->source.set_all_zones(true); /*
+ all zones that correspond with configured flow,
+ we can introduce extended api to select specific
+ zones
+ */
+ pipe->dest.set_all_zones(true);
+ pipe->dest.bucket.emplace(dest_bk);
+
+ if (filter) {
+ int r = filter->to_sync_pipe_filter(s->cct, &pipe->params.source.filter);
+ if (r < 0) {
+ return r;
+ }
+ }
+ if (destination.acl_translation) {
+ rgw_user u;
+ u.tenant = user_id.tenant;
+ u.from_str(destination.acl_translation->owner); /* explicit tenant will override tenant,
+ otherwise will inherit it from s->user */
+ pipe->params.dest.acl_translation.emplace();
+ pipe->params.dest.acl_translation->owner = u;
+ }
+ pipe->params.dest.storage_class = destination.storage_class;
+
+ *enabled = (status == "Enabled");
+
+ pipe->params.mode = rgw_sync_pipe_params::Mode::MODE_USER;
+ pipe->params.user = user_id.to_str();
+
+ return 0;
+ }
+ };
+
+ std::vector<Rule> rules;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Role", role, obj);
+ RGWXMLDecoder::decode_xml("Rule", rules, obj);
+ }
+
+ int to_sync_policy_groups(req_state *s,
+ vector<rgw_sync_policy_group> *result) const {
+ result->resize(2);
+
+ rgw_sync_policy_group& enabled_group = (*result)[0];
+ rgw_sync_policy_group& disabled_group = (*result)[1];
+
+ enabled_group.id = enabled_group_id;
+ enabled_group.status = rgw_sync_policy_group::Status::ENABLED;
+ disabled_group.id = disabled_group_id;
+ disabled_group.status = rgw_sync_policy_group::Status::ALLOWED; /* not enabled, not forbidden */
+
+ for (auto& rule : rules) {
+ rgw_sync_bucket_pipes pipe;
+ bool enabled;
+ int r = rule.to_sync_policy_pipe(s, &pipe, &enabled);
+ if (r < 0) {
+ ldout(s->cct, 5) << "NOTICE: failed to convert replication configuration into sync policy pipe (rule.id=" << rule.id << "): " << cpp_strerror(-r) << dendl;
+ return r;
+ }
+
+ if (enabled) {
+ enabled_group.pipes.emplace_back(std::move(pipe));
+ } else {
+ disabled_group.pipes.emplace_back(std::move(pipe));
+ }
+ }
+ return 0;
+ }
+};
+
+}
+
+int RGWPutBucketReplication_ObjStore_S3::get_params()
+{
+ RGWXMLParser parser;
+
+ if (!parser.init()){
+ return -EINVAL;
+ }
+
+ const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+ int r = 0;
+ bufferlist data;
+
+ std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
+
+ if (r < 0)
+ return r;
+
+ if (!parser.parse(data.c_str(), data.length(), 1)) {
+ return -ERR_MALFORMED_XML;
+ }
+
+ ReplicationConfiguration conf;
+ try {
+ RGWXMLDecoder::decode_xml("ReplicationConfiguration", conf, &parser);
+ } catch (RGWXMLDecoder::err& err) {
+
+ ldout(s->cct, 5) << "Malformed tagging request: " << err << dendl;
+ return -ERR_MALFORMED_XML;
+ }
+
+ r = conf.to_sync_policy_groups(s, &sync_policy_groups);
+ if (r < 0) {
+ return r;
+ }
+
+ // forward requests to meta master zone
+ if (!store->svc()->zone->is_meta_master()) {
+ /* only need to keep this data around if we're not meta master */
+ in_data = std::move(data);
+ }
+
+ return 0;
+}
+
+void RGWPutBucketReplication_ObjStore_S3::send_response()
+{
+ if (op_ret)
+ set_req_state_err(s, op_ret);
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+ dump_start(s);
+}
+
void RGWDeleteBucketTags_ObjStore_S3::send_response()
{
if (op_ret)
return new RGWGetBucketObjectLock_ObjStore_S3;
} else if (is_notification_op()) {
return RGWHandler_REST_PSNotifs_S3::create_get_op();
+ } else if (is_replication_op()) {
+ return nullptr; // new RGWGetBucketReplication_ObjStore_S3;
}
return get_obj_op(true);
}
return new RGWPutBucketObjectLock_ObjStore_S3;
} else if (is_notification_op()) {
return RGWHandler_REST_PSNotifs_S3::create_put_op();
+ } else if (is_replication_op()) {
+ return new RGWPutBucketReplication_ObjStore_S3;
}
return new RGWCreateBucket_ObjStore_S3;
}
return new RGWDeleteBucketPolicy;
} else if (is_notification_op()) {
return RGWHandler_REST_PSNotifs_S3::create_delete_op();
+ } else if (is_replication_op()) {
+ return nullptr; // new RGWDeleteBucketReplication_ObjStore_S3;
}
if (s->info.args.sub_resource_exists("website")) {
case RGW_OP_PUT_BUCKET_POLICY:
case RGW_OP_PUT_OBJ_TAGGING:
case RGW_OP_PUT_BUCKET_TAGGING:
+ case RGW_OP_PUT_BUCKET_REPLICATION:
case RGW_OP_PUT_LC:
case RGW_OP_SET_REQUEST_PAYMENT:
case RGW_OP_PUBSUB_NOTIF_CREATE: