]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: implement s3 put-bucket-replication api
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 28 Nov 2019 01:14:28 +0000 (17:14 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:39 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_common.h
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rest.h
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h
src/rgw/rgw_sync_policy.h
src/rgw/rgw_xml.h

index 6ad291218a30e7be3fe14e1a90100257ffd5b18a..08860fcde06633949dd4ca966b8f68de436488c5 100644 (file)
@@ -397,5 +397,9 @@ public:
 
   bool bucket_exports_data() const;
   bool bucket_imports_data() const;
+
+  const rgw_sync_policy_info& get_sync_policy() const {
+    return sync_policy;
+  }
 };
 
index 497af6bc8aea7d178cfb9b95d11a7faab70a93a1..9174414340f610be90c3c67ce99dfac94b92c11d 100644 (file)
@@ -531,6 +531,9 @@ enum RGWOpType {
   RGW_OP_GET_BUCKET_TAGGING,
   RGW_OP_PUT_BUCKET_TAGGING,
   RGW_OP_DELETE_BUCKET_TAGGING,
+  RGW_OP_GET_BUCKET_REPLICATION,
+  RGW_OP_PUT_BUCKET_REPLICATION,
+  RGW_OP_DELETE_BUCKET_REPLICATION,
 };
 
 class RGWAccessControlPolicy;
index f652448c86b5d2e89e202bf927ebb378bd7ecfd4..cbaf67d60adc053dd141a80d9e9cc464533b63ec 100644 (file)
@@ -1249,6 +1249,40 @@ void RGWDeleteBucketTags::execute()
   });
 }
 
+int RGWPutBucketReplication::verify_permission() {
+  return verify_bucket_owner_or_policy(s, rgw::IAM::s3PutBucketTagging);
+}
+
+void RGWPutBucketReplication::execute() {
+
+  op_ret = get_params();
+  if (op_ret < 0) 
+    return;
+
+  if (!store->svc()->zone->is_meta_master()) {
+    op_ret = forward_request_to_master(s, nullptr, store, in_data, nullptr);
+    if (op_ret < 0) {
+      ldpp_dout(this, 0) << "forward_request_to_master returned ret=" << op_ret << dendl;
+    }
+    if (op_ret < 0) {
+      return;
+    }
+  }
+
+  op_ret = retry_raced_bucket_write(store->getRados(), s, [this] {
+    auto sync_policy = (s->bucket_info.sync_policy ? *s->bucket_info.sync_policy : rgw_sync_policy_info());
+
+    for (auto& group : sync_policy_groups) {
+      sync_policy.groups[group.id] = group;
+    }
+
+    s->bucket_info.set_sync_policy(std::move(sync_policy));
+
+    return store->getRados()->put_bucket_instance_info(s->bucket_info, false, real_time(),
+                                                       &s->bucket_attrs);
+  });
+}
+
 int RGWOp::do_aws4_auth_completion()
 {
   ldpp_dout(this, 5) << "NOTICE: call to do_aws4_auth_completion"  << dendl;
index 06caad7651c09de2f5675968bc0cef15daf83610..e5ed28ea0ab2e3e5163ad604fcc00a8583ab2eac 100644 (file)
@@ -466,6 +466,23 @@ public:
   RGWOpType get_type() override { return RGW_OP_DELETE_BUCKET_TAGGING;}
 };
 
+struct rgw_sync_policy_group;
+
+class RGWPutBucketReplication : public RGWOp {
+protected:
+  bufferlist in_data;
+  std::vector<rgw_sync_policy_group> sync_policy_groups;
+public:
+  int verify_permission() override;
+  void execute() override;
+
+  virtual void send_response() override = 0;
+  virtual int get_params() = 0;
+  const char* name() const override { return "put_bucket_replication"; }
+  virtual uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+  RGWOpType get_type() override { return RGW_OP_PUT_BUCKET_REPLICATION; }
+};
+
 class RGWBulkDelete : public RGWOp {
 public:
   struct acct_path_t {
index f5dc175e134b3a1bf7810abaeff4db492ace746f..c0aa96a86b86c2ef5e0f93305dae5973bd3bafe1 100644 (file)
@@ -194,6 +194,12 @@ public:
   virtual ~RGWPutBucketTags_ObjStore() = default; 
 };
 
+class RGWPutBucketReplication_ObjStore: public RGWPutBucketReplication {
+public:
+  RGWPutBucketReplication_ObjStore() = default;
+  virtual ~RGWPutBucketReplication_ObjStore() = default; 
+};
+
 class RGWListBuckets_ObjStore : public RGWListBuckets {
 public:
   RGWListBuckets_ObjStore() {}
index b97ad466b499a6145a8024047fe018f283b3eeaf..be0d1b9cbea2c23bb533bd3d3a965a4d935d318d 100644 (file)
@@ -10,6 +10,7 @@
 #include "common/utf8.h"
 #include "common/ceph_json.h"
 #include "common/safe_io.h"
+#include "common/errno.h"
 #include "auth/Crypto.h"
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/replace.hpp>
@@ -592,6 +593,343 @@ void RGWPutBucketTags_ObjStore_S3::send_response()
   dump_start(s);
 }
 
+namespace {
+
+bool is_valid_status(const string& s) {
+  return (s == "Enabled" ||
+          s == "Disabled");
+}
+
+static string enabled_group_id = "s3-bucket-replication:enabled";
+static string disabled_group_id = "s3-bucket-replication:disabled";
+
+struct ReplicationConfiguration {
+  string role;
+
+  struct Rule {
+    struct DeleteMarkerReplication {
+      string status;
+
+      void decode_xml(XMLObj *obj) {
+        RGWXMLDecoder::decode_xml("Status", status, obj);
+      }
+
+      bool is_valid(CephContext *cct) const {
+        bool result = is_valid_status(status);
+        if (!result) {
+          ldout(cct, 5) << "NOTICE: bad status provided in DeleteMarkerReplication element (status=" << status << ")" << dendl;
+        }
+        return result;
+      }
+    };
+
+    struct Destination {
+      struct AccessControlTranslation {
+        string owner;
+
+        void decode_xml(XMLObj *obj) {
+          RGWXMLDecoder::decode_xml("Owner", owner, obj);
+        }
+      };
+
+      std::optional<AccessControlTranslation> acl_translation;
+      std::optional<string> account;
+      string bucket;
+      std::optional<string> storage_class;
+
+      void decode_xml(XMLObj *obj) {
+        RGWXMLDecoder::decode_xml("AccessControlTranslation", acl_translation, obj);
+        RGWXMLDecoder::decode_xml("Account", account, obj);
+        if (account && account->empty()) {
+          account.reset();
+        }
+        RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+        RGWXMLDecoder::decode_xml("StorageClass", storage_class, obj);
+        if (storage_class && storage_class->empty()) {
+          storage_class.reset();
+        }
+      }
+    };
+
+    struct Filter {
+      struct Tag {
+        string key;
+        string value;
+
+        bool empty() const {
+          return key.empty() && value.empty();
+        }
+
+        void decode_xml(XMLObj *obj) {
+          RGWXMLDecoder::decode_xml("Key", key, obj);
+          RGWXMLDecoder::decode_xml("Value", value, obj);
+        };
+      };
+
+      struct AndElements {
+        std::optional<string> prefix;
+        std::vector<Tag> tags;
+
+        bool empty() const {
+          return !prefix &&
+            (tags.size() == 0);
+        }
+
+        void decode_xml(XMLObj *obj) {
+          std::vector<Tag> _tags;
+          RGWXMLDecoder::decode_xml("Prefix", prefix, obj);
+          if (prefix && prefix->empty()) {
+            prefix.reset();
+          }
+          RGWXMLDecoder::decode_xml("Tag", _tags, obj);
+          for (auto& t : _tags) {
+            if (!t.empty()) {
+              tags.push_back(std::move(t));
+            }
+          }
+        };
+      };
+
+      std::optional<string> prefix;
+      std::optional<Tag> tag;
+      std::optional<AndElements> and_elements;
+
+      void decode_xml(XMLObj *obj) {
+        RGWXMLDecoder::decode_xml("Prefix", prefix, obj);
+        if (prefix && prefix->empty()) {
+          prefix.reset();
+        }
+        RGWXMLDecoder::decode_xml("Tag", tag, obj);
+        if (tag && tag->empty()) {
+          tag.reset();
+        }
+        RGWXMLDecoder::decode_xml("And", and_elements, obj);
+        if (and_elements && and_elements->empty()) {
+          and_elements.reset();
+        }
+      };
+
+      bool is_valid(CephContext *cct) const {
+        if (tag && prefix) {
+          ldout(cct, 5) << "NOTICE: both tag and prefix were provided in replication filter rule" << dendl;
+          return false;
+        }
+
+        if (and_elements) {
+          if (prefix && and_elements->prefix) {
+            ldout(cct, 5) << "NOTICE: too many prefixes were provided in re" << dendl;
+            return false;
+          }
+        }
+        return true;
+      };
+
+      int to_sync_pipe_filter(CephContext *cct,
+                              rgw_sync_pipe_filter *f) const {
+        if (!is_valid(cct)) {
+          return -EINVAL;
+        }
+        if (prefix) {
+          f->prefix = *prefix;
+        }
+        if (tag) {
+          f->tags.insert(rgw_sync_pipe_filter_tag(tag->key, tag->value));
+        }
+
+        if (and_elements) {
+          if (and_elements->prefix) {
+            f->prefix = *and_elements->prefix;
+          }
+          for (auto& t : and_elements->tags) {
+            f->tags.insert(rgw_sync_pipe_filter_tag(t.key, t.value));
+          }
+        }
+        return 0;
+      }
+    };
+
+    std::optional<DeleteMarkerReplication> delete_marker_replication;
+    Destination destination;
+    std::optional<Filter> filter;
+    string id;
+    int32_t priority;
+    string status;
+
+    void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("DeleteMarkerReplication", delete_marker_replication, obj);
+      RGWXMLDecoder::decode_xml("Destination", destination, obj);
+      RGWXMLDecoder::decode_xml("ID", id, obj);
+
+      std::optional<string> prefix;
+      RGWXMLDecoder::decode_xml("Prefix", prefix, obj);
+      if (prefix) {
+        filter.emplace();
+        filter->prefix = prefix;
+      }
+
+      if (!filter) {
+        RGWXMLDecoder::decode_xml("Filter", filter, obj);
+      } else {
+        RGWXMLDecoder::decode_xml("Filter", *filter, obj);
+      }
+
+      RGWXMLDecoder::decode_xml("Priority", priority, obj);
+      RGWXMLDecoder::decode_xml("Status", status, obj);
+    }
+
+    bool is_valid(CephContext *cct) const {
+      if (!is_valid_status(status)) {
+        ldout(cct, 5) << "NOTICE: bad status provided in rule (status=" << status << ")" << dendl;
+        return false;
+      }
+      if ((filter && !filter->is_valid(cct)) ||
+          (delete_marker_replication && !delete_marker_replication->is_valid(cct))) {
+        return false;
+      }
+      return true;
+    }
+
+    int to_sync_policy_pipe(req_state *s,
+                            rgw_sync_bucket_pipes *pipe,
+                            bool *enabled) const {
+      if (!is_valid(s->cct)) {
+        return -EINVAL;
+      }
+
+      pipe->id = id;
+      pipe->params.priority = priority;
+
+      const auto& user_id = s->user->get_id();
+
+      rgw_bucket_key dest_bk(user_id.tenant,
+                             destination.bucket);
+
+      pipe->source.set_all_zones(true); /*
+                                           all zones that correspond with configured flow,
+                                           we can introduce extended api to select specific
+                                           zones
+                                         */
+      pipe->dest.set_all_zones(true);
+      pipe->dest.bucket.emplace(dest_bk);
+
+      if (filter) {
+        int r = filter->to_sync_pipe_filter(s->cct, &pipe->params.source.filter);
+        if (r < 0) {
+          return r;
+        }
+      }
+      if (destination.acl_translation) {
+        rgw_user u;
+        u.tenant = user_id.tenant;
+        u.from_str(destination.acl_translation->owner); /* explicit tenant will override tenant,
+                                                           otherwise will inherit it from s->user */
+        pipe->params.dest.acl_translation.emplace();
+        pipe->params.dest.acl_translation->owner = u;
+      }
+      pipe->params.dest.storage_class = destination.storage_class;
+
+      *enabled = (status == "Enabled");
+
+      pipe->params.mode = rgw_sync_pipe_params::Mode::MODE_USER;
+      pipe->params.user = user_id.to_str();
+
+      return 0;
+    }
+  };
+
+  std::vector<Rule> rules;
+
+  void decode_xml(XMLObj *obj) {
+    RGWXMLDecoder::decode_xml("Role", role, obj);
+    RGWXMLDecoder::decode_xml("Rule", rules, obj);
+  }
+
+  int to_sync_policy_groups(req_state *s,
+                            vector<rgw_sync_policy_group> *result) const {
+    result->resize(2);
+
+    rgw_sync_policy_group& enabled_group = (*result)[0];
+    rgw_sync_policy_group& disabled_group = (*result)[1];
+
+    enabled_group.id = enabled_group_id;
+    enabled_group.status = rgw_sync_policy_group::Status::ENABLED;
+    disabled_group.id = disabled_group_id;
+    disabled_group.status = rgw_sync_policy_group::Status::ALLOWED; /* not enabled, not forbidden */
+
+    for (auto& rule : rules) {
+      rgw_sync_bucket_pipes pipe;
+      bool enabled;
+      int r = rule.to_sync_policy_pipe(s, &pipe, &enabled);
+      if (r < 0) {
+        ldout(s->cct, 5) << "NOTICE: failed to convert replication configuration into sync policy pipe (rule.id=" << rule.id << "): " << cpp_strerror(-r) << dendl;
+        return r;
+      }
+
+      if (enabled) {
+        enabled_group.pipes.emplace_back(std::move(pipe));
+      } else {
+        disabled_group.pipes.emplace_back(std::move(pipe));
+      }
+    }
+    return 0;
+  }
+};
+
+}
+
+int RGWPutBucketReplication_ObjStore_S3::get_params()
+{
+  RGWXMLParser parser;
+
+  if (!parser.init()){
+    return -EINVAL;
+  }
+
+  const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+  int r = 0;
+  bufferlist data;
+
+  std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false);
+
+  if (r < 0)
+    return r;
+
+  if (!parser.parse(data.c_str(), data.length(), 1)) {
+    return -ERR_MALFORMED_XML;
+  }
+
+  ReplicationConfiguration conf;
+  try {
+    RGWXMLDecoder::decode_xml("ReplicationConfiguration", conf, &parser);
+  } catch (RGWXMLDecoder::err& err) {
+    
+    ldout(s->cct, 5) << "Malformed tagging request: " << err << dendl;
+    return -ERR_MALFORMED_XML;
+  }
+
+  r = conf.to_sync_policy_groups(s, &sync_policy_groups);
+  if (r < 0) {
+    return r;
+  }
+
+  // forward requests to meta master zone
+  if (!store->svc()->zone->is_meta_master()) {
+    /* only need to keep this data around if we're not meta master */
+    in_data = std::move(data);
+  }
+
+  return 0;
+}
+
+void RGWPutBucketReplication_ObjStore_S3::send_response()
+{
+  if (op_ret)
+    set_req_state_err(s, op_ret);
+  dump_errno(s);
+  end_header(s, this, "application/xml");
+  dump_start(s);
+}
+
 void RGWDeleteBucketTags_ObjStore_S3::send_response() 
 {
   if (op_ret)
@@ -3703,6 +4041,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_get()
     return new RGWGetBucketObjectLock_ObjStore_S3;
   } else if (is_notification_op()) {
     return RGWHandler_REST_PSNotifs_S3::create_get_op();
+  } else if (is_replication_op()) {
+    return nullptr; // new RGWGetBucketReplication_ObjStore_S3;
   }
   return get_obj_op(true);
 }
@@ -3746,6 +4086,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_put()
     return new RGWPutBucketObjectLock_ObjStore_S3;
   } else if (is_notification_op()) {
     return RGWHandler_REST_PSNotifs_S3::create_put_op();
+  } else if (is_replication_op()) {
+    return new RGWPutBucketReplication_ObjStore_S3;
   }
   return new RGWCreateBucket_ObjStore_S3;
 }
@@ -3762,6 +4104,8 @@ RGWOp *RGWHandler_REST_Bucket_S3::op_delete()
     return new RGWDeleteBucketPolicy;
   } else if (is_notification_op()) {
     return RGWHandler_REST_PSNotifs_S3::create_delete_op();
+  } else if (is_replication_op()) {
+    return nullptr; // new RGWDeleteBucketReplication_ObjStore_S3;
   }
 
   if (s->info.args.sub_resource_exists("website")) {
@@ -4622,6 +4966,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
         case RGW_OP_PUT_BUCKET_POLICY:
         case RGW_OP_PUT_OBJ_TAGGING:
        case RGW_OP_PUT_BUCKET_TAGGING:
+       case RGW_OP_PUT_BUCKET_REPLICATION:
         case RGW_OP_PUT_LC:
         case RGW_OP_SET_REQUEST_PAYMENT:
         case RGW_OP_PUBSUB_NOTIF_CREATE:
index ba49ab779509ae3b47591ec97d0d5723153c5499..104f08faafa792845312ab0ed6bec1804ce3f883 100644 (file)
@@ -101,6 +101,13 @@ public:
   void send_response() override;
 };
 
+class RGWPutBucketReplication_ObjStore_S3 : public RGWPutBucketReplication_ObjStore
+{
+public:
+  int get_params() override;
+  void send_response() override;
+};
+
 class RGWListBuckets_ObjStore_S3 : public RGWListBuckets_ObjStore {
 public:
   RGWListBuckets_ObjStore_S3() {}
@@ -652,6 +659,9 @@ protected:
     }
     return false;
   }
+  bool is_replication_op() const {
+    return s->info.args.exists("replication");
+  }
 
   RGWOp *get_obj_op(bool get_data) const;
 
index 58e05d0f8a6b73e5143f782b4d15d9a7af6e4461..7b4d10596985ae63ca1ff0f47b5b5c2f57687032 100644 (file)
@@ -487,6 +487,13 @@ struct rgw_sync_bucket_entities {
   }
 
   static string bucket_key(std::optional<rgw_bucket> b);
+
+  void set_all_zones(bool state) {
+    all_zones = state;
+    if (all_zones) {
+      zones.reset();
+    }
+  }
 };
 WRITE_CLASS_ENCODER(rgw_sync_bucket_entities)
 
index b73f17bcf066614e65c0e7b6315dda3ef6a0895f..7712572d6f8af5a6aa1d7fa1090ff277f447c3c3 100644 (file)
@@ -192,6 +192,13 @@ void decode_xml_obj(bufferlist& val, XMLObj *obj);
 class utime_t;
 void decode_xml_obj(utime_t& val, XMLObj *obj);
 
+template<class T>
+void decode_xml_obj(std::optional<T>& val, XMLObj *obj)
+{
+  val.emplace();
+  decode_xml_obj(*val, obj);
+}
+
 template<class T>
 void do_decode_xml_obj(list<T>& l, const string& name, XMLObj *obj)
 {