]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: do init/validation in init_processing()
authorCasey Bodley <cbodley@redhat.com>
Sat, 9 Mar 2024 16:08:17 +0000 (11:08 -0500)
committerCasey Bodley <cbodley@redhat.com>
Fri, 12 Apr 2024 19:34:30 +0000 (15:34 -0400)
verify_permission() should do permission checks and nothing else!

admin/system users ignore errors from verify_permission() and go on to
call execute() regardless. that means that execute() can't rely on any
initialization that happened during verify_permission(), at risk of
crashing on admin/system requests. it also means that any permission
checks in execute() won't get overridden for admin/system users,
breaking their superuser access

by moving all parameter validation and initialization into
init_processing(), we can prepare all the state that verify_permission()
will need to do it's thing

Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 88a35fed3507e37ad191995b014cbd5b089280d0)

src/rgw/rgw_rest_pubsub.cc

index 0fc4b3a83a8c3860de9b384058dbde4841a91e87..4da155f4d882b933f1ac54d2660072ebdd87fb1d 100644 (file)
@@ -76,6 +76,21 @@ bool validate_topic_name(const std::string& name, std::string& message)
   return true;
 }
 
+auto validate_topic_arn(const std::string& str, std::string& message)
+  -> boost::optional<rgw::ARN>
+{
+  if (str.empty()) {
+    message = "Missing required element TopicArn";
+    return boost::none;
+  }
+  auto arn = rgw::ARN::parse(str);
+  if (!arn || arn->resource.empty()) {
+    message = "Invalid value for TopicArn";
+    return boost::none;
+  }
+  return arn;
+}
+
 bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) {
     return topic.dest.stored_secret;
 }
@@ -143,8 +158,9 @@ class RGWPSCreateTopicOp : public RGWOp {
   private:
   bufferlist bl_post_body;
   std::string topic_name;
+  rgw::ARN topic_arn;
+  std::optional<rgw_pubsub_topic> existing;
   rgw_pubsub_dest dest;
-  std::string topic_arn;
   std::string opaque_data;
   std::string policy_text;
 
@@ -198,10 +214,9 @@ class RGWPSCreateTopicOp : public RGWOp {
     // dest object only stores endpoint info
     dest.arn_topic = topic_name;
     // the topic ARN will be sent in the reply
-    const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, 
+    topic_arn = rgw::ARN{rgw::Partition::aws, rgw::Service::sns,
         driver->get_zone()->get_zonegroup().get_name(),
-        s->user->get_tenant(), topic_name);
-    topic_arn = arn.to_string();
+        s->user->get_tenant(), topic_name};
     return 0;
   }
 
@@ -209,36 +224,36 @@ class RGWPSCreateTopicOp : public RGWOp {
   explicit RGWPSCreateTopicOp(bufferlist bl_post_body)
     : bl_post_body(std::move(bl_post_body)) {}
 
-  int verify_permission(optional_yield y) override {
-    auto ret = get_params();
+  int init_processing(optional_yield y) override {
+    int ret = get_params();
     if (ret < 0) {
       return ret;
     }
 
+    // try to load existing topic for owner and policy
     const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
     rgw_pubsub_topic result;
     ret = ps.get_topic(this, topic_name, result, y, nullptr);
     if (ret == -ENOENT) {
       // topic not present
-      return 0;
+    } else if (ret < 0) {
+      ldpp_dout(this, 1) << "failed to read topic '" << topic_name
+          << "', with error:" << ret << dendl;
+      return ret;
+    } else {
+      existing = std::move(result);
     }
-    if (ret == 0) {
-      ret = verify_topic_owner_or_policy(
-          s, result, driver->get_zone()->get_zonegroup().get_name(),
-          rgw::IAM::snsCreateTopic);
-      if (ret == 0)
-      {
-        return 0;
-      }
+    return RGWOp::init_processing(y);
+  }
 
-      ldpp_dout(this, 4) << "no permission to modify topic '" << topic_name
-                         << "', topic already exist." << dendl;
-      return -EACCES;
+  int verify_permission(optional_yield y) override {
+    if (!existing) {
+      return 0;
     }
-    ldpp_dout(this, 4) << "failed to read topic '" << topic_name
-                       << "', with error:" << ret << dendl;
-    return ret;
-   }
+    return verify_topic_owner_or_policy(
+        s, *existing, driver->get_zone()->get_zonegroup().get_name(),
+        rgw::IAM::snsCreateTopic);
+  }
 
   void pre_exec() override {
     rgw_bucket_object_pre_exec(s);
@@ -263,7 +278,7 @@ class RGWPSCreateTopicOp : public RGWOp {
     const auto f = s->formatter;
     f->open_object_section_in_ns("CreateTopicResponse", RGW_REST_SNS_XMLNS);
     f->open_object_section("CreateTopicResult");
-    encode_xml("TopicArn", topic_arn, f); 
+    encode_xml("TopicArn", topic_arn.to_string(), f);
     f->close_section(); // CreateTopicResult
     f->open_object_section("ResponseMetadata");
     encode_xml("RequestId", s->req_id, f); 
@@ -295,8 +310,8 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
     }
   }
   const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
-  op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
-                           s->owner.id, policy_text, y);
+  op_ret = ps.create_topic(this, topic_name, dest, topic_arn.to_string(),
+                           opaque_data, s->owner.id, policy_text, y);
   if (op_ret < 0) {
     ldpp_dout(this, 4) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
@@ -385,25 +400,44 @@ void RGWPSListTopicsOp::execute(optional_yield y) {
 // POST
 // Action=GetTopic&TopicArn=<topic-arn>
 class RGWPSGetTopicOp : public RGWOp {
-  private:
+ private:
+  rgw::ARN topic_arn;
   std::string topic_name;
   rgw_pubsub_topic result;
   
   int get_params() {
-    const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
-
-    if (!topic_arn || topic_arn->resource.empty()) {
-        ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
-        return -EINVAL;
+    auto arn = validate_topic_arn(s->info.args.get("TopicArn"), s->err.message);
+    if (!arn) {
+      return -EINVAL;
     }
-
-    topic_name = topic_arn->resource;
+    topic_arn = std::move(*arn);
+    topic_name = topic_arn.resource;
     return 0;
   }
 
-  public:
+ public:
+  int init_processing(optional_yield y) override {
+    int ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+    const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+    ret = ps.get_topic(this, topic_name, result, y, nullptr);
+    if (ret < 0) {
+      ldpp_dout(this, 4) << "failed to get topic '" << topic_name << "', ret=" << ret << dendl;
+      return ret;
+    }
+    if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+      s->err.message = "Topic contains secrets that must be transmitted over a secure transport";
+      return -EPERM;
+    }
+    return RGWOp::init_processing(y);
+  }
+
   int verify_permission(optional_yield y) override {
-    return 0;
+    return verify_topic_owner_or_policy(
+        s, result, driver->get_zone()->get_zonegroup().get_name(),
+        rgw::IAM::snsGetTopicAttributes);
   }
   void pre_exec() override {
     rgw_bucket_object_pre_exec(s);
@@ -439,55 +473,51 @@ class RGWPSGetTopicOp : public RGWOp {
 };
 
 void RGWPSGetTopicOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
-  op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
-    ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
-    op_ret = -EPERM;
-    return;
-  }
-  op_ret = verify_topic_owner_or_policy(
-      s, result, driver->get_zone()->get_zonegroup().get_name(),
-      rgw::IAM::snsGetTopicAttributes);
-  if (op_ret != 0) {
-    ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
-                       << "'" << dendl;
-    return;
-  }
-  ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+  ldpp_dout(this, 4) << "successfully got topic '" << topic_name << "'" << dendl;
 }
 
 // command (AWS compliant): 
 // POST
 // Action=GetTopicAttributes&TopicArn=<topic-arn>
 class RGWPSGetTopicAttributesOp : public RGWOp {
-  private:
+ private:
+  rgw::ARN topic_arn;
   std::string topic_name;
   rgw_pubsub_topic result;
   
   int get_params() {
-    const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
-
-    if (!topic_arn || topic_arn->resource.empty()) {
-        ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl;
-        return -EINVAL;
+    auto arn = validate_topic_arn(s->info.args.get("TopicArn"), s->err.message);
+    if (!arn) {
+      return -EINVAL;
     }
+    topic_arn = std::move(*arn);
+    topic_name = topic_arn.resource;
+    return 0;
+  }
 
-    topic_name = topic_arn->resource;
+ public:
+  int init_processing(optional_yield y) override {
+    int ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+    const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+    ret = ps.get_topic(this, topic_name, result, y, nullptr);
+    if (ret < 0) {
+      ldpp_dout(this, 4) << "failed to get topic '" << topic_name << "', ret=" << ret << dendl;
+      return ret;
+    }
+    if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+      s->err.message = "Topic contains secrets that must be transmitted over a secure transport";
+      return -EPERM;
+    }
     return 0;
   }
 
-  public:
   int verify_permission(optional_yield y) override {
-    return 0;
+    return verify_topic_owner_or_policy(
+        s, result, driver->get_zone()->get_zonegroup().get_name(),
+        rgw::IAM::snsGetTopicAttributes);
   }
   void pre_exec() override {
     rgw_bucket_object_pre_exec(s);
@@ -523,30 +553,7 @@ class RGWPSGetTopicAttributesOp : public RGWOp {
 };
 
 void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
-  const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
-  op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
-  if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
-    return;
-  }
-  if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
-    ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
-    op_ret = -EPERM;
-    return;
-  }
-  op_ret = verify_topic_owner_or_policy(
-      s, result, driver->get_zone()->get_zonegroup().get_name(),
-      rgw::IAM::snsGetTopicAttributes);
-  if (op_ret != 0) {
-    ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
-                       << "'" << dendl;
-    return;
-  }
-  ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+  ldpp_dout(this, 4) << "successfully got topic '" << topic_name << "'" << dendl;
 }
 
 // command (AWS compliant):
@@ -555,8 +562,9 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
 class RGWPSSetTopicAttributesOp : public RGWOp {
  private:
   bufferlist bl_post_body;
+  rgw::ARN topic_arn;
   std::string topic_name;
-  std::string topic_arn;
+  rgw_pubsub_topic result;
   std::string opaque_data;
   std::string policy_text;
   rgw_pubsub_dest dest;
@@ -564,22 +572,16 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
   std::string attribute_name;
 
   int get_params() {
-    const auto arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
-
-    if (!arn || arn->resource.empty()) {
-      ldpp_dout(this, 1) << "SetTopicAttribute Action 'TopicArn' argument is "
-                            "missing or invalid"
-                         << dendl;
+    auto arn = validate_topic_arn(s->info.args.get("TopicArn"), s->err.message);
+    if (!arn) {
       return -EINVAL;
     }
-    topic_arn = arn->to_string();
-    topic_name = arn->resource;
+    topic_arn = std::move(*arn);
+    topic_name = topic_arn.resource;
+
     attribute_name = s->info.args.get("AttributeName");
     if (attribute_name.empty()) {
-      ldpp_dout(this, 1)
-          << "SetTopicAttribute Action 'AttributeName' argument is "
-             "missing or invalid"
-          << dendl;
+      s->err.message = "Missing required element AttributeName";
       return -EINVAL;
     }
     return 0;
@@ -651,12 +653,12 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
   explicit RGWPSSetTopicAttributesOp(bufferlist bl_post_body)
     : bl_post_body(std::move(bl_post_body)) {}
 
-  int verify_permission(optional_yield y) override {
-    auto ret = get_params();
+  int init_processing(optional_yield y) override {
+    int ret = get_params();
     if (ret < 0) {
       return ret;
     }
-    rgw_pubsub_topic result;
+
     const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
     ret = ps.get_topic(this, topic_name, result, y, nullptr);
     if (ret < 0) {
@@ -665,16 +667,19 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
       return ret;
     }
     topic_owner = result.owner;
-    ret = verify_topic_owner_or_policy(
-        s, result, driver->get_zone()->get_zonegroup().get_name(),
-        rgw::IAM::snsSetTopicAttributes);
-    if (ret != 0) {
-      ldpp_dout(this, 1) << "no permission to set attributes for topic '" << topic_name
-                         << "'" << dendl;
+
+    ret = map_attributes(result);
+    if (ret < 0) {
       return ret;
     }
 
-    return map_attributes(result);
+    return RGWOp::init_processing(y);
+  }
+
+  int verify_permission(optional_yield y) override {
+    return verify_topic_owner_or_policy(
+        s, result, driver->get_zone()->get_zonegroup().get_name(),
+        rgw::IAM::snsSetTopicAttributes);
   }
 
   void pre_exec() override { rgw_bucket_object_pre_exec(s); }
@@ -710,7 +715,7 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
     op_ret = rgw_forward_request_to_master(
         this, *s->penv.site, s->user->get_id(), &bl_post_body, nullptr, s->info, y);
     if (op_ret < 0) {
-      ldpp_dout(this, 1)
+      ldpp_dout(this, 4)
           << "SetTopicAttributes forward_request_to_master returned ret = "
           << op_ret << dendl;
       return;
@@ -719,7 +724,7 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
   if (!dest.push_endpoint.empty() && dest.persistent) {
     op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
     if (op_ret < 0) {
-      ldpp_dout(this, 1)
+      ldpp_dout(this, 4)
           << "SetTopicAttributes Action failed to create queue for "
              "persistent topics. error:"
           << op_ret << dendl;
@@ -728,17 +733,17 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
   } else {  // changing the persistent topic to non-persistent.
     op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
     if (op_ret != -ENOENT && op_ret < 0) {
-      ldpp_dout(this, 1) << "SetTopicAttributes Action failed to remove queue "
+      ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
                             "for persistent topics. error:"
                          << op_ret << dendl;
       return;
     }
   }
   const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
-  op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
-                           topic_owner, policy_text, y);
+  op_ret = ps.create_topic(this, topic_name, dest, topic_arn.to_string(),
+                           opaque_data, topic_owner, policy_text, y);
   if (op_ret < 0) {
-    ldpp_dout(this, 1) << "failed to SetAttributes for topic '" << topic_name
+    ldpp_dout(this, 4) << "failed to SetAttributes for topic '" << topic_name
                        << "', ret=" << op_ret << dendl;
     return;
   }
@@ -752,17 +757,17 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
 class RGWPSDeleteTopicOp : public RGWOp {
   private:
   bufferlist bl_post_body;
+  rgw::ARN topic_arn;
   std::string topic_name;
+  std::optional<rgw_pubsub_topic> topic;
   
   int get_params() {
-    const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
-
-    if (!topic_arn || topic_arn->resource.empty()) {
-      ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+    auto arn = validate_topic_arn(s->info.args.get("TopicArn"), s->err.message);
+    if (!arn) {
       return -EINVAL;
     }
-
-    topic_name = topic_arn->resource;
+    topic_arn = std::move(*arn);
+    topic_name = topic_arn.resource;
     return 0;
   }
 
@@ -770,9 +775,37 @@ class RGWPSDeleteTopicOp : public RGWOp {
   explicit RGWPSDeleteTopicOp(bufferlist bl_post_body)
     : bl_post_body(std::move(bl_post_body)) {}
 
-  int verify_permission(optional_yield) override {
-    return 0;
+  int init_processing(optional_yield y) override {
+    int ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+
+    const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+    rgw_pubsub_topic result;
+    ret = ps.get_topic(this, topic_name, result, y, nullptr);
+    if (ret == -ENOENT) {
+      // leave topic empty
+    } else if (ret < 0) {
+      ldpp_dout(this, 4) << "failed to get topic '" << topic_name
+                         << "', ret=" << ret << dendl;
+      return ret;
+    } else {
+      topic = std::move(result);
+    }
+
+    return RGWOp::init_processing(y);
   }
+
+  int verify_permission(optional_yield y) override {
+    if (!topic) {
+      return 0;
+    }
+    return verify_topic_owner_or_policy(
+        s, *topic, driver->get_zone()->get_zonegroup().get_name(),
+        rgw::IAM::snsDeleteTopic);
+  }
+
   void pre_exec() override {
     rgw_bucket_object_pre_exec(s);
   }
@@ -804,10 +837,6 @@ class RGWPSDeleteTopicOp : public RGWOp {
 };
 
 void RGWPSDeleteTopicOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
-  }
   if (!driver->is_meta_master()) {
     op_ret = rgw_forward_request_to_master(
         this, *s->penv.site, s->user->get_id(), &bl_post_body, nullptr, s->info, y);
@@ -818,30 +847,19 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
       return;
     }
   }
-  const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
 
-  rgw_pubsub_topic result;
-  op_ret = ps.get_topic(this, topic_name, result, y, nullptr);
-  if (op_ret == 0) {
-    op_ret = verify_topic_owner_or_policy(
-        s, result, driver->get_zone()->get_zonegroup().get_name(),
-        rgw::IAM::snsDeleteTopic);
-    if (op_ret < 0) {
-      ldpp_dout(this, 1) << "no permission to remove topic '" << topic_name
-                         << "'" << dendl;
-      return;
-    }
-    op_ret = ps.remove_topic(this, topic_name, y);
-    if (op_ret < 0 && op_ret != -ENOENT) {
-      ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
-      return;
-    }
-    ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
-  } else if (op_ret != -ENOENT) {
-    ldpp_dout(this, 1) << "failed to fetch topic '" << topic_name
-                       << "' with error: " << op_ret << dendl;
+  if (!topic) {
     return;
   }
+
+  const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+  op_ret = ps.remove_topic(this, topic_name, y);
+  if (op_ret < 0 && op_ret != -ENOENT) {
+    ldpp_dout(this, 4) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+    return;
+  }
+  ldpp_dout(this, 4) << "successfully removed topic '" << topic_name << "'" << dendl;
+
   if (op_ret == -ENOENT) {
     // its not an error if no topics exist, just a no-op
     op_ret = 0;
@@ -939,6 +957,9 @@ int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_buc
 // actual configuration is XML encoded in the body of the message
 class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
   bufferlist data;
+  rgw_pubsub_s3_notifications configurations;
+  std::map<rgw::ARN, rgw_pubsub_topic> topics;
+
   int verify_params() override {
     bool exists;
     const auto no_value = s->info.args.get("notification", &exists);
@@ -992,6 +1013,7 @@ class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
     return 0;
   }
 public:
+  int init_processing(optional_yield y) override;
   int verify_permission(optional_yield y) override;
 
   void pre_exec() override {
@@ -1002,25 +1024,65 @@ public:
   RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
   uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
 
-
   void execute(optional_yield) override;
   void execute_v2(optional_yield);
 };
 
-void RGWPSCreateNotifOp::execute(optional_yield y) {
-  if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
-    return execute_v2(y);
+int RGWPSCreateNotifOp::init_processing(optional_yield y)
+{
+  int ret = verify_params();
+  if (ret < 0) {
+    return ret;
   }
-  op_ret = verify_params();
-  if (op_ret < 0) {
-    return;
+
+  ret = get_params_from_body(configurations);
+  if (ret < 0) {
+    return ret;
   }
 
-  rgw_pubsub_s3_notifications configurations;
-  op_ret = get_params_from_body(configurations);
-  if (op_ret < 0) {
-    return;
+  const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
+
+  for (const auto& c : configurations.list) {
+    const auto& notif_name = c.id;
+    if (notif_name.empty()) {
+      s->err.message = "Missing required element Id";
+      return -EINVAL;
+    }
+    if (c.topic_arn.empty()) {
+      s->err.message = "Missing required element Topic";
+      return -EINVAL;
+    }
+
+    const auto arn = rgw::ARN::parse(c.topic_arn);
+    if (!arn || arn->resource.empty()) {
+      s->err.message = "Invalid Topic ARN";
+      return -EINVAL;
+    }
+    const auto& topic_name = arn->resource;
+
+    if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
+      s->err.message = "Unknown Event type: " + notif_name;
+      return -EINVAL;
+    }
+
+    // load topic metadata if we haven't already
+    auto insert = topics.emplace(std::piecewise_construct,
+                                 std::forward_as_tuple(*arn),
+                                 std::forward_as_tuple());
+    if (insert.second) {
+      rgw_pubsub_topic& topic_info = insert.first->second;
+      ret = ps.get_topic(this, topic_name, topic_info, y, nullptr);
+      if (ret < 0) {
+        ldpp_dout(this, 4) << "failed to get topic '" << topic_name << "', ret=" << ret << dendl;
+        return ret;
+      }
+    }
   }
+
+  return RGWOp::init_processing(y);
+}
+
+void RGWPSCreateNotifOp::execute(optional_yield y) {
   if (!driver->is_meta_master()) {
     op_ret = rgw_forward_request_to_master(
         this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y);
@@ -1032,18 +1094,12 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
     }
   }
 
-  std::unique_ptr<rgw::sal::Bucket> bucket;
-  op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
-                               &bucket, y);
-  if (op_ret < 0) {
-    ldpp_dout(this, 4) << "failed to get bucket '" <<
-      (s->bucket_tenant.empty() ? s->bucket_name : s->bucket_tenant + ":" + s->bucket_name) << 
-      "' info, ret = " << op_ret << dendl;
-    return;
+  if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
+    return execute_v2(y);
   }
 
   const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
-  const RGWPubSub::Bucket b(ps, bucket.get());
+  const RGWPubSub::Bucket b(ps, s->bucket.get());
 
   if(configurations.list.empty()) {
     // get all topics on a bucket
@@ -1060,47 +1116,19 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
 
   for (const auto& c : configurations.list) {
     const auto& notif_name = c.id;
-    if (notif_name.empty()) {
-      s->err.message = "Missing required element Id";
-      op_ret = -EINVAL;
-      return;
-    }
-    if (c.topic_arn.empty()) {
-      s->err.message = "Missing required element Topic";
-      op_ret = -EINVAL;
-      return;
-    }
 
     const auto arn = rgw::ARN::parse(c.topic_arn);
-    if (!arn || arn->resource.empty()) {
-      s->err.message = "Invalid Topic ARN";
-      op_ret = -EINVAL;
-      return;
+    if (!arn) { // already validated above
+      continue;
     }
+    const auto& topic_name = arn->resource;
 
-    if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
-      s->err.message = "Unknown Event type: " + notif_name;
-      op_ret = -EINVAL;
-      return;
+    auto t = topics.find(*arn);
+    if (t == topics.end()) {
+      continue;
     }
+    auto& topic_info = t->second;
 
-    const auto topic_name = arn->resource;
-
-    // get topic information. destination information is stored in the topic
-    rgw_pubsub_topic topic_info;
-    op_ret = ps.get_topic(this, topic_name, topic_info, y, nullptr);
-    if (op_ret < 0) {
-      ldpp_dout(this, 4) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
-      return;
-    }
-    op_ret = verify_topic_owner_or_policy(
-        s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
-        rgw::IAM::snsPublish);
-    if (op_ret != 0) {
-      ldpp_dout(this, 4) << "no permission to create notification for topic '"
-                         << topic_name << "'" << dendl;
-      return;
-    }
     // make sure that full topic configuration match
     // TODO: use ARN match function
     
@@ -1135,35 +1163,26 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
 }
 
 int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
+  // require s3:PutBucketNotification permission for the bucket
   if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
     return -EACCES;
   }
 
+  // require sns:Publish permission for each topic
+  for (const auto& [arn, topic] : topics) {
+    int ret = verify_topic_owner_or_policy(
+        s, topic, driver->get_zone()->get_zonegroup().get_name(),
+        rgw::IAM::snsPublish);
+    if (ret < 0) {
+      ldpp_dout(this, 4) << "no permission to create notification for topic '"
+                         << arn << "'" << dendl;
+      return ret;
+    }
+  }
   return 0;
 }
 
 void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
-  op_ret = verify_params();
-  if (op_ret < 0) {
-    return;
-  }
-
-  rgw_pubsub_s3_notifications configurations;
-  op_ret = get_params_from_body(configurations);
-  if (op_ret < 0) {
-    return;
-  }
-  if (!driver->is_meta_master()) {
-    op_ret = rgw_forward_request_to_master(
-        this, *s->penv.site, s->user->get_id(), &data, nullptr, s->info, y);
-    if (op_ret < 0) {
-      ldpp_dout(this, 1) << "CreateBucketNotification "
-                            "forward_request_to_master returned ret = "
-                         << op_ret << dendl;
-      return;
-    }
-  }
-
   if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) {
     ldpp_dout(this, 1) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
       << ". please try again later" << dendl; 
@@ -1185,57 +1204,24 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
     return;
   }
   const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
-  std::unordered_map<std::string, rgw_pubsub_topic> topics;
   for (const auto& c : configurations.list) {
     const auto& notif_name = c.id;
-    if (notif_name.empty()) {
-      s->err.message = "Missing required element Id";
-      op_ret = -EINVAL;
-      return;
-    }
-    if (c.topic_arn.empty()) {
-      s->err.message = "Missing required element Topic";
-      op_ret = -EINVAL;
-      return;
-    }
 
     const auto arn = rgw::ARN::parse(c.topic_arn);
-    if (!arn || arn->resource.empty()) {
-      s->err.message = "Invalid Topic ARN";
-      op_ret = -EINVAL;
-      return;
-    }
-
-    if (std::find(c.events.begin(), c.events.end(),
-                  rgw::notify::UnknownEvent) != c.events.end()) {
-      s->err.message = "Unknown Event type: " + notif_name;
-      op_ret = -EINVAL;
-      return;
+    if (!arn) { // already validated above
+      continue;
     }
     const auto& topic_name = arn->resource;
-    if (!topics.contains(topic_name)) {
-      // get topic information. destination information is stored in the topic
-      rgw_pubsub_topic topic_info;
-      op_ret = ps.get_topic(this, topic_name, topic_info, y,nullptr);
-      if (op_ret < 0) {
-        ldpp_dout(this, 4) << "failed to get topic '" << topic_name
-                           << "', ret=" << op_ret << dendl;
-        return;
-      }
-      op_ret = verify_topic_owner_or_policy(
-          s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
-          rgw::IAM::snsPublish);
-      if (op_ret != 0) {
-        ldpp_dout(this, 4) << "failed to create notification for topic '"
-                           << topic_name << "' topic owned by other user"
-                           << dendl;
-        return;
-      }
-      topics[topic_name] = std::move(topic_info);
+
+    auto t = topics.find(*arn);
+    if (t == topics.end()) {
+      continue;
     }
+    auto& topic_info = t->second;
+
     auto& topic_filter =
         bucket_topics.topics[topic_to_unique(topic_name, notif_name)];
-    topic_filter.topic = topics[topic_name];
+    topic_filter.topic = topic_info;
     topic_filter.events = c.events;
     topic_filter.s3_id = notif_name;
     topic_filter.s3_filter = c.filter;
@@ -1270,7 +1256,8 @@ void RGWPSCreateNotifOp::execute_v2(optional_yield y) {
 
 // command (extension to S3): DELETE /bucket?notification[=<notification-id>]
 class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
-  int get_params(std::string& notif_name) const {
+  std::string notif_name;
+  int get_params() {
     bool exists;
     notif_name = s->info.args.get("notification", &exists);
     if (!exists) {
@@ -1286,6 +1273,7 @@ class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
   void execute_v2(optional_yield y);
 
  public:
+  int init_processing(optional_yield y) override;
   int verify_permission(optional_yield y) override;
 
   void pre_exec() override {
@@ -1299,15 +1287,16 @@ class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
   void execute(optional_yield y) override;
 };
 
-void RGWPSDeleteNotifOp::execute(optional_yield y) {
-  if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
-    return execute_v2(y);
-  }
-  std::string notif_name;
-  op_ret = get_params(notif_name);
-  if (op_ret < 0) {
-    return;
+int RGWPSDeleteNotifOp::init_processing(optional_yield y)
+{
+  int ret = get_params();
+  if (ret < 0) {
+    return ret;
   }
+  return RGWOp::init_processing(y);
+}
+
+void RGWPSDeleteNotifOp::execute(optional_yield y) {
   if (!driver->is_meta_master()) {
     bufferlist indata;
     op_ret = rgw_forward_request_to_master(
@@ -1320,6 +1309,10 @@ void RGWPSDeleteNotifOp::execute(optional_yield y) {
     }
   }
 
+  if (rgw::all_zonegroups_support(*s->penv.site, rgw::zone_features::notification_v2)) {
+    return execute_v2(y);
+  }
+
   const RGWPubSub ps(driver, s->auth.identity->get_tenant(), *s->penv.site);
   const RGWPubSub::Bucket b(ps, s->bucket.get());
 
@@ -1356,23 +1349,6 @@ int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
 }
 
 void RGWPSDeleteNotifOp::execute_v2(optional_yield y) {
-  std::string notif_name;
-  op_ret = get_params(notif_name);
-  if (op_ret < 0) {
-    return;
-  }
-  if (!driver->is_meta_master()) {
-    bufferlist indata;
-    op_ret = rgw_forward_request_to_master(
-        this, *s->penv.site, s->user->get_id(), &indata, nullptr, s->info, y);
-    if (op_ret < 0) {
-      ldpp_dout(this, 4) << "DeleteBucketNotification "
-                            "forward_request_to_master returned error ret= "
-                         << op_ret << dendl;
-      return;
-    }
-  }
-
   if (const auto ret = driver->stat_topics_v1(s->bucket_tenant, y, this); ret != -ENOENT) {
     ldpp_dout(this, 4) << "WARNING: " << (ret == 0 ? "topic migration in process" : "cannot determine topic migration status. ret = " + std::to_string(ret))
       << ". please try again later" << dendl;