ceph.git/commitdiff
rgw/s3-notifications: added user-name/password topic attributes for use with SASL...
author igomon <igomon@bloomberg.net>
Tue, 16 Apr 2024 18:56:01 +0000 (14:56 -0400)
committer igomon <igomon@bloomberg.net>
Wed, 4 Dec 2024 19:14:08 +0000 (14:14 -0500)
Signed-off-by: Igor Gomon <igomon@bloomberg.net>
(cherry picked from commit 84744428e6345a2e71fcd933bfc0d31c4988c280)

Conflicts:
src/rgw/rgw_kafka.cc
- some overlap between my changes and later changes (mostly code refactoring/cleanup of Kafka error handling and raw-pointer usage).

src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h
src/rgw/rgw_rest_pubsub.cc
src/test/rgw/bucket_notification/test_bn.py
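
For context, a minimal sketch of creating a topic that uses the new attributes, assuming a boto3 SNS client pointed at RGW (the endpoint hosts, CA path, and all credentials below are placeholders):

import boto3

# hypothetical RGW endpoint and S3 credentials
sns = boto3.client('sns',
                   endpoint_url='http://rgw.example.com:8000',
                   region_name='default',
                   aws_access_key_id='ACCESS_KEY',
                   aws_secret_access_key='SECRET_KEY')

# 'user-name' and 'password' are the topic attributes added by this commit;
# the Kafka credentials no longer need to be embedded in the endpoint URL
attributes = {
    'push-endpoint': 'kafka://kafka.example.com:9094',
    'use-ssl': 'true',
    'ca-location': '/etc/pki/kafka/y-ca.crt',
    'kafka-ack-level': 'broker',
    'mechanism': 'PLAIN',
    'user-name': 'alice',
    'password': 'alice-secret',
}
topic = sns.create_topic(Name='mytopic', Attributes=attributes)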

src/rgw/driver/rados/rgw_pubsub_push.cc
index 2d569d1b0c99f38b70b300d8957ecd933e62fe0d..f1a1e8c9546bf4a373da3ccb1adb3dbe515545d1 100644 (file)
@@ -311,8 +311,13 @@ public:
       const RGWHTTPArgs& args) : 
         topic(_topic),
         ack_level(get_ack_level(args)) {
-    if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), 
-          args.get_optional("ca-location"), args.get_optional("mechanism"))) {
+    if (!kafka::connect(conn_name, _endpoint,
+                        get_bool(args, "use-ssl", false),
+                        get_bool(args, "verify-ssl", true),
+                        args.get_optional("ca-location"),
+                        args.get_optional("mechanism"),
+                        args.get_optional("user-name"),
+                        args.get_optional("password"))) {
       throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
     }
   }
src/rgw/rgw_kafka.cc
index d2d9b7e07090aca6ad9263b85c7fa9be263cada4..63bf80b3a7b65b7ec0d8d5382c9034e1532e9fa7 100644 (file)
@@ -42,7 +42,7 @@ inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) {
   case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
     return EMSGSIZE;
   case RD_KAFKA_RESP_ERR__QUEUE_FULL:
-    return ENOBUFS;                                                                                                                                                                                                                           
+    return ENOBUFS;
   default:
     return EIO;
   }
@@ -155,6 +155,29 @@ struct connection_t {
   }
 };
 
+// convert int status to string - including RGW-specific values
+std::string status_to_string(int s) {
+  switch (s) {
+    case STATUS_OK:
+      return "STATUS_OK";
+    case STATUS_CONNECTION_CLOSED:
+      return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
+    case STATUS_QUEUE_FULL:
+      return "RGW_KAFKA_STATUS_QUEUE_FULL";
+    case STATUS_MAX_INFLIGHT:
+      return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
+    case STATUS_MANAGER_STOPPED:
+      return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
+    case STATUS_CONF_ALLOC_FAILED:
+      return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
+    case STATUS_CONF_REPLCACE:
+      return "RGW_KAFKA_STATUS_CONF_REPLCACE";
+    case STATUS_CONNECTION_IDLE:
+      return "RGW_KAFKA_STATUS_CONNECTION_IDLE";
+  }
+  return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
+}
+
 void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
   ceph_assert(opaque);
 
@@ -162,10 +185,10 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
   const auto result = rkmessage->err;
 
   if (rkmessage->err == 0) {
-      ldout(conn->cct, 20) << "Kafka run: ack received with result=" << 
+      ldout(conn->cct, 20) << "Kafka run: ack received with result=" <<
         rd_kafka_err2str(result) << dendl;
   } else {
-      ldout(conn->cct, 1) << "Kafka run: nack received with result=" << 
+      ldout(conn->cct, 1) << "Kafka run: nack received with result=" <<
         rd_kafka_err2str(result) << dendl;
   }
 
@@ -235,7 +258,7 @@ bool new_producer(connection_t* conn) {
   // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
   constexpr std::uint64_t min_message_timeout = 1;
   const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
-  if (rd_kafka_conf_set(conf.get(), "message.timeout.ms", 
+  if (rd_kafka_conf_set(conf.get(), "message.timeout.ms",
         std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
 
   // get list of brokers based on the bootstrap broker
@@ -391,7 +414,7 @@ private:
       topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
       if (!topic) {
         const auto err = rd_kafka_last_error();
-        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " 
+        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: "
           << rd_kafka_err2str(err) << "(" << err << ")" << dendl;
         if (message->cb) {
           message->cb(-rd_kafka_err2errno(err));
@@ -478,7 +501,7 @@ private:
 
         // Checking the connection idleness
         if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
-          ldout(conn->cct, 20) << "kafka run: deleting a connection that was idle for: " << 
+          ldout(conn->cct, 20) << "kafka run: deleting a connection that was idle for: " <<
             conn->cct->_conf->rgw_kafka_connection_idle << " seconds. last activity was at: " << conn->timestamp << dendl;
           std::lock_guard lock(connections_lock);
           conn->status = STATUS_CONNECTION_IDLE;
@@ -549,7 +572,9 @@ public:
           bool use_ssl,
           bool verify_ssl,
           boost::optional<const std::string&> ca_location,
-          boost::optional<const std::string&> mechanism) {
+          boost::optional<const std::string&> mechanism,
+          boost::optional<const std::string&> topic_user_name,
+          boost::optional<const std::string&> topic_password) {
     if (stopped) {
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
       return false;
@@ -563,6 +588,21 @@ public:
       return false;
     }
 
+    // check whether a username/password was supplied via topic attributes;
+    // if credentials were also provided in the endpoint URL, issue a warning
+    if (topic_user_name.has_value()) {
+      if (!user.empty()) {
+        ldout(cct, 5) << "Kafka connect: username provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
+      }
+      user = topic_user_name.get();
+    }
+    if (topic_password.has_value()) {
+      if (!password.empty()) {
+        ldout(cct, 5) << "Kafka connect: password provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
+      }
+      password = topic_password.get();
+    }
+
     // this should be validated by the regex in parse_url()
     ceph_assert(user.empty() == password.empty());
 
@@ -695,10 +735,12 @@ void shutdown() {
 
 bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
         boost::optional<const std::string&> ca_location,
-        boost::optional<const std::string&> mechanism) {
+        boost::optional<const std::string&> mechanism,
+        boost::optional<const std::string&> user_name,
+        boost::optional<const std::string&> password) {
   std::shared_lock lock(s_manager_mutex);
   if (!s_manager) return false;
-  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
+  return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
 }
 
 int publish(const std::string& conn_name,
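
The connect() change above gives the topic attributes precedence: if credentials appear both in the endpoint URL and as topic attributes, the attribute values are used and a warning is logged at level 5. A hedged sketch of the two equivalent topic configurations (hosts and credentials are placeholders):

# pre-existing form: credentials embedded in the endpoint URL
attrs_url_creds = {
    'push-endpoint': 'kafka://alice:alice-secret@kafka.example.com:9094',
    'use-ssl': 'true',
}

# new form: credentials via topic attributes; if both forms are supplied,
# the attribute values win and RGW logs a level-5 warning
attrs_topic_creds = {
    'push-endpoint': 'kafka://kafka.example.com:9094',
    'use-ssl': 'true',
    'user-name': 'alice',
    'password': 'alice-secret',
}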
src/rgw/rgw_kafka.h
index 813fda32969b9fac57a55dee26a691d1b6efdbff..a6a38ed81ab8e1bf3821473afe28ac612c9fb764 100644 (file)
@@ -22,7 +22,14 @@ bool init(CephContext* cct);
 void shutdown();
 
 // connect to a kafka endpoint
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism);
+bool connect(std::string& broker,
+  const std::string& url,
+  bool use_ssl,
+  bool verify_ssl,
+  boost::optional<const std::string&> ca_location,
+  boost::optional<const std::string&> mechanism,
+  boost::optional<const std::string&> user_name,
+  boost::optional<const std::string&> password);
 
 // publish a message over a connection that was already created
 int publish(const std::string& conn_name,
src/rgw/rgw_rest_pubsub.cc
index 7a93c7114e2cad67d226460e65282f9c4287259c..9ec6a6234f7939f99e31a40c9e48172622b688fd 100644 (file)
@@ -757,7 +757,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
       };
       static constexpr std::initializer_list<const char*> args = {
           "verify-ssl",    "use-ssl",         "ca-location", "amqp-ack-level",
-          "amqp-exchange", "kafka-ack-level", "mechanism",   "cloudevents"};
+          "amqp-exchange", "kafka-ack-level", "mechanism",   "cloudevents",
+          "user-name",     "password"};
       if (std::find(args.begin(), args.end(), attribute_name) != args.end()) {
         replace_str(attribute_name, s->info.args.get("AttributeValue"));
         return 0;
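
With 'user-name' and 'password' added to the accepted attribute names, the credentials can also be set on an existing topic via SetTopicAttributes. A sketch using the boto3 client from the example above (the topic ARN format is illustrative):

# hypothetical ARN; 'user-name' and 'password' are the attribute names
# accepted per the hunk above
sns.set_topic_attributes(TopicArn='arn:aws:sns:default::mytopic',
                         AttributeName='user-name',
                         AttributeValue='alice')
sns.set_topic_attributes(TopicArn='arn:aws:sns:default::mytopic',
                         AttributeName='password',
                         AttributeValue='alice-secret')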
src/test/rgw/bucket_notification/test_bn.py
index 3ae5d544550198acc5a2a5791967709177ba6df1..768e4a4838d559466d9c92a4fc6ced477e352069 100644 (file)
@@ -4652,7 +4652,7 @@ def test_ps_s3_topic_no_permissions():
     conn2.delete_bucket(bucket_name)
 
 
-def kafka_security(security_type, mechanism='PLAIN'):
+def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
     """ test pushing kafka s3 notification securly to master """
     conn = connection()
     zonegroup = get_config_zonegroup()
@@ -4663,7 +4663,10 @@ def kafka_security(security_type, mechanism='PLAIN'):
     topic_name = bucket_name+'_topic'
     # create s3 topic
     if security_type == 'SASL_SSL':
-        endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+        if not use_topic_attrs_for_creds:
+            endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+        else:
+            endpoint_address = 'kafka://' + kafka_server + ':9094'
     elif security_type == 'SSL':
         endpoint_address = 'kafka://' + kafka_server + ':9093'
     elif security_type == 'SASL_PLAINTEXT':
@@ -4676,6 +4679,8 @@ def kafka_security(security_type, mechanism='PLAIN'):
     elif security_type == 'SASL_SSL':
         KAFKA_DIR = os.environ['KAFKA_DIR']
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism
+        if use_topic_attrs_for_creds:
+            endpoint_args += '&user-name=alice&password=alice-secret'
     else:
         KAFKA_DIR = os.environ['KAFKA_DIR']
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
@@ -4748,6 +4753,11 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl():
     kafka_security('SASL_SSL')
 
 
+@attr('kafka_security_test')
+def test_ps_s3_notification_push_kafka_security_ssl_sasl_attrs():
+    kafka_security('SASL_SSL', use_topic_attrs_for_creds=True)
+
+
 @attr('kafka_security_test')
 def test_ps_s3_notification_push_kafka_security_sasl():
     kafka_security('SASL_PLAINTEXT')
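

Since kafka_security() already takes a mechanism parameter, the attribute-based credentials compose with non-default SASL mechanisms as well. A hypothetical variant, assuming the test broker has SCRAM-SHA-256 enabled for the same user:

@attr('kafka_security_test')
def test_ps_s3_notification_push_kafka_security_ssl_sasl_scram_attrs():
    # assumes SCRAM-SHA-256 is configured on the broker for user 'alice'
    kafka_security('SASL_SSL', mechanism='SCRAM-SHA-256',
                   use_topic_attrs_for_creds=True)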