]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: modify topic owner check when creating 55275/head
authorZhipeng Li <qiuxinyidian@gmail.com>
Tue, 23 Jan 2024 06:50:52 +0000 (14:50 +0800)
committerYuval Lifshitz <ylifshit@redhat.com>
Tue, 6 Feb 2024 07:41:55 +0000 (07:41 +0000)
add tests to cover topic policies
as well as behavior when no policies are defined

Fixes: https://tracker.ceph.com/issues/64124
Signed-off-by: Zhipeng Li <qiuxinyidian@gmail.com>
PendingReleaseNotes
src/common/options/rgw.yaml.in
src/rgw/rgw_iam_policy.cc
src/rgw/rgw_iam_policy.h
src/rgw/rgw_rest_pubsub.cc
src/test/rgw/bucket_notification/test_bn.py

index dfc6294b36a2c86e53d0234023ec7cdc109a81c8..0ecec83ec81f9e81fe7feedcf5abf2202bc1dc9a 100644 (file)
@@ -121,6 +121,15 @@ CephFS: Disallow delegating preallocated inode ranges to clients. Config
   trim the log every second (`mds_log_trim_upkeep_interval` config). Also,
   a couple of configs govern how much time the MDS spends in trimming its
   logs. These configs are `mds_log_trim_threshold` and `mds_log_trim_decay_rate`.
+* RGW: Notification topics are now owned by the user that created them. 
+  By default, only the owner can read/write their topics. Topic policy documents
+  are now supported to grant these permissions to other users. Preexisting topics
+  are treated as if they have no owner, and any user can read/write them using the SNS API. 
+  If such a topic is recreated with CreateTopic, the issuing user becomes the new owner.
+  For backward compatibility, all users still have permission to publish bucket 
+  notifications to topics owned by other users. A new configuration parameter:
+  ``rgw_topic_require_publish_policy`` can be enabled to deny ``sns:Publish``
+  permissions unless explicitly granted by topic policy.
 
 >=18.0.0
 
index f4ef0079dd6bee43b37072b2b493edef8c58aa97..6fab43e5589ff99335f65b3e51c008735bcb70d1 100644 (file)
@@ -3917,15 +3917,15 @@ options:
   services:
   - rgw
   with_legacy: true
-- name: mandatory_topic_permissions
+- name: rgw_topic_require_publish_policy
   type: bool
   level: basic
-  desc: Whether to validate user permissions to access notification topics.
+  desc: Whether to validate user permissions to publish notifications to topics.
   long_desc: If true, all users (other then the owner of the topic) will need
-    to have a policy to access topics.
+    to have a policy to publish notifications to topics.
     The topic policy can be set by owner via CreateTopic() or SetTopicAttribute().
     Following permissions can be granted "sns:Publish", "sns:GetTopicAttributes",
-    "sns:SetTopicAttributes" and "sns:DeleteTopic" via Policy.
+    "sns:SetTopicAttributes", "sns:DeleteTopic" and "sns:CreateTopic" via Policy.
     NOTE that even if set to "false" topics will still follow the policies if set on them.
   default: false
   services:
index 76b24034d6149bbb8f031a480f1f4f23ff692d80..813b78f161e11651c52c4cb78257a2bf345066bd 100644 (file)
@@ -161,6 +161,7 @@ static const actpair actpairs[] =
  { "sns:DeleteTopic", snsDeleteTopic},
  { "sns:Publish", snsPublish},
  { "sns:SetTopicAttributes", snsSetTopicAttributes},
+ { "sns:CreateTopic", snsCreateTopic},
 };
 
 struct PolicyParser;
@@ -1476,6 +1477,9 @@ const char* action_bit_string(uint64_t action) {
 
   case snsPublish:
     return "sns:Publish";
+
+  case snsCreateTopic:
+    return "sns:CreateTopic";
   }
   return "s3Invalid";
 }
index e528d1515c7742ea4501bbf142aa8c17d0272649..5d6f334c176ebb0dbd4e78242142d8a8199f91d3 100644 (file)
@@ -145,7 +145,8 @@ static constexpr std::uint64_t snsGetTopicAttributes = stsAll + 1;
 static constexpr std::uint64_t snsDeleteTopic = stsAll + 2;
 static constexpr std::uint64_t snsPublish = stsAll + 3;
 static constexpr std::uint64_t snsSetTopicAttributes = stsAll + 4;
-static constexpr std::uint64_t snsAll = stsAll + 5;
+static constexpr std::uint64_t snsCreateTopic = stsAll + 5;
+static constexpr std::uint64_t snsAll = stsAll + 6;
 
 static constexpr std::uint64_t s3Count = s3All;
 static constexpr std::uint64_t allCount = snsAll + 1;
index 611589d721f37eb57349f8578cc95d1047652fbc..191f535d82bd48c31792fd0b650ff7b196a77be4 100644 (file)
@@ -4,6 +4,7 @@
 #include <algorithm>
 #include <boost/tokenizer.hpp>
 #include <optional>
+#include "rgw_iam_policy.h"
 #include "rgw_rest_pubsub.h"
 #include "rgw_pubsub_push.h"
 #include "rgw_pubsub.h"
@@ -75,8 +76,8 @@ std::optional<rgw::IAM::Policy> get_policy_from_text(req_state* const s,
         s->cct, s->owner.id.tenant, bl,
         s->cct->_conf.get_val<bool>("rgw_policy_reject_invalid_principals"));
   } catch (rgw::IAM::PolicyParseException& e) {
-    ldout(s->cct, 1) << "failed to parse policy:" << policy_text
-                     << " ' with error: " << e.what() << dendl;
+    ldout(s->cct, 1) << "failed to parse policy: '" << policy_text
+                     << "' with error: " << e.what() << dendl;
     s->err.message = e.what();
     return std::nullopt;
   }
@@ -91,13 +92,18 @@ int verify_topic_owner_or_policy(req_state* const s,
   }
   // no policy set.
   if (topic.policy_text.empty()) {
-    // if mandatory_topic_permissions is true, then validate all users for
-    // permission.
-    if (s->cct->_conf->mandatory_topic_permissions) {
-      return -EACCES;
-    } else {
+    // if rgw_topic_require_publish_policy is "false" dont validate "publish" policies
+    if (op == rgw::IAM::snsPublish && !s->cct->_conf->rgw_topic_require_publish_policy) {
+      return 0;
+    }
+    if (topic.user.empty()) {
+      // if we don't know the original user and there is no policy
+      // we will not reject the request.
+      // this is for compatibility with versions that did not store the user in the topic
       return 0;
     }
+    s->err.message = "Topic was created by another user.";
+    return -EACCES;
   }
   // bufferlist::static_from_string wants non const string
   std::string policy_text(topic.policy_text);
@@ -107,7 +113,7 @@ int verify_topic_owner_or_policy(req_state* const s,
                      s->user->get_tenant(), topic.name);
   if (!p || p->eval(s->env, *s->auth.identity, op, arn, princ_type) !=
                 rgw::IAM::Effect::Allow) {
-    ldout(s->cct, 1) << "topic_policy failed validation, topic_policy: " << p
+    ldout(s->cct, 1) << "topic policy failed validation, topic policy: " << p
                      << dendl;
     return -EACCES;
   }
@@ -195,13 +201,17 @@ class RGWPSCreateTopicOp : public RGWOp {
       return 0;
     }
     if (ret == 0) {
-      if (result.user == s->owner.id ||
-          !s->cct->_conf->mandatory_topic_permissions) {
+      ret = verify_topic_owner_or_policy(
+          s, result, driver->get_zone()->get_zonegroup().get_name(),
+          rgw::IAM::snsCreateTopic);
+      if (ret == 0)
+      {
         return 0;
       }
-      ldpp_dout(this, 1) << "failed to create topic '" << topic_name
+
+      ldpp_dout(this, 1) << "no permission to modify topic '" << topic_name
                          << "', topic already exist." << dendl;
-      return -EPERM;
+      return -EACCES;
     }
     ldpp_dout(this, 1) << "failed to read topic '" << topic_name
                        << "', with error:" << ret << dendl;
@@ -408,8 +418,8 @@ void RGWPSGetTopicOp::execute(optional_yield y) {
       s, result, driver->get_zone()->get_zonegroup().get_name(),
       rgw::IAM::snsGetTopicAttributes);
   if (op_ret != 0) {
-    ldpp_dout(this, 1) << "failed to get topic '" << topic_name
-                       << "', topic owned by other user" << dendl;
+    ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
+                       << "'" << dendl;
     return;
   }
   ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
@@ -492,8 +502,8 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
       s, result, driver->get_zone()->get_zonegroup().get_name(),
       rgw::IAM::snsGetTopicAttributes);
   if (op_ret != 0) {
-    ldpp_dout(this, 1) << "failed to get topic '" << topic_name
-                       << "', topic owned by other user" << dendl;
+    ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
+                       << "'" << dendl;
     return;
   }
   ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
@@ -617,8 +627,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
         s, result, driver->get_zone()->get_zonegroup().get_name(),
         rgw::IAM::snsSetTopicAttributes);
     if (ret != 0) {
-      ldpp_dout(this, 1) << "failed to set attributes for topic '" << topic_name
-                         << "', topic owned by other user" << dendl;
+      ldpp_dout(this, 1) << "no permission to set attributes for topic '" << topic_name
+                         << "'" << dendl;
       return ret;
     }
 
@@ -750,8 +760,8 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
         s, result, driver->get_zone()->get_zonegroup().get_name(),
         rgw::IAM::snsDeleteTopic);
     if (op_ret != 0) {
-      ldpp_dout(this, 1) << "failed to remove topic '" << topic_name
-                         << "' topic owned by other user" << dendl;
+      ldpp_dout(this, 1) << "no permission to remove topic '" << topic_name
+                         << "'" << dendl;
       return;
     }
   } else {
@@ -1025,9 +1035,8 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
         s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
         rgw::IAM::snsPublish);
     if (op_ret != 0) {
-      ldpp_dout(this, 1) << "failed to create notification for topic '"
-                         << topic_name << "' topic owned by other user"
-                         << dendl;
+      ldpp_dout(this, 1) << "no permission to create notification for topic '"
+                         << topic_name << "'" << dendl;
       return;
     }
     // make sure that full topic configuration match
index 6e9248a3669b56c95635955521690f58c68545e3..30cbfdfe7865b06140c979b574411011a3f0a871 100644 (file)
@@ -4395,6 +4395,7 @@ def test_ps_s3_multiple_topics_notification():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
+
 @attr('basic_test')
 def test_ps_s3_topic_permissions():
     """ test s3 topic set/get/delete permissions """
@@ -4410,7 +4411,7 @@ def test_ps_s3_topic_permissions():
                 "Sid": "Statement",
                 "Effect": "Deny",
                 "Principal": "*",
-                "Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes"],
+                "Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes", "sns:DeleteTopic", "sns:CreateTopic"],
                 "Resource": f"arn:aws:sns:{zonegroup}::{topic_name}"
             }
         ]
@@ -4421,10 +4422,23 @@ def test_ps_s3_topic_permissions():
     topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy)
     topic_arn = topic_conf.set_config()
 
-    # 2nd user tries to fetch the topic
     topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args)
+    try:
+        # 2nd user tries to override the topic
+        topic_arn = topic_conf2.set_config()
+        assert False, "'AccessDenied' error is expected"
+    except ClientError as err:
+        if 'Error' in err.response:
+            assert_equal(err.response['Error']['Code'], 'AccessDenied')
+        else:
+            assert_equal(err.response['Code'], 'AccessDenied')
+    except Exception as err:
+        print('unexpected error type: '+type(err).__name__)
+
+    # 2nd user tries to fetch the topic
     _, status = topic_conf2.get_config(topic_arn=topic_arn)
     assert_equal(status, 403)
+
     try:
         # 2nd user tries to set the attribute
         status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
@@ -4455,6 +4469,18 @@ def test_ps_s3_topic_permissions():
     except Exception as err:
         print('unexpected error type: '+type(err).__name__)
 
+    try:
+        # 2nd user tries to delete the topic
+        status = topic_conf2.del_config(topic_arn=topic_arn)
+        assert False, "'AccessDenied' error is expected"
+    except ClientError as err:
+        if 'Error' in err.response:
+            assert_equal(err.response['Error']['Code'], 'AccessDenied')
+        else:
+            assert_equal(err.response['Code'], 'AccessDenied')
+    except Exception as err:
+        print('unexpected error type: '+type(err).__name__)
+
     # Topic policy is now added by the 1st user to allow 2nd user.
     topic_policy  = topic_policy.replace("Deny", "Allow")
     topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy)
@@ -4469,6 +4495,82 @@ def test_ps_s3_topic_permissions():
     s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
     _, status = s3_notification_conf2.set_config()
     assert_equal(status, 200)
+    # 2nd user tries to delete the topic again
+    status = topic_conf2.del_config(topic_arn=topic_arn)
+    assert_equal(status, 200)
+
+    # cleanup
+    s3_notification_conf2.del_config()
+    # delete the bucket
+    conn2.delete_bucket(bucket_name)
+
+
+@attr('basic_test')
+def test_ps_s3_topic_no_permissions():
+    """ test s3 topic set/get/delete permissions """
+    conn1 = connection()
+    conn2 = another_user()
+    zonegroup = 'default'
+    bucket_name = gen_bucket_name()
+    topic_name = bucket_name + TOPIC_SUFFIX
+    
+    # create s3 topic without policy
+    endpoint_address = 'amqp://127.0.0.1:7001'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
+    topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
+    topic_arn = topic_conf.set_config()
+
+    topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args)
+    try:
+        # 2nd user tries to override the topic
+        topic_arn = topic_conf2.set_config()
+        assert False, "'AccessDenied' error is expected"
+    except ClientError as err:
+        if 'Error' in err.response:
+            assert_equal(err.response['Error']['Code'], 'AccessDenied')
+        else:
+            assert_equal(err.response['Code'], 'AccessDenied')
+    except Exception as err:
+        print('unexpected error type: '+type(err).__name__)
+
+    # 2nd user tries to fetch the topic
+    _, status = topic_conf2.get_config(topic_arn=topic_arn)
+    assert_equal(status, 403)
+
+    try:
+        # 2nd user tries to set the attribute
+        status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
+        assert False, "'AccessDenied' error is expected"
+    except ClientError as err:
+        if 'Error' in err.response:
+            assert_equal(err.response['Error']['Code'], 'AccessDenied')
+        else:
+            assert_equal(err.response['Code'], 'AccessDenied')
+    except Exception as err:
+        print('unexpected error type: '+type(err).__name__)
+
+    # create bucket for conn2 publish notification to topic
+    # should be allowed based on the default value of rgw_topic_require_publish_policy=false
+    _ = conn2.create_bucket(bucket_name)
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
+                         'Events': []
+                       }]
+    s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
+    _, status = s3_notification_conf2.set_config()
+    assert_equal(status, 200)
+    
+    try:
+        # 2nd user tries to delete the topic
+        status = topic_conf2.del_config(topic_arn=topic_arn)
+        assert False, "'AccessDenied' error is expected"
+    except ClientError as err:
+        if 'Error' in err.response:
+            assert_equal(err.response['Error']['Code'], 'AccessDenied')
+        else:
+            assert_equal(err.response['Code'], 'AccessDenied')
+    except Exception as err:
+        print('unexpected error type: '+type(err).__name__)
 
     # cleanup
     s3_notification_conf2.del_config()
@@ -4476,6 +4578,7 @@ def test_ps_s3_topic_permissions():
     # delete the bucket
     conn2.delete_bucket(bucket_name)
 
+
 def kafka_security(security_type, mechanism='PLAIN'):
     """ test pushing kafka s3 notification securly to master """
     conn = connection()