]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/pubsub: remove unnecessary REST abstraction layer
authorYuval Lifshitz <ylifshit@redhat.com>
Mon, 21 Nov 2022 18:04:53 +0000 (20:04 +0200)
committerYuval Lifshitz <ylifshit@redhat.com>
Wed, 30 Nov 2022 18:55:01 +0000 (18:55 +0000)
this was used for an already deprecated non s3 API for pubsub
also, some texts changes in docs and error messages

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
doc/radosgw/notifications.rst
doc/radosgw/s3-notification-compatibility.rst
src/rgw/CMakeLists.txt
src/rgw/store/rados/rgw_rest_pubsub.cc
src/rgw/store/rados/rgw_rest_pubsub_common.cc [deleted file]
src/rgw/store/rados/rgw_rest_pubsub_common.h [deleted file]

index e4b9766f1103bf706cec3515c8f6a4678e0d9261..94d3a25312f7fa867edb6100ce88c094e972428c 100644 (file)
@@ -128,7 +128,7 @@ Create a Topic
 
 This creates a new topic. Provide the topic with push endpoint parameters,
 which will be used later when a notification is created. A response is
-generated. A successful response includes the the topic's `ARN
+generated. A successful response includes the topic's `ARN
 <https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html>`_
 (the "Amazon Resource Name", a unique identifier used to reference the topic).
 To update a topic, use the same command that you used to create it (but when
@@ -209,7 +209,7 @@ Request parameters:
  - ``use-ssl``: If this is set to "true", a secure connection is used to
    connect to the broker. (This is "false" by default.)
  - ``ca-location``: If this is provided and a secure connection is used, the
-   specified CA will be used insted of the default CA to authenticate the
+   specified CA will be used instead of the default CA to authenticate the
    broker. 
  - user/password: This must be provided only over HTTPS. Topic creation
    requests will otherwise be rejected.
index 955570ee1dfad78d7d3ea971a0c5b07552218f34..b142e01c380b41e8b7a371911ba7ccb0a1927a9a 100644 (file)
@@ -2,7 +2,7 @@
 S3 Bucket Notifications Compatibility
 =====================================
 
-Ceph's `Bucket Notifications`_ API follow `AWS S3 Bucket Notifications API`_. However, some differences exist, as listed below.
+Ceph's `Bucket Notifications`_ API follows `AWS S3 Bucket Notifications API`_. However, some differences exist, as listed below.
 
 
 .. note:: 
@@ -59,7 +59,7 @@ Ceph's bucket notification API has the following extensions:
 Unsupported Fields in the Event Record
 --------------------------------------
 
-The records sent for bucket notification follow format described in: `Event Message Structure`_.
+The records sent for bucket notification follows the format described in: `Event Message Structure`_.
 However, the ``requestParameters.sourceIPAddress`` field will be sent empty.
 
 
index cc68a9cef457fba295238a99079662d67a7f7522..7b6ac917f854565b040ff0737f7b12819cb8c8c2 100644 (file)
@@ -163,7 +163,6 @@ set(librgw_common_srcs
   store/rados/rgw_otp.cc
   store/rados/rgw_period.cc
   store/rados/rgw_rest_pubsub.cc
-  store/rados/rgw_rest_pubsub_common.cc
   store/rados/rgw_rest_realm.cc
   store/rados/rgw_rest_user.cc
   store/rados/rgw_sal_rados.cc
index 5b216330bd415cf567f2762558ad415c4530b02e..21f48b7e55f78d20945fdacef951ce01fb6b1b52 100644 (file)
@@ -4,7 +4,6 @@
 #include <algorithm>
 #include <boost/tokenizer.hpp>
 #include <optional>
-#include "rgw_rest_pubsub_common.h"
 #include "rgw_rest_pubsub.h"
 #include "rgw_pubsub_push.h"
 #include "rgw_pubsub.h"
 #include "rgw_notify.h"
 #include "rgw_sal_rados.h"
 #include "services/svc_zone.h"
+#include "common/dout.h"
+#include "rgw_url.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-using namespace std;
-
 static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
 
+bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
+  const auto is_secure = rgw_transport_is_secure(cct, env);
+  if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
+    ldout(cct, 0) << "WARNING: bypassing endpoint validation, allows sending secrets over insecure transport" << dendl;
+    return true;
+  }
+  return is_secure;
+}
+
+// make sure that endpoint is a valid URL
+// make sure that if user/password are passed inside URL, it is over secure connection
+// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL
+bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) {
+  if (dest.push_endpoint.empty()) {
+      return true;
+  }
+  std::string user;
+  std::string password;
+  if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
+    ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
+    return false;
+  }
+  // this should be verified inside parse_url()
+  ceph_assert(user.empty() == password.empty());
+  if (!user.empty()) {
+      dest.stored_secret = true;
+      if (!verify_transport_security(cct, env)) {
+        ldout(cct, 1) << "endpoint validation error: sending secrets over insecure transport" << dendl;
+        return false;
+      }
+  }
+  return true;
+}
+
+bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
+    return topic.topic.dest.stored_secret;
+}
+
+bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
+    for (const auto& topic : topics.topics) {
+        if (topic_has_endpoint_secret(topic.second)) return true;
+    }
+    return false;
+}
+
 // command (AWS compliant): 
 // POST
 // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
-class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp {
-public:
-  int get_params() override {
+class RGWPSCreateTopicOp : public RGWOp {
+  private:
+  std::optional<RGWPubSub> ps;
+  std::string topic_name;
+  rgw_pubsub_sub_dest dest;
+  std::string topic_arn;
+  std::string opaque_data;
+  
+  int get_params() {
     topic_name = s->info.args.get("Name");
     if (topic_name.empty()) {
       ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
@@ -73,6 +123,20 @@ public:
     return 0;
   }
 
+  public:
+  int verify_permission(optional_yield) override {
+    return 0;
+  }
+
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield) override;
+
+  const char* name() const override { return "pubsub_topic_create"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -97,11 +161,42 @@ public:
   }
 };
 
+void RGWPSCreateTopicOp::execute(optional_yield y) {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+
+  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+  op_ret = ps->create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl;
+}
+
 // command (AWS compliant): 
 // POST 
 // Action=ListTopics
-class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp {
+class RGWPSListTopicsOp : public RGWOp {
+private:
+  std::optional<RGWPubSub> ps;
+  rgw_pubsub_topics result;
+
 public:
+  int verify_permission(optional_yield) override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield) override;
+
+  const char* name() const override { return "pubsub_topics_list"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -126,12 +221,33 @@ public:
   }
 };
 
+void RGWPSListTopicsOp::execute(optional_yield y) {
+  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+  op_ret = ps->get_topics(&result);
+  // if there are no topics it is not considered an error
+  op_ret = op_ret == -ENOENT ? 0 : op_ret;
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
+    return;
+  }
+  if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+    ldpp_dout(this, 1) << "topics contain secrets and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
+  ldpp_dout(this, 20) << "successfully got topics" << dendl;
+}
+
 // command (extension to AWS): 
 // POST
 // Action=GetTopic&TopicArn=<topic-arn>
-class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp {
-public:
-  int get_params() override {
+class RGWPSGetTopicOp : public RGWOp {
+  private:
+  std::string topic_name;
+  std::optional<RGWPubSub> ps;
+  rgw_pubsub_topic_subs result;
+  
+  int get_params() {
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
 
     if (!topic_arn || topic_arn->resource.empty()) {
@@ -143,6 +259,19 @@ public:
     return 0;
   }
 
+  public:
+  int verify_permission(optional_yield y) override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield y) override;
+
+  const char* name() const override { return "pubsub_topic_get"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -167,12 +296,35 @@ public:
   }
 };
 
+void RGWPSGetTopicOp::execute(optional_yield y) {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+  op_ret = ps->get_topic(topic_name, &result);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+    ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
+  ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
 // command (AWS compliant): 
 // POST
 // Action=GetTopicAttributes&TopicArn=<topic-arn>
-class RGWPSGetTopicAttributes_ObjStore_AWS : public RGWPSGetTopicOp {
-public:
-  int get_params() override {
+class RGWPSGetTopicAttributesOp : public RGWOp {
+  private:
+  std::string topic_name;
+  std::optional<RGWPubSub> ps;
+  rgw_pubsub_topic_subs result;
+  
+  int get_params() {
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
 
     if (!topic_arn || topic_arn->resource.empty()) {
@@ -184,6 +336,19 @@ public:
     return 0;
   }
 
+  public:
+  int verify_permission(optional_yield y) override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield y) override;
+
+  const char* name() const override { return "pubsub_topic_get"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -208,12 +373,34 @@ public:
   }
 };
 
+void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+  op_ret = ps->get_topic(topic_name, &result);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+    return;
+  }
+  if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+    ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+    op_ret = -EPERM;
+    return;
+  }
+  ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
 // command (AWS compliant): 
 // POST
 // Action=DeleteTopic&TopicArn=<topic-arn>
-class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp {
-public:
-  int get_params() override {
+class RGWPSDeleteTopicOp : public RGWOp {
+  private:
+  std::string topic_name;
+  std::optional<RGWPubSub> ps;
+  
+  int get_params() {
     const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
 
     if (!topic_arn || topic_arn->resource.empty()) {
@@ -237,7 +424,20 @@ public:
 
     return 0;
   }
-  
+
+  public:
+  int verify_permission(optional_yield) override {
+    return 0;
+  }
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  void execute(optional_yield y) override;
+
+  const char* name() const override { return "pubsub_topic_delete"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
   void send_response() override {
     if (op_ret) {
       set_req_state_err(s, op_ret);
@@ -259,6 +459,20 @@ public:
   }
 };
 
+void RGWPSDeleteTopicOp::execute(optional_yield y) {
+  op_ret = get_params();
+  if (op_ret < 0) {
+    return;
+  }
+  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
+  op_ret = ps->remove_topic(this, topic_name, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+    return;
+  }
+  ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
+}
+
 namespace {
 // utility classes and functions for handling parameters with the following format:
 // Attributes.entry.{N}.{key|value}={VALUE}
@@ -324,7 +538,7 @@ void update_attribute_map(const std::string& input, AttributeMap& map) {
   key_or_value.pop_back();
 
   auto pos = key_or_value.find("=");
-  if (pos != string::npos) {
+  if (pos != std::string::npos) {
     const auto key_or_value_lhs = key_or_value.substr(0, pos);
     const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1));
     const auto map_it = map.find(idx);
@@ -343,13 +557,13 @@ void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() {
   if (post_body.size() > 0) {
     ldpp_dout(s, 10) << "Content of POST: " << post_body << dendl;
 
-    if (post_body.find("Action") != string::npos) {
+    if (post_body.find("Action") != std::string::npos) {
       const boost::char_separator<char> sep("&");
       const boost::tokenizer<boost::char_separator<char>> tokens(post_body, sep);
       AttributeMap map;
       for (const auto& t : tokens) {
         auto pos = t.find("=");
-        if (pos != string::npos) {
+        if (pos != std::string::npos) {
           const auto key = t.substr(0, pos);
           if (key == "Action") {
             s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
@@ -377,15 +591,15 @@ RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() {
   if (s->info.args.exists("Action")) {
     const auto action = s->info.args.get("Action");
     if (action.compare("CreateTopic") == 0)
-      return new RGWPSCreateTopic_ObjStore_AWS();
+      return new RGWPSCreateTopicOp();
     if (action.compare("DeleteTopic") == 0)
-      return new RGWPSDeleteTopic_ObjStore_AWS;
+      return new RGWPSDeleteTopicOp;
     if (action.compare("ListTopics") == 0)
-      return new RGWPSListTopics_ObjStore_AWS();
+      return new RGWPSListTopicsOp();
     if (action.compare("GetTopic") == 0)
-      return new RGWPSGetTopic_ObjStore_AWS();
+      return new RGWPSGetTopicOp();
     if (action.compare("GetTopicAttributes") == 0)
-      return new RGWPSGetTopicAttributes_ObjStore_AWS();
+      return new RGWPSGetTopicAttributesOp();
   }
 
   return nullptr;
@@ -395,7 +609,6 @@ int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, option
   return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y);
 }
 
-
 namespace {
 // return a unique topic by prefexing with the notification name: <notification>_<topic>
 std::string topic_to_unique(const std::string& topic, const std::string& notification) {
@@ -404,7 +617,7 @@ std::string topic_to_unique(const std::string& topic, const std::string& notific
 
 // extract the topic from a unique topic of the form: <notification>_<topic>
 [[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
-  if (unique_topic.find(notification + "_") == string::npos) {
+  if (unique_topic.find(notification + "_") == std::string::npos) {
     return "";
   }
   return unique_topic.substr(notification.length() + 1);
@@ -445,9 +658,43 @@ int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_buc
 // command (S3 compliant): PUT /<bucket name>?notification
 // a "notification" and a subscription will be auto-generated
 // actual configuration is XML encoded in the body of the message
-class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
+class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
+  private:
+  std::optional<RGWPubSub> ps;
+  std::string bucket_name;
+  RGWBucketInfo bucket_info;
   rgw_pubsub_s3_notifications configurations;
 
+  int get_params() {
+    bool exists;
+    const auto no_value = s->info.args.get("notification", &exists);
+    if (!exists) {
+      ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
+      return -EINVAL;
+    } 
+    if (no_value.length() > 0) {
+      ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
+      return -EINVAL;
+    }
+    if (s->bucket_name.empty()) {
+      ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
+      return -EINVAL;
+    }
+    bucket_name = s->bucket_name;
+    return 0;
+  }
+
+  public:
+  int verify_permission(optional_yield y) override;
+
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+
+  const char* name() const override { return "pubsub_notification_create_s3"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
   int get_params_from_body() {
     const auto max_size = s->cct->_conf->rgw_max_put_param_size;
     int r;
@@ -484,31 +731,10 @@ class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp {
     return 0;
   }
 
-  int get_params() override {
-    bool exists;
-    const auto no_value = s->info.args.get("notification", &exists);
-    if (!exists) {
-      ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
-      return -EINVAL;
-    } 
-    if (no_value.length() > 0) {
-      ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
-      return -EINVAL;
-    }
-    if (s->bucket_name.empty()) {
-      ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
-      return -EINVAL;
-    }
-    bucket_name = s->bucket_name;
-    return 0;
-  }
-
-public:
-  const char* name() const override { return "pubsub_notification_create_s3"; }
   void execute(optional_yield) override;
 };
 
-void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
+void RGWPSCreateNotifOp::execute(optional_yield y) {
   op_ret = get_params_from_body();
   if (op_ret < 0) {
     return;
@@ -597,12 +823,48 @@ void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) {
   }
 }
 
+int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
+  int ret = get_params();
+  if (ret < 0) {
+    return ret;
+  }
+
+  std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
+  if (ret < 0) {
+    ldpp_dout(this, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
+    return ret;
+  }
+  bucket_info = bucket->get_info();
+
+  if (bucket_info.owner != s->owner.get_id()) {
+    ldpp_dout(this, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
+    return -EPERM;
+  }
+  return 0;
+}
+
 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
-class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp {
-private:
+class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
+  private:
+  std::optional<RGWPubSub> ps;
+  std::string bucket_name;
+  RGWBucketInfo bucket_info;
   std::string notif_name;
+  
+  public:
+  int verify_permission(optional_yield y) override;
 
-  int get_params() override {
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+  
+  const char* name() const override { return "pubsub_notification_delete_s3"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
+  int get_params() {
     bool exists;
     notif_name = s->info.args.get("notification", &exists);
     if (!exists) {
@@ -617,12 +879,10 @@ private:
     return 0;
   }
 
-public:
   void execute(optional_yield y) override;
-  const char* name() const override { return "pubsub_notification_delete_s3"; }
 };
 
-void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
+void RGWPSDeleteNotifOp::execute(optional_yield y) {
   op_ret = get_params();
   if (op_ret < 0) {
     return;
@@ -656,13 +916,37 @@ void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) {
   op_ret = delete_all_notifications(this, bucket_topics, b, y, *ps);
 }
 
+int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
+  int ret = get_params();
+  if (ret < 0) {
+    return ret;
+  }
+
+  std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
+  if (ret < 0) {
+    return ret;
+  }
+  bucket_info = bucket->get_info();
+
+  if (bucket_info.owner != s->owner.get_id()) {
+    ldpp_dout(this, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
+    return -EPERM;
+  }
+  return 0;
+}
+
 // command (S3 compliant): GET /bucket?notification[=<notification-id>]
-class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp {
+class RGWPSListNotifsOp : public RGWOp {
 private:
+  std::string bucket_name;
+  RGWBucketInfo bucket_info;
+  std::optional<RGWPubSub> ps;
   std::string notif_name;
   rgw_pubsub_s3_notifications notifications;
 
-  int get_params() override {
+  int get_params() {
     bool exists;
     notif_name = s->info.args.get("notification", &exists);
     if (!exists) {
@@ -677,7 +961,17 @@ private:
     return 0;
   }
 
-public:
+  public:
+  int verify_permission(optional_yield y) override;
+
+  void pre_exec() override {
+    rgw_bucket_object_pre_exec(s);
+  }
+
+  const char* name() const override { return "pubsub_notifications_get_s3"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
   void execute(optional_yield y) override;
   void send_response() override {
     if (op_ret) {
@@ -692,10 +986,9 @@ public:
     notifications.dump_xml(s->formatter);
     rgw_flush_formatter_and_reset(s, s->formatter);
   }
-  const char* name() const override { return "pubsub_notifications_get_s3"; }
 };
 
-void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
+void RGWPSListNotifsOp::execute(optional_yield y) {
   ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
   auto b = ps->get_bucket(bucket_info.bucket);
   ceph_assert(b);
@@ -728,27 +1021,49 @@ void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) {
   }
 }
 
+int RGWPSListNotifsOp::verify_permission(optional_yield y) {
+  int ret = get_params();
+  if (ret < 0) {
+    return ret;
+  }
+
+  std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+  ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
+  if (ret < 0) {
+    return ret;
+  }
+  bucket_info = bucket->get_info();
+
+  if (bucket_info.owner != s->owner.get_id()) {
+    ldpp_dout(this, 1) << "user doesn't own bucket, cannot get notification list" << dendl;
+    return -EPERM;
+  }
+
+  return 0;
+}
+
 RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
-  return new RGWPSListNotifs_ObjStore_S3();
+  return new RGWPSListNotifsOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
-  return new RGWPSCreateNotif_ObjStore_S3();
+  return new RGWPSCreateNotifOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
-  return new RGWPSDeleteNotif_ObjStore_S3();
+  return new RGWPSDeleteNotifOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
-    return new RGWPSListNotifs_ObjStore_S3();
+    return new RGWPSListNotifsOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
-  return new RGWPSCreateNotif_ObjStore_S3();
+  return new RGWPSCreateNotifOp();
 }
 
 RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
-  return new RGWPSDeleteNotif_ObjStore_S3();
+  return new RGWPSDeleteNotifOp();
 }
 
diff --git a/src/rgw/store/rados/rgw_rest_pubsub_common.cc b/src/rgw/store/rados/rgw_rest_pubsub_common.cc
deleted file mode 100644 (file)
index 6410e5b..0000000
+++ /dev/null
@@ -1,184 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "rgw_common.h"
-#include "rgw_rest_pubsub_common.h"
-#include "common/dout.h"
-#include "rgw_url.h"
-#include "rgw_sal_rados.h"
-
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rgw
-
-bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
-  const auto is_secure = rgw_transport_is_secure(cct, env);
-  if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
-    ldout(cct, 0) << "WARNING: bypassing endpoint validation, allow sending password over insecure transport" << dendl;
-    return true;
-  }
-  return is_secure;
-}
-
-bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) {
-  if (dest.push_endpoint.empty()) {
-      return true;
-  }
-  std::string user;
-  std::string password;
-  if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
-    ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
-    return false;
-  }
-  // this should be verified inside parse_url()
-  ceph_assert(user.empty() == password.empty());
-  if (!user.empty()) {
-      dest.stored_secret = true;
-      if (!verify_transport_security(cct, env)) {
-        ldout(cct, 1) << "endpoint validation error: sending password over insecure transport" << dendl;
-        return false;
-      }
-  }
-  return true;
-}
-
-bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
-    return topic.topic.dest.stored_secret;
-}
-
-bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
-    for (const auto& topic : topics.topics) {
-        if (topic_has_endpoint_secret(topic.second)) return true;
-    }
-    return false;
-}
-
-void RGWPSCreateTopicOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  op_ret = ps->create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl;
-}
-
-void RGWPSListTopicsOp::execute(optional_yield y) {
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  op_ret = ps->get_topics(&result);
-  // if there are no topics it is not considered an error
-  op_ret = op_ret == -ENOENT ? 0 : op_ret;
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
-    return;
-  }
-  if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
-    ldpp_dout(this, 1) << "topics contain secret and cannot be sent over insecure transport" << dendl;
-    op_ret = -EPERM;
-    return;
-  }
-  ldpp_dout(this, 20) << "successfully got topics" << dendl;
-}
-
-void RGWPSGetTopicOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  op_ret = ps->get_topic(topic_name, &result);
-  if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
-    ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
-    op_ret = -EPERM;
-    return;
-  }
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
-}
-
-void RGWPSDeleteTopicOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  ps.emplace(static_cast<rgw::sal::RadosStore*>(store), s->owner.get_id().tenant);
-  op_ret = ps->remove_topic(this, topic_name, y);
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
-    return;
-  }
-  ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
-}
-
-int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
-  int ret = get_params();
-  if (ret < 0) {
-    return ret;
-  }
-
-  std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
-  std::unique_ptr<rgw::sal::Bucket> bucket;
-  ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
-  if (ret < 0) {
-    ldpp_dout(this, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
-    return ret;
-  }
-  bucket_info = bucket->get_info();
-
-  if (bucket_info.owner != s->owner.get_id()) {
-    ldpp_dout(this, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
-    return -EPERM;
-  }
-  return 0;
-}
-
-int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
-  int ret = get_params();
-  if (ret < 0) {
-    return ret;
-  }
-
-  std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
-  std::unique_ptr<rgw::sal::Bucket> bucket;
-  ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
-  if (ret < 0) {
-    return ret;
-  }
-  bucket_info = bucket->get_info();
-
-  if (bucket_info.owner != s->owner.get_id()) {
-    ldpp_dout(this, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
-    return -EPERM;
-  }
-  return 0;
-}
-
-int RGWPSListNotifsOp::verify_permission(optional_yield y) {
-  int ret = get_params();
-  if (ret < 0) {
-    return ret;
-  }
-
-  std::unique_ptr<rgw::sal::User> user = store->get_user(s->owner.get_id());
-  std::unique_ptr<rgw::sal::Bucket> bucket;
-  ret = store->get_bucket(this, user.get(), s->owner.get_id().tenant, bucket_name, &bucket, y);
-  if (ret < 0) {
-    return ret;
-  }
-  bucket_info = bucket->get_info();
-
-  if (bucket_info.owner != s->owner.get_id()) {
-    ldpp_dout(this, 1) << "user doesn't own bucket, cannot get notification list" << dendl;
-    return -EPERM;
-  }
-
-  return 0;
-}
-
diff --git a/src/rgw/store/rados/rgw_rest_pubsub_common.h b/src/rgw/store/rados/rgw_rest_pubsub_common.h
deleted file mode 100644 (file)
index fe3126c..0000000
+++ /dev/null
@@ -1,162 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-#pragma once
-#include <string>
-#include <optional>
-#include "rgw_op.h"
-#include "rgw_pubsub.h"
-
-// make sure that endpoint is a valid URL
-// make sure that if user/password are passed inside URL, it is over secure connection
-// update rgw_pubsub_sub_dest to indicate that a password is stored in the URL
-bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env);
-
-// create a topic
-class RGWPSCreateTopicOp : public RGWDefaultResponseOp {
-protected:
-  std::optional<RGWPubSub> ps;
-  std::string topic_name;
-  rgw_pubsub_sub_dest dest;
-  std::string topic_arn;
-  std::string opaque_data;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission(optional_yield) override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute(optional_yield) override;
-
-  const char* name() const override { return "pubsub_topic_create"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-// list all topics
-class RGWPSListTopicsOp : public RGWOp {
-protected:
-  std::optional<RGWPubSub> ps;
-  rgw_pubsub_topics result;
-
-public:
-  int verify_permission(optional_yield) override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute(optional_yield) override;
-
-  const char* name() const override { return "pubsub_topics_list"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-// get topic information
-class RGWPSGetTopicOp : public RGWOp {
-protected:
-  std::string topic_name;
-  std::optional<RGWPubSub> ps;
-  rgw_pubsub_topic_subs result;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission(optional_yield y) override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute(optional_yield y) override;
-
-  const char* name() const override { return "pubsub_topic_get"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};
-
-// delete a topic
-class RGWPSDeleteTopicOp : public RGWDefaultResponseOp {
-protected:
-  std::string topic_name;
-  std::optional<RGWPubSub> ps;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission(optional_yield) override {
-    return 0;
-  }
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  void execute(optional_yield y) override;
-
-  const char* name() const override { return "pubsub_topic_delete"; }
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-// notification creation
-class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
-protected:
-  std::optional<RGWPubSub> ps;
-  std::string bucket_name;
-  RGWBucketInfo bucket_info;
-
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission(optional_yield y) override;
-
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
-};
-
-// delete a notification
-class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
-protected:
-  std::optional<RGWPubSub> ps;
-  std::string bucket_name;
-  RGWBucketInfo bucket_info;
-  
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission(optional_yield y) override;
-
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-  
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
-};
-
-// get topics/notifications on a bucket
-class RGWPSListNotifsOp : public RGWOp {
-protected:
-  std::string bucket_name;
-  RGWBucketInfo bucket_info;
-  std::optional<RGWPubSub> ps;
-
-  virtual int get_params() = 0;
-
-public:
-  int verify_permission(optional_yield y) override;
-
-  void pre_exec() override {
-    rgw_bucket_object_pre_exec(s);
-  }
-
-  RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
-  uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
-};