]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/s3-notifications: added user-name/password topic attributes for use with SASL...
authorigomon <igomon@bloomberg.net>
Tue, 16 Apr 2024 18:56:01 +0000 (14:56 -0400)
committerigomon <igomon@bloomberg.net>
Tue, 16 Apr 2024 20:05:40 +0000 (16:05 -0400)
Signed-off-by: Igor Gomon <igomon@bloomberg.net>
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h
src/rgw/rgw_rest_pubsub.cc
src/test/rgw/bucket_notification/test_bn.py

index 52bee2a16d312fb4629a5793ce287183fa04ab95..3a1fed164381bd47562b09a6c10b52c1e2f17d7d 100644 (file)
@@ -303,8 +303,13 @@ public:
         cct(_cct),
         topic(_topic),
         ack_level(get_ack_level(args)) {
-    if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), 
-          args.get_optional("ca-location"), args.get_optional("mechanism"))) {
+    if (!kafka::connect(conn_name, _endpoint,
+                        get_bool(args, "use-ssl", false),
+                        get_bool(args, "verify-ssl", true),
+                        args.get_optional("ca-location"),
+                        args.get_optional("mechanism"),
+                        args.get_optional("user-name"),
+                        args.get_optional("password"))) {
       throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
     }
   }
index 8669d431d35987af0277e4458a387f0b46d6f3b0..ce496e414a7d543bc230316ff58afee23e6dd4e5 100644 (file)
@@ -554,7 +554,9 @@ public:
           bool use_ssl,
           bool verify_ssl,
           boost::optional<const std::string&> ca_location,
-          boost::optional<const std::string&> mechanism) {
+          boost::optional<const std::string&> mechanism,
+          boost::optional<const std::string&> topic_user_name,
+          boost::optional<const std::string&> topic_password) {
     if (stopped) {
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
       return false;
@@ -568,6 +570,21 @@ public:
       return false;
     }
 
+    // check if username/password was already supplied via topic attributes
+    // and if also provided as part of the endpoint URL issue a warning
+    if (topic_user_name.has_value()) {
+      if (!user.empty()) {
+        ldout(cct, 5) << "Kafka connect: username provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
+      }
+      user = topic_user_name.get();
+    }
+    if (topic_password.has_value()) {
+      if (!password.empty()) {
+        ldout(cct, 5) << "Kafka connect: password provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
+      }
+      password = topic_password.get();
+    }
+
     // this should be validated by the regex in parse_url()
     ceph_assert(user.empty() == password.empty());
 
@@ -694,9 +711,11 @@ void shutdown() {
 
 bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
         boost::optional<const std::string&> ca_location,
-        boost::optional<const std::string&> mechanism) {
+        boost::optional<const std::string&> mechanism,
+        boost::optional<const std::string&> user_name,
+        boost::optional<const std::string&> password) {
   if (!s_manager) return false;
-  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
+  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
 }
 
 int publish(const std::string& conn_name,
index 813fda32969b9fac57a55dee26a691d1b6efdbff..a6a38ed81ab8e1bf3821473afe28ac612c9fb764 100644 (file)
@@ -22,7 +22,14 @@ bool init(CephContext* cct);
 void shutdown();
 
 // connect to a kafka endpoint
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism);
+bool connect(std::string& broker,
+  const std::string& url,
+  bool use_ssl,
+  bool verify_ssl,
+  boost::optional<const std::string&> ca_location,
+  boost::optional<const std::string&> mechanism,
+  boost::optional<const std::string&> user_name,
+  boost::optional<const std::string&> password);
 
 // publish a message over a connection that was already created
 int publish(const std::string& conn_name,
index bf72baac13ea13413aef030727afe0f86d299436..894c5a0a21f40233599eb891f41b01894971329f 100644 (file)
@@ -755,7 +755,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
       };
       static constexpr std::initializer_list<const char*> args = {
           "verify-ssl",    "use-ssl",         "ca-location", "amqp-ack-level",
-          "amqp-exchange", "kafka-ack-level", "mechanism",   "cloudevents"};
+          "amqp-exchange", "kafka-ack-level", "mechanism",   "cloudevents",
+          "user-name",     "password"};
       if (std::find(args.begin(), args.end(), attribute_name) != args.end()) {
         replace_str(attribute_name, s->info.args.get("AttributeValue"));
         return 0;
index fda0cf29173d2683031ab99158dd834fe6083795..4d4b5c85e9413b618f6b2544fbab43cea3410af2 100644 (file)
@@ -4526,7 +4526,7 @@ def test_ps_s3_topic_no_permissions():
     conn2.delete_bucket(bucket_name)
 
 
-def kafka_security(security_type, mechanism='PLAIN'):
+def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
     """ test pushing kafka s3 notification securly to master """
     conn = connection()
     zonegroup = get_config_zonegroup()
@@ -4537,7 +4537,10 @@ def kafka_security(security_type, mechanism='PLAIN'):
     topic_name = bucket_name+'_topic'
     # create s3 topic
     if security_type == 'SASL_SSL':
-        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+        if not use_topic_attrs_for_creds:
+            endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+        else:
+            endpoint_address = 'kafka://' + kafka_server + ':9094'
     elif security_type == 'SSL':
         endpoint_address = 'kafka://' + kafka_server + ':9093'
     elif security_type == 'SASL_PLAINTEXT':
@@ -4550,6 +4553,8 @@ def kafka_security(security_type, mechanism='PLAIN'):
     elif security_type == 'SASL_SSL':
         KAFKA_DIR = os.environ['KAFKA_DIR']
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism
+        if use_topic_attrs_for_creds:
+            endpoint_args += '&user-name=alice&password=alice-secret'
     else:
         KAFKA_DIR = os.environ['KAFKA_DIR']
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
@@ -4622,6 +4627,11 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl():
     kafka_security('SASL_SSL')
 
 
+@attr('kafka_security_test')
+def test_ps_s3_notification_push_kafka_security_ssl_sasl_attrs():
+    kafka_security('SASL_SSL', use_topic_attrs_for_creds=True)
+
+
 @attr('kafka_security_test')
 def test_ps_s3_notification_push_kafka_security_sasl():
     kafka_security('SASL_PLAINTEXT')