]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification: Add support to set topic policy while topic creation
authorkchheda3 <kchheda3@bloomberg.net>
Thu, 5 Oct 2023 16:14:35 +0000 (12:14 -0400)
committerkchheda3 <kchheda3@bloomberg.net>
Mon, 23 Oct 2023 15:29:38 +0000 (11:29 -0400)
Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
12 files changed:
doc/radosgw/notifications.rst
src/common/options/rgw.yaml.in
src/rgw/rgw_auth_s3.cc
src/rgw/rgw_iam_policy.cc
src/rgw/rgw_iam_policy.h
src/rgw/rgw_op_type.h
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_rest_pubsub.cc
src/test/rgw/bucket_notification/api.py
src/test/rgw/bucket_notification/test_bn.py
src/test/rgw/test_rgw_iam_policy.cc

index 16f4847c5dc0d8f2b6f5af71b197bed87bf77046..a38ad9e01bdfb323110815617f054a6c851b8057 100644 (file)
@@ -159,6 +159,7 @@ updating, use the name of an existing topic and different endpoint values).
    [&Attributes.entry.12.key=time_to_live&Attributes.entry.12.value=<seconds to live>]
    [&Attributes.entry.13.key=max_retries&Attributes.entry.13.value=<retries number>]
    [&Attributes.entry.14.key=retry_sleep_duration&Attributes.entry.14.value=<sleep seconds>]
+   [&Attributes.entry.14.key=Policy&Attributes.entry.14.value=<policy-JSON-string>]
 
 Request parameters:
 
@@ -179,6 +180,25 @@ Request parameters:
   default value is taken from `rgw_topic_persistency_sleep_duration`.
   providing a value overrides the global value.
   zero value mean there is no delay between retries.
+- Policy: This will control who can access the topic in addition to the owner of the topic.
+  The policy passed needs to be a JSON string similar to bucket policy.
+  For example, one can send a policy string as follows::
+
+    {
+      "Version": "2012-10-17",
+      "Statement": [{
+        "Effect": "Allow",
+        "Principal": {"AWS": ["arn:aws:iam::usfolks:user/fred:subuser"]},
+        "Action": ["sns:GetTopicAttributes","sns:Publish"],
+        "Resource": ["arn:aws:sns:default::mytopic"],
+      }]
+    }
+
+  Currently, we support only the following actions:
+  - sns:GetTopicAttributes  To list or get existing topics
+  - sns:SetTopicAttributes  To set attributes for the existing topic
+  - sns:DeleteTopic         To delete the existing topic
+  - sns:Publish             To be able to create/subscribe notification on existing topic
 
 - HTTP endpoint
 
@@ -340,6 +360,7 @@ The response has the following format:
 - TopicArn: topic `ARN
   <https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html>`_.
 - OpaqueData: The opaque data set on the topic.
+- Policy: Any access permission set on the topic.
 
 Get Topic Information
 `````````````````````
@@ -393,6 +414,7 @@ The response has the following format:
 - TopicArn: topic `ARN
   <https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html>`_.
 - OpaqueData: the opaque data set on the topic.
+- Policy: Any access permission set on the topic.
 
 Delete Topic
 ````````````
@@ -463,6 +485,61 @@ The response has the following format:
   topic, the request must be made over HTTPS. The "topic list" request will
   otherwise be rejected.
 
+Set Topic Attributes
+````````````````````
+
+::
+
+   POST
+
+   Action=SetTopicAttributes
+   &TopicArn=<topic-arn>&AttributeName=<attribute-name>&AttributeValue=<attribute-value>
+
+This allows to set/modify existing attributes on the specified topic.
+
+.. note::
+
+  - The AttributeName passed will either be updated or created (if not exist) with AttributeValue passed.
+  - Any unsupported AttributeName passed will result in error 400.
+
+The response has the following format:
+
+::
+
+    <SetTopicAttributesResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
+        <ResponseMetadata>
+            <RequestId></RequestId>
+        </ResponseMetadata>
+    </SetTopicAttributesResponse>
+
+Valid AttributeName that can be passed:
+
+  - push-endpoint: This is the URI of an endpoint to send push notifications to.
+  - OpaqueData: Opaque data is set in the topic configuration and added to all
+    notifications that are triggered by the topic.
+  - persistent: This indicates whether notifications to this endpoint are
+    persistent (=asynchronous) or not persistent. (This is "false" by default.)
+  - time_to_live: This will limit the time (in seconds) to retain the notifications.
+  - max_retries: This will limit the max retries before expiring notifications.
+  - retry_sleep_duration: This will control the frequency of retrying the notifications.
+  - Policy: This will control who can access the topic other than owner of the topic.
+  - verify-ssl: This indicates whether the server certificates must be validated by
+    the client. This is "true" by default.
+  - ``use-ssl``: If this is set to "true", a secure connection is used to
+    connect to the broker. This is "false" by default.
+  - cloudevents: This indicates whether the HTTP header should contain
+    attributes according to the `S3 CloudEvents Spec`_. 
+  - amqp-exchange: The exchanges must exist and must be able to route messages
+    based on topics.
+  - amqp-ack-level: No end2end acknowledgement is required. Messages may persist in the
+    broker before being delivered to their final destinations. 
+  - ``ca-location``: If this is provided and a secure connection is used, the
+    specified CA will be used instead of the default CA to authenticate the
+    broker. 
+  - mechanism: may be provided together with user/password (default: ``PLAIN``).
+  - kafka-ack-level: No end2end acknowledgement is required. Messages may persist in the
+    broker before being delivered to their final destinations. 
+
 Notifications
 ~~~~~~~~~~~~~
 
index 8928e853e64d5f422707157dd091e92d686ecd8d..37453605260b8a9a3184e7d003cdf27c6617b775 100644 (file)
@@ -3850,3 +3850,18 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: mandatory_topic_permissions
+  type: bool
+  level: basic
+  desc: Whether to validate user permissions to access notification topics.
+  long_desc: If true, all users (other then the owner of the topic) will need
+    to have a policy to access topics.
+    The topic policy can be set by owner via CreateTopic() or SetTopicAttribute().
+    Following permissions can be granted "sns:Publish", "sns:GetTopicAttributes",
+    "sns:SetTopicAttributes" and "sns:DeleteTopic" via Policy.
+    NOTE that even if set to "false" topics will still follow the policies if set on them.
+  default: false
+  services:
+  - rgw
+  with_legacy: true
+
index a2def87040efa33fff30399d35dd8f9cf0182781..d06c83813382338ff4b87dee0358d30d828c555f 100644 (file)
@@ -482,6 +482,7 @@ bool is_non_s3_op(RGWOpType op_type)
       op_type == RGW_OP_PUBSUB_TOPIC_CREATE ||
       op_type == RGW_OP_PUBSUB_TOPICS_LIST ||
       op_type == RGW_OP_PUBSUB_TOPIC_GET ||
+      op_type == RGW_OP_PUBSUB_TOPIC_SET ||
       op_type == RGW_OP_PUBSUB_TOPIC_DELETE ||
       op_type == RGW_OP_TAG_ROLE ||
       op_type == RGW_OP_LIST_ROLE_TAGS ||
index 35aeb15fcdc5aacc7b0e9180c4b774a171788339..0bfcccff638e5f0d9ba4c5db270b8c504c6462bb 100644 (file)
@@ -157,6 +157,10 @@ static const actpair actpairs[] =
  { "sts:AssumeRoleWithWebIdentity", stsAssumeRoleWithWebIdentity},
  { "sts:GetSessionToken", stsGetSessionToken},
  { "sts:TagSession", stsTagSession},
+ { "sns:GetTopicAttributes", snsGetTopicAttributes},
+ { "sns:DeleteTopic", snsDeleteTopic},
+ { "sns:Publish", snsPublish},
+ { "sns:SetTopicAttributes", snsSetTopicAttributes},
 };
 
 struct PolicyParser;
@@ -600,6 +604,12 @@ bool ParseState::do_string(CephContext* cct, const char* s, size_t l) {
         if ((t->notaction & stsAllValue) == stsAllValue) {
           t->notaction[stsAll] = 1;
         }
+        if ((t->action & snsAllValue) == snsAllValue) {
+          t->action[snsAll] = 1;
+        }
+        if ((t->notaction & snsAllValue) == snsAllValue) {
+          t->notaction[snsAll] = 1;
+        }
       }
     }
   } else if (w->id == TokenID::Resource || w->id == TokenID::NotResource) {
@@ -1452,6 +1462,18 @@ const char* action_bit_string(uint64_t action) {
 
   case stsTagSession:
     return "sts:TagSession";
+
+  case snsSetTopicAttributes:
+    return "sns:SetTopicAttributes";
+
+  case snsGetTopicAttributes:
+    return "sns:GetTopicAttributes";
+
+  case snsDeleteTopic:
+    return "sns:DeleteTopic";
+
+  case snsPublish:
+    return "sns:Publish";
   }
   return "s3Invalid";
 }
index c60872850a309878ca07e725013ed3e0e559ee74..e528d1515c7742ea4501bbf142aa8c17d0272649 100644 (file)
@@ -141,8 +141,14 @@ static constexpr std::uint64_t stsGetSessionToken = iamAll + 3;
 static constexpr std::uint64_t stsTagSession = iamAll + 4;
 static constexpr std::uint64_t stsAll = iamAll + 5;
 
+static constexpr std::uint64_t snsGetTopicAttributes = stsAll + 1;
+static constexpr std::uint64_t snsDeleteTopic = stsAll + 2;
+static constexpr std::uint64_t snsPublish = stsAll + 3;
+static constexpr std::uint64_t snsSetTopicAttributes = stsAll + 4;
+static constexpr std::uint64_t snsAll = stsAll + 5;
+
 static constexpr std::uint64_t s3Count = s3All;
-static constexpr std::uint64_t allCount = stsAll + 1;
+static constexpr std::uint64_t allCount = snsAll + 1;
 
 using Action_t = std::bitset<allCount>;
 using NotAction_t = Action_t;
@@ -164,6 +170,7 @@ static const Action_t None(0);
 static const Action_t s3AllValue = set_cont_bits<allCount>(0,s3All);
 static const Action_t iamAllValue = set_cont_bits<allCount>(s3All+1,iamAll);
 static const Action_t stsAllValue = set_cont_bits<allCount>(iamAll+1,stsAll);
+static const Action_t snsAllValue = set_cont_bits<allCount>(stsAll + 1, snsAll);
 static const Action_t allValue = set_cont_bits<allCount>(0,allCount);
 
 namespace {
index 375c7348b2482fa0758812be27ff8a9deabaa291..a7a68d4ade5ee21cb1c0499111be29b225c718a2 100644 (file)
@@ -104,6 +104,7 @@ enum RGWOpType {
   RGW_OP_PUBSUB_TOPIC_CREATE,
   RGW_OP_PUBSUB_TOPICS_LIST,
   RGW_OP_PUBSUB_TOPIC_GET,
+  RGW_OP_PUBSUB_TOPIC_SET,
   RGW_OP_PUBSUB_TOPIC_DELETE,
   RGW_OP_PUBSUB_SUB_CREATE,
   RGW_OP_PUBSUB_SUB_GET,
index 51c8b0281c68bccdb8283b7fe4eb487e0b014c9f..14d85abe3b62d9499c6094207f367bd4f1d4f2ab 100644 (file)
@@ -337,6 +337,7 @@ void rgw_pubsub_topic::dump(Formatter *f) const
   encode_json("dest", dest, f);
   encode_json("arn", arn, f);
   encode_json("opaqueData", opaque_data, f);
+  encode_json("policy", policy_text, f);
 }
 
 void rgw_pubsub_topic::dump_xml(Formatter *f) const
@@ -346,6 +347,7 @@ void rgw_pubsub_topic::dump_xml(Formatter *f) const
   encode_xml("EndPoint", dest, f);
   encode_xml("TopicArn", arn, f);
   encode_xml("OpaqueData", opaque_data, f);
+  encode_xml("Policy", policy_text, f);
 }
 
 void encode_xml_key_value_entry(const std::string& key, const std::string& value, Formatter *f) {
@@ -365,6 +367,7 @@ void rgw_pubsub_topic::dump_xml_as_attributes(Formatter *f) const
   encode_xml_key_value_entry("EndPoint", dest.to_json_str(), f);
   encode_xml_key_value_entry("TopicArn", arn, f);
   encode_xml_key_value_entry("OpaqueData", opaque_data, f);
+  encode_xml_key_value_entry("Policy", policy_text, f);
   f->close_section(); // Attributes
 }
 
@@ -695,7 +698,9 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
                             const std::string& name,
                             const rgw_pubsub_dest& dest, const std::string& arn,
                             const std::string& opaque_data,
-                            const rgw_user& user, optional_yield y) const {
+                            const rgw_user& user,
+                            const std::string& policy_text,
+                            optional_yield y) const {
   RGWObjVersionTracker objv_tracker;
   rgw_pubsub_topics topics;
 
@@ -712,6 +717,7 @@ int RGWPubSub::create_topic(const DoutPrefixProvider* dpp,
   new_topic.dest = dest;
   new_topic.arn = arn;
   new_topic.opaque_data = opaque_data;
+  new_topic.policy_text = policy_text;
 
   ret = write_topics(dpp, topics, &objv_tracker, y);
   if (ret < 0) {
index 3a87d777d6cbbafe81a6a6ca945fde1d5ad14e28..ddc72f99b076a062dd8599c2bddeb0b18d065f9d 100644 (file)
@@ -398,19 +398,21 @@ struct rgw_pubsub_topic {
   rgw_pubsub_dest dest;
   std::string arn;
   std::string opaque_data;
+  std::string policy_text;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(3, 1, bl);
+    ENCODE_START(4, 1, bl);
     encode(user, bl);
     encode(name, bl);
     encode(dest, bl);
     encode(arn, bl);
     encode(opaque_data, bl);
+    encode(policy_text, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(3, bl);
+    DECODE_START(4, bl);
     decode(user, bl);
     decode(name, bl);
     if (struct_v >= 2) {
@@ -420,6 +422,9 @@ struct rgw_pubsub_topic {
     if (struct_v >= 3) {
       decode(opaque_data, bl);
     }
+    if (struct_v >= 4) {
+      decode(policy_text, bl);
+    }
     DECODE_FINISH(bl);
   }
 
@@ -629,7 +634,7 @@ public:
   int create_topic(const DoutPrefixProvider* dpp, const std::string& name,
                    const rgw_pubsub_dest& dest, const std::string& arn,
                    const std::string& opaque_data, const rgw_user& user,
-                   optional_yield y) const;
+                   const std::string& policy_text, optional_yield y) const;
   // remove a topic according to its name
   // if the topic does not exists it is a no-op (considered success)
   // return 0 on success, error code otherwise
index 9bc7a2897632870a392980761c784f84f25079fd..da81711511c379bd699108289ea5607d7c6f005e 100644 (file)
@@ -67,6 +67,53 @@ bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
     return false;
 }
 
+std::optional<rgw::IAM::Policy> get_policy_from_text(req_state* const s,
+                                                     std::string& policy_text) {
+  const auto bl = bufferlist::static_from_string(policy_text);
+  try {
+    return rgw::IAM::Policy(
+        s->cct, s->owner.get_id().tenant, bl,
+        s->cct->_conf.get_val<bool>("rgw_policy_reject_invalid_principals"));
+  } catch (rgw::IAM::PolicyParseException& e) {
+    ldout(s->cct, 1) << "failed to parse policy:' " << policy_text
+                     << " ' with error: " << e.what() << dendl;
+    s->err.message = e.what();
+    return std::nullopt;
+  }
+}
+
+int verify_topic_owner_or_policy(req_state* const s,
+                                 const rgw_pubsub_topic& topic,
+                                 const std::string& zonegroup_name,
+                                 const uint64_t op) {
+  if (topic.user == s->owner.get_id()) {
+    return 0;
+  }
+  // no policy set.
+  if (topic.policy_text.empty()) {
+    // if mandatory_topic_permissions is true, then validate all users for
+    // permission.
+    if (s->cct->_conf->mandatory_topic_permissions) {
+      return -EACCES;
+    } else {
+      return 0;
+    }
+  }
+  // bufferlist::static_from_string wants non const string
+  std::string policy_text(topic.policy_text);
+  const auto p = get_policy_from_text(s, policy_text);
+  rgw::IAM::PolicyPrincipal princ_type = rgw::IAM::PolicyPrincipal::Other;
+  const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, zonegroup_name,
+                     s->user->get_tenant(), topic.name);
+  if (!p || p->eval(s->env, *s->auth.identity, op, arn, princ_type) !=
+                rgw::IAM::Effect::Allow) {
+    ldout(s->cct, 1) << "topic_policy failed validation, topic_policy: " << p
+                     << dendl;
+    return -EACCES;
+  }
+  return 0;
+}
+
 // command (AWS compliant): 
 // POST
 // Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
@@ -76,7 +123,8 @@ class RGWPSCreateTopicOp : public RGWOp {
   rgw_pubsub_dest dest;
   std::string topic_arn;
   std::string opaque_data;
-  
+  std::string policy_text;
+
   int get_params() {
     topic_name = s->info.args.get("Name");
     if (topic_name.empty()) {
@@ -84,17 +132,32 @@ class RGWPSCreateTopicOp : public RGWOp {
       return -EINVAL;
     }
 
+    // Remove the args that are parsed, so the push_endpoint_args only contains
+    // necessary one's.
     opaque_data = s->info.args.get("OpaqueData");
+    s->info.args.remove("OpaqueData");
 
     dest.push_endpoint = s->info.args.get("push-endpoint");
+    s->info.args.remove("push-endpoint");
     s->info.args.get_bool("persistent", &dest.persistent, false);
+    s->info.args.remove("persistent");
     s->info.args.get_int("time_to_live", reinterpret_cast<int *>(&dest.time_to_live), rgw::notify::DEFAULT_GLOBAL_VALUE);
+    s->info.args.remove("time_to_live");
     s->info.args.get_int("max_retries", reinterpret_cast<int *>(&dest.max_retries), rgw::notify::DEFAULT_GLOBAL_VALUE);
+    s->info.args.remove("max_retries");
     s->info.args.get_int("retry_sleep_duration", reinterpret_cast<int *>(&dest.retry_sleep_duration), rgw::notify::DEFAULT_GLOBAL_VALUE);
+    s->info.args.remove("retry_sleep_duration");
 
     if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
       return -EINVAL;
     }
+    // Store topic Policy.
+    policy_text = url_decode(s->info.args.get("Policy"), true);
+    if (!policy_text.empty() && !get_policy_from_text(s, policy_text)) {
+      return -ERR_MALFORMED_DOC;
+    }
+    s->info.args.remove("Policy");
+
     for (const auto& param : s->info.args.get_params()) {
       if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
         continue;
@@ -106,14 +169,7 @@ class RGWPSCreateTopicOp : public RGWOp {
       // remove last separator
       dest.push_endpoint_args.pop_back();
     }
-    if (!dest.push_endpoint.empty() && dest.persistent) {
-      const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
-      if (ret < 0) {
-        ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl;
-        return ret;
-      }
-    }
-    
+
     // dest object only stores endpoint info
     dest.arn_topic = topic_name;
     // the topic ARN will be sent in the reply
@@ -125,9 +181,32 @@ class RGWPSCreateTopicOp : public RGWOp {
   }
 
   public:
-  int verify_permission(optional_yield) override {
-    return 0;
-  }
+   int verify_permission(optional_yield y) override {
+    auto ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+
+    const RGWPubSub ps(driver, s->owner.get_id().tenant);
+    rgw_pubsub_topic result;
+    ret = ps.get_topic(this, topic_name, result, y);
+    if (ret == -ENOENT) {
+      // topic not present
+      return 0;
+    }
+    if (ret == 0) {
+      if (result.user == s->owner.get_id() ||
+          !s->cct->_conf->mandatory_topic_permissions) {
+        return 0;
+      }
+      ldpp_dout(this, 1) << "failed to create topic '" << topic_name
+                         << "', topic already exist." << dendl;
+      return -EPERM;
+    }
+    ldpp_dout(this, 1) << "failed to read topic '" << topic_name
+                       << "', with error:" << ret << dendl;
+    return ret;
+   }
 
   void pre_exec() override {
     rgw_bucket_object_pre_exec(s);
@@ -163,14 +242,18 @@ class RGWPSCreateTopicOp : public RGWOp {
 };
 
 void RGWPSCreateTopicOp::execute(optional_yield y) {
-  op_ret = get_params();
-  if (op_ret < 0) {
-    return;
+  if (!dest.push_endpoint.empty() && dest.persistent) {
+    op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for "
+                            "persistent topics. error:"
+                         << op_ret << dendl;
+      return;
+    }
   }
-
   const RGWPubSub ps(driver, s->owner.get_id().tenant);
   op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
-                           s->owner.get_id(), y);
+                           s->owner.get_id(), policy_text, y);
   if (op_ret < 0) {
     ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
     return;
@@ -236,6 +319,15 @@ void RGWPSListTopicsOp::execute(optional_yield y) {
     op_ret = -EPERM;
     return;
   }
+  for (auto it = result.topics.cbegin(); it != result.topics.cend();) {
+    if (verify_topic_owner_or_policy(
+            s, it->second, driver->get_zone()->get_zonegroup().get_name(),
+            rgw::IAM::snsGetTopicAttributes) != 0) {
+      result.topics.erase(it++);
+    } else {
+      ++it;
+    }
+  }
   ldpp_dout(this, 20) << "successfully got topics" << dendl;
 }
 
@@ -312,6 +404,14 @@ void RGWPSGetTopicOp::execute(optional_yield y) {
     op_ret = -EPERM;
     return;
   }
+  op_ret = verify_topic_owner_or_policy(
+      s, result, driver->get_zone()->get_zonegroup().get_name(),
+      rgw::IAM::snsGetTopicAttributes);
+  if (op_ret != 0) {
+    ldpp_dout(this, 1) << "failed to get topic '" << topic_name
+                       << "', topic owned by other user" << dendl;
+    return;
+  }
   ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
 }
 
@@ -388,9 +488,202 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
     op_ret = -EPERM;
     return;
   }
+  op_ret = verify_topic_owner_or_policy(
+      s, result, driver->get_zone()->get_zonegroup().get_name(),
+      rgw::IAM::snsGetTopicAttributes);
+  if (op_ret != 0) {
+    ldpp_dout(this, 1) << "failed to get topic '" << topic_name
+                       << "', topic owned by other user" << dendl;
+    return;
+  }
   ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
 }
 
+// command (AWS compliant):
+// POST
+// Action=SetTopicAttributes&TopicArn=<topic-arn>&AttributeName=<attribute-name>&AttributeValue=<attribute-value>
+class RGWPSSetTopicAttributesOp : public RGWOp {
+ private:
+  std::string topic_name;
+  std::string topic_arn;
+  std::string opaque_data;
+  std::string policy_text;
+  rgw_pubsub_dest dest;
+  rgw_user topic_owner;
+  std::string attribute_name;
+
+  int get_params() {
+    const auto arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+    if (!arn || arn->resource.empty()) {
+      ldpp_dout(this, 1) << "SetTopicAttribute Action 'TopicArn' argument is "
+                            "missing or invalid"
+                         << dendl;
+      return -EINVAL;
+    }
+    topic_arn = arn->to_string();
+    topic_name = arn->resource;
+    attribute_name = s->info.args.get("AttributeName");
+    if (attribute_name.empty()) {
+      ldpp_dout(this, 1)
+          << "SetTopicAttribute Action 'AttributeName' argument is "
+             "missing or invalid"
+          << dendl;
+      return -EINVAL;
+    }
+    return 0;
+  }
+
+  int map_attributes(const rgw_pubsub_topic& topic) {
+    // update the default values that is stored in topic currently.
+    opaque_data = topic.opaque_data;
+    policy_text = topic.policy_text;
+    dest = topic.dest;
+
+    if (attribute_name == "OpaqueData") {
+      opaque_data = s->info.args.get("AttributeValue");
+    } else if (attribute_name == "persistent") {
+      s->info.args.get_bool("AttributeValue", &dest.persistent, false);
+    } else if (attribute_name == "time_to_live") {
+      s->info.args.get_int("AttributeValue",
+                           reinterpret_cast<int*>(&dest.time_to_live),
+                           rgw::notify::DEFAULT_GLOBAL_VALUE);
+    } else if (attribute_name == "max_retries") {
+      s->info.args.get_int("AttributeValue",
+                           reinterpret_cast<int*>(&dest.max_retries),
+                           rgw::notify::DEFAULT_GLOBAL_VALUE);
+    } else if (attribute_name == "retry_sleep_duration") {
+      s->info.args.get_int("AttributeValue",
+                           reinterpret_cast<int*>(&dest.retry_sleep_duration),
+                           rgw::notify::DEFAULT_GLOBAL_VALUE);
+    } else if (attribute_name == "push-endpoint") {
+      dest.push_endpoint = s->info.args.get("AttributeValue");
+      if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+        return -EINVAL;
+      }
+    } else if (attribute_name == "Policy") {
+      policy_text = url_decode(s->info.args.get("AttributeValue"), true);
+      if (!policy_text.empty() && !get_policy_from_text(s, policy_text)) {
+        return -ERR_MALFORMED_DOC;
+      }
+    } else {
+      // replace the push_endpoint_args if passed in SetAttribute.
+      const auto replace_str = [&](const std::string& param,
+                                   const std::string& val) {
+        auto& push_endpoint_args = dest.push_endpoint_args;
+        const std::string replaced_str = param + "=" + val;
+        const auto pos = push_endpoint_args.find(param);
+        if (pos == std::string::npos) {
+          dest.push_endpoint_args.append("&" + replaced_str);
+          return;
+        }
+        auto end_pos = dest.push_endpoint_args.find("&", pos);
+        end_pos = end_pos == std::string::npos ? push_endpoint_args.length()
+                                               : end_pos;
+        push_endpoint_args.replace(pos, end_pos - pos, replaced_str);
+      };
+      const std::unordered_set<std::string> push_endpoint_args = {
+          "verify-ssl",    "use-ssl",         "ca-location", "amqp-ack-level",
+          "amqp-exchange", "kafka-ack-level", "mechanism",   "cloudevents"};
+      if (push_endpoint_args.count(attribute_name) == 1) {
+        replace_str(attribute_name, s->info.args.get("AttributeValue"));
+        return 0;
+      }
+      ldpp_dout(this, 1)
+          << "SetTopicAttribute Action 'AttributeName' argument is "
+             "invalid: 'AttributeName' = "
+          << attribute_name << dendl;
+      return -EINVAL;
+    }
+    return 0;
+  }
+
+ public:
+  int verify_permission(optional_yield y) override {
+    auto ret = get_params();
+    if (ret < 0) {
+      return ret;
+    }
+    rgw_pubsub_topic result;
+    const RGWPubSub ps(driver, s->owner.get_id().tenant);
+    ret = ps.get_topic(this, topic_name, result, y);
+    if (ret < 0) {
+      ldpp_dout(this, 1) << "failed to get topic '" << topic_name
+                         << "', ret=" << ret << dendl;
+      return ret;
+    }
+    topic_owner = result.user;
+    ret = verify_topic_owner_or_policy(
+        s, result, driver->get_zone()->get_zonegroup().get_name(),
+        rgw::IAM::snsSetTopicAttributes);
+    if (ret != 0) {
+      ldpp_dout(this, 1) << "failed to set attributes for topic '" << topic_name
+                         << "', topic owned by other user" << dendl;
+      return ret;
+    }
+
+    return map_attributes(result);
+  }
+
+  void pre_exec() override { rgw_bucket_object_pre_exec(s); }
+  void execute(optional_yield) override;
+
+  const char* name() const override { return "pubsub_topic_set"; }
+  RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_SET; }
+  uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
+  void send_response() override {
+    if (op_ret) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+    end_header(s, this, "application/xml");
+
+    if (op_ret < 0) {
+      return;
+    }
+
+    const auto f = s->formatter;
+    f->open_object_section_in_ns("SetTopicAttributesResponse", AWS_SNS_NS);
+    f->open_object_section("ResponseMetadata");
+    encode_xml("RequestId", s->req_id, f);
+    f->close_section();  // ResponseMetadata
+    f->close_section();  // SetTopicAttributesResponse
+    rgw_flush_formatter_and_reset(s, f);
+  }
+};
+
+void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
+  if (!dest.push_endpoint.empty() && dest.persistent) {
+    op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+    if (op_ret < 0) {
+      ldpp_dout(this, 1)
+          << "SetTopicAttributes Action failed to create queue for "
+             "persistent topics. error:"
+          << op_ret << dendl;
+      return;
+    }
+  } else {  // changing the persistent topic to non-persistent.
+    op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
+    if (op_ret != -ENOENT && op_ret < 0) {
+      ldpp_dout(this, 1) << "SetTopicAttributes Action failed to remove queue "
+                            "for persistent topics. error:"
+                         << op_ret << dendl;
+      return;
+    }
+  }
+  const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data,
+                           topic_owner, policy_text, y);
+  if (op_ret < 0) {
+    ldpp_dout(this, 1) << "failed to SetAttributes for topic '" << topic_name
+                       << "', ret=" << op_ret << dendl;
+    return;
+  }
+  ldpp_dout(this, 20) << "successfully set the attributes for topic '"
+                      << topic_name << "'" << dendl;
+}
+
 // command (AWS compliant): 
 // POST
 // Action=DeleteTopic&TopicArn=<topic-arn>
@@ -407,19 +700,6 @@ class RGWPSDeleteTopicOp : public RGWOp {
     }
 
     topic_name = topic_arn->resource;
-
-    // upon deletion it is not known if topic is persistent or not
-    // will try to delete the persistent topic anyway
-    const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
-    if (ret == -ENOENT) {
-      // topic was not persistent, or already deleted
-      return 0;
-    }
-    if (ret < 0) {
-      ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl;
-      return ret;
-    }
-
     return 0;
   }
 
@@ -463,6 +743,35 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
     return;
   }
   const RGWPubSub ps(driver, s->owner.get_id().tenant);
+  rgw_pubsub_topic result;
+  op_ret = ps.get_topic(this, topic_name, result, y);
+  if (op_ret == 0) {
+    op_ret = verify_topic_owner_or_policy(
+        s, result, driver->get_zone()->get_zonegroup().get_name(),
+        rgw::IAM::snsDeleteTopic);
+    if (op_ret != 0) {
+      ldpp_dout(this, 1) << "failed to remove topic '" << topic_name
+                         << "' topic owned by other user" << dendl;
+      return;
+    }
+  } else {
+    ldpp_dout(this, 1) << "failed to fetch topic '" << topic_name
+                       << "' with error: " << op_ret << dendl;
+    if (op_ret == -ENOENT) {
+      // its not an error if no topics exist, just a no-op
+      op_ret = 0;
+    }
+    return;
+  }
+  // upon deletion it is not known if topic is persistent or not
+  // will try to delete the persistent topic anyway
+  op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
+  if (op_ret != -ENOENT && op_ret < 0) {
+    ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for "
+                          "persistent topics. error:"
+                       << op_ret << dendl;
+    return;
+  }
   op_ret = ps.remove_topic(this, topic_name, y);
   if (op_ret < 0) {
     ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
@@ -473,12 +782,14 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
 
 using op_generator = RGWOp*(*)();
 static const std::unordered_map<std::string, op_generator> op_generators = {
-  {"CreateTopic", []() -> RGWOp* {return new RGWPSCreateTopicOp;}},
-  {"DeleteTopic", []() -> RGWOp* {return new RGWPSDeleteTopicOp;}},
-  {"ListTopics", []() -> RGWOp* {return new RGWPSListTopicsOp;}},
-  {"GetTopic", []() -> RGWOp* {return new RGWPSGetTopicOp;}},
-  {"GetTopicAttributes", []() -> RGWOp* {return new RGWPSGetTopicAttributesOp;}}
-};
+    {"CreateTopic", []() -> RGWOp* { return new RGWPSCreateTopicOp; }},
+    {"DeleteTopic", []() -> RGWOp* { return new RGWPSDeleteTopicOp; }},
+    {"ListTopics", []() -> RGWOp* { return new RGWPSListTopicsOp; }},
+    {"GetTopic", []() -> RGWOp* { return new RGWPSGetTopicOp; }},
+    {"GetTopicAttributes",
+     []() -> RGWOp* { return new RGWPSGetTopicAttributesOp; }},
+    {"SetTopicAttributes",
+     []() -> RGWOp* { return new RGWPSSetTopicAttributesOp; }}};
 
 bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s) 
 {
@@ -710,6 +1021,15 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
       ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
       return;
     }
+    op_ret = verify_topic_owner_or_policy(
+        s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
+        rgw::IAM::snsPublish);
+    if (op_ret != 0) {
+      ldpp_dout(this, 1) << "failed to create notification for topic '"
+                         << topic_name << "' topic owned by other user"
+                         << dendl;
+      return;
+    }
     // make sure that full topic configuration match
     // TODO: use ARN match function
     
@@ -722,7 +1042,7 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
     // ARN is cached to make the "GET" method faster
     op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest,
                              topic_info.arn, topic_info.opaque_data,
-                             s->owner.get_id(), y);
+                             s->owner.get_id(), topic_info.policy_text, y);
     if (op_ret < 0) {
       ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name << 
         "', ret=" << op_ret << dendl;
index fe38576fb3512e7ef1ead5ff4669debcf844a368..30fce41a1bb017616b4b47c3c1555ef12a3fb36c 100644 (file)
@@ -78,7 +78,7 @@ class PSTopicS3:
     POST ?Action=GetTopic&TopicArn=<topic-arn>
     POST ?Action=DeleteTopic&TopicArn=<topic-arn>
     """
-    def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
+    def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None, policy_text=None):
         self.conn = conn
         self.topic_name = topic_name.strip()
         assert self.topic_name
@@ -88,6 +88,8 @@ class PSTopicS3:
             self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
         if opaque_data is not None:
             self.attributes['OpaqueData'] = opaque_data
+        if policy_text is not None:
+            self.attributes['Policy'] = policy_text
         protocol = 'https' if conn.is_secure else 'http'
         self.client = boto3.client('sns',
                            endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
@@ -96,9 +98,9 @@ class PSTopicS3:
                            region_name=region,
                            verify='./cert.pem')
 
-    def get_config(self):
+    def get_config(self, topic_arn=None):
         """get topic info"""
-        parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
+        parameters = {'Action': 'GetTopic', 'TopicArn': (topic_arn if topic_arn is not None else self.topic_arn)}
         body = urlparse.urlencode(parameters)
         string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
         content_type = 'application/x-www-form-urlencoded; charset=utf-8'
@@ -131,6 +133,13 @@ class PSTopicS3:
         result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
         self.topic_arn = result['TopicArn']
         return self.topic_arn
+    
+    def set_attributes(self, attribute_name, attribute_val, topic_arn=None):
+        """set topic attributes."""
+        result = self.client.set_topic_attributes(TopicArn=(
+            topic_arn if topic_arn is not None else self.topic_arn), AttributeName=attribute_name, AttributeValue=attribute_val)
+        return result['ResponseMetadata']['HTTPStatusCode']
+
 
     def del_config(self, topic_arn=None):
         """delete topic"""
index ffb8e488bbff03aca99e2b5a0d0865df1159eb75..f5017885493a39fb1c4ec6cc73ab32eb0bab0f64 100644 (file)
@@ -4283,6 +4283,76 @@ def test_ps_s3_multiple_topics_notification():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
+@attr('basic_test')
+def test_ps_s3_topic_permissions():
+    """ test s3 topic set/get/delete permissions """
+    conn1 = connection()
+    conn2 = another_user()
+    zonegroup = 'default'
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name + TOPIC_SUFFIX
+    topic_policy = json.dumps({
+        "Version": "2012-10-17",
+        "Statement": [
+            {
+                "Sid": "Statement",
+                "Effect": "Deny",
+                "Principal": "*",
+                "Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes"],
+                "Resource": f"arn:aws:sns:{zonegroup}::{topic_name}"
+            }
+        ]
+    })
+    # create s3 topic with DENY policy
+    endpoint_address = 'amqp://127.0.0.1:7001'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+    topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy)
+    topic_arn = topic_conf.set_config()
+
+    # 2nd user tries to fetch the topic
+    topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args)
+    _, status = topic_conf2.get_config(topic_arn=topic_arn)
+    assert_equal(status, 403)
+    try:
+        # 2nd user tries to set the attribute
+        status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
+        assert False, "'AccessDenied' error is expected"
+    except Exception as err:
+        print(err)
+
+    # create bucket for conn2 and try publishing notification to topic
+    _ = conn2.create_bucket(bucket_name)
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                         'Events': []
+                       }]
+    try:
+        s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
+        _, status = s3_notification_conf2.set_config()
+        assert False, "'AccessDenied' error is expected"
+    except ClientError as error:
+        assert_equal(error.response['Error']['Code'], 'AccessDenied')
+
+    # Topic policy is now added by the 1st user to allow 2nd user.
+    topic_policy  = topic_policy.replace("Deny", "Allow")
+    topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy)
+    topic_arn = topic_conf.set_config()
+    # 2nd user try to fetch topic again
+    _, status = topic_conf2.get_config(topic_arn=topic_arn)
+    assert_equal(status, 200)
+    # 2nd user tries to set the attribute again
+    status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
+    assert_equal(status, 200)
+    # 2nd user tries to publish notification again
+    s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf2.set_config()
+    assert_equal(status, 200)
+
+    # cleanup
+    s3_notification_conf2.del_config()
+    topic_conf.del_config()
+    # delete the bucket
+    conn2.delete_bucket(bucket_name)
 
 def kafka_security(security_type, mechanism='PLAIN'):
     """ test pushing kafka s3 notification securly to master """
index f4c3c6aff6f1d2ed59f35f42115f080a28f12423..c630646c9128822e0590c46ad9515184cd22b433 100644 (file)
@@ -94,6 +94,7 @@ using rgw::IAM::iamCreateRole;
 using rgw::IAM::iamDeleteRole;
 using rgw::IAM::iamAll;
 using rgw::IAM::stsAll;
+using rgw::IAM::snsAll;
 using rgw::IAM::allCount;
 
 class FakeIdentity : public Identity {
@@ -642,7 +643,7 @@ TEST_F(PolicyTest, Parse6) {
   EXPECT_TRUE(p->statements[0].noprinc.empty());
   EXPECT_EQ(p->statements[0].effect, Effect::Allow);
   Action_t act;
-  for (auto i = 0U; i <= stsAll; i++)
+  for (auto i = 0U; i <= snsAll; i++)
     act[i] = 1;
   EXPECT_EQ(p->statements[0].action, act);
   EXPECT_EQ(p->statements[0].notaction, None);