src/rgw: SASL mechanism implementation
author     huynp1999 <huynp1999@gmail.com>
           Mon, 19 Sep 2022 07:55:00 +0000 (14:55 +0700)
committer  Yuval Lifshitz <ylifshit@redhat.com>
           Tue, 14 Feb 2023 12:42:39 +0000 (12:42 +0000)
Currently, RGW supports Kafka endpoints only with the PLAIN SASL mechanism, while Kafka clusters can be configured to require other mechanisms, so we need to support more options.

If accepted, it should be backported to all active versions.

Fixes: https://tracker.ceph.com/issues/57608
Signed-off-by: Huy Nguyen <huynp1999@gmail.com>
doc/radosgw/notifications.rst
src/rgw/driver/rados/rgw_pubsub_push.cc
src/rgw/rgw_kafka.cc
src/rgw/rgw_kafka.h

diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst
index 94d3a25312f7fa867edb6100ce88c094e972428c..f7244f63503c46ddfea1547ac87aa6fdfff67154 100644
@@ -156,6 +156,7 @@ updating, use the name of an existing topic and different endpoint values).
    [&Attributes.entry.8.key=push-endpoint&Attributes.entry.8.value=<endpoint>]
    [&Attributes.entry.9.key=persistent&Attributes.entry.9.value=true|false]
    [&Attributes.entry.10.key=cloudevents&Attributes.entry.10.value=true|false]
+   [&Attributes.entry.11.key=mechanism&Attributes.entry.11.value=<mechanism>]
 
 Request parameters:
 
@@ -215,6 +216,14 @@ Request parameters:
    requests will otherwise be rejected.
  - user/password: This must be provided along with ``use-ssl``. Connections to
    the broker will otherwise fail.
+ - mechanism: may be provided together with user/password (default: ``PLAIN``). The supported SASL mechanisms are:
+
+  - PLAIN
+  - SCRAM-SHA-256
+  - SCRAM-SHA-512
+  - GSSAPI
+  - OAUTHBEARER
+
  - port: This defaults to 9092.
  - kafka-ack-level: No end2end acking is required. Messages may persist in the
    broker before being delivered to their final destinations. Two ack methods
diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc
index 2f734c21df838addf385cfbdd500b759301e5a5f..9f1d7c57553b479c628988e2999c4bee38a2af1a 100644
@@ -309,7 +309,7 @@ public:
       CephContext* _cct) : 
         cct(_cct),
         topic(_topic),
-        conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"))) ,
+        conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"), args.get_optional("mechanism"))) ,
         ack_level(get_ack_level(args)) {
     if (!conn) { 
       throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
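
Because the new argument is a boost::optional, topics that never set the "mechanism" attribute keep the old behavior. A minimal standalone sketch of that optional-with-default pattern (not RGW code; the function name is made up):

#include <boost/optional.hpp>
#include <iostream>
#include <string>

// Sketch only: pick the configured SASL mechanism, or fall back to PLAIN
// when the optional is empty, mirroring the fallback added in rgw_kafka.cc.
std::string effective_mechanism(const boost::optional<const std::string&>& mechanism) {
  return mechanism ? *mechanism : std::string("PLAIN");
}

int main() {
  const std::string scram = "SCRAM-SHA-256";
  std::cout << effective_mechanism(scram) << "\n";       // SCRAM-SHA-256
  std::cout << effective_mechanism(boost::none) << "\n"; // PLAIN
  return 0;
}
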
diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc
index 954f8b19e0ebffab81d7b3b31a48bb113c82b9b5..26934c303a198e87f4766728416cc4906936b2b9 100644
@@ -73,6 +73,7 @@ struct connection_t {
   const boost::optional<std::string> ca_location;
   const std::string user;
   const std::string password;
+  const boost::optional<std::string> mechanism;
   utime_t timestamp = ceph_clock_now();
 
   // cleanup of all internal connection resource
@@ -107,8 +108,8 @@ struct connection_t {
   // ctor for setting immutable values
   connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl, 
           const boost::optional<const std::string&>& _ca_location,
-          const std::string& _user, const std::string& _password) :
-      cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {}
+          const std::string& _user, const std::string& _password, const boost::optional<const std::string&>& _mechanism) :
+      cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism) {}
 
   // dtor also destroys the internals
   ~connection_t() {
@@ -124,6 +125,7 @@ std::string to_string(const connection_ptr_t& conn) {
     str += "\nBroker: " + conn->broker; 
     str += conn->use_ssl ? "\nUse SSL" : ""; 
     str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
+    str += conn->mechanism ? "\nSASL Mechanism: " + *(conn->mechanism) : "";
     return str;
 }
 // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
@@ -220,10 +222,18 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) {
     if (!conn->user.empty()) {
       // use SSL+SASL
       if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
-              rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
               rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
               rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
       ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
+
+      if (conn->mechanism) {
+        if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+        ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl;
+      } else {
+        if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+        ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl;
+      }
+
     } else {
       // use only SSL
       if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
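
The properties set here are plain librdkafka configuration strings. A standalone sketch of the same sequence against the librdkafka C API (not RGW code; broker address, credentials and the chosen mechanism are made-up values):

#include <librdkafka/rdkafka.h>
#include <cstdio>

int main() {
  char errstr[512];
  rd_kafka_conf_t* conf = rd_kafka_conf_new();

  // In RGW the mechanism comes from the topic's "mechanism" attribute;
  // here it is hard-coded for illustration.
  const char* mechanism = "SCRAM-SHA-256";

  if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
      rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
      rd_kafka_conf_set(conf, "sasl.mechanism", mechanism, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
      rd_kafka_conf_set(conf, "sasl.username", "user", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
      rd_kafka_conf_set(conf, "sasl.password", "password", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    std::fprintf(stderr, "Kafka config error: %s\n", errstr);
    rd_kafka_conf_destroy(conf);
    return 1;
  }

  // rd_kafka_new() takes ownership of conf on success.
  rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  if (!producer) {
    std::fprintf(stderr, "failed to create producer: %s\n", errstr);
    return 1;
  }
  rd_kafka_destroy(producer);
  return 0;
}
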
@@ -286,9 +296,10 @@ connection_ptr_t create_new_connection(const std::string& broker, CephContext* c
         bool verify_ssl,
         boost::optional<const std::string&> ca_location, 
         const std::string& user, 
-        const std::string& password) { 
+        const std::string& password,
+        boost::optional<const std::string&> mechanism) { 
   // create connection state
-  connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password));
+  connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism));
   return create_connection(conn);
 }
 
@@ -529,7 +540,8 @@ public:
   connection_ptr_t connect(const std::string& url, 
           bool use_ssl,
           bool verify_ssl,
-          boost::optional<const std::string&> ca_location) {
+          boost::optional<const std::string&> ca_location,
+          boost::optional<const std::string&> mechanism) {
     if (stopped) {
       // TODO: increment counter
       ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
@@ -568,7 +580,7 @@ public:
       ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
       return nullptr;
     }
-    const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password);
+    const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password, mechanism);
     // create_new_connection must always return a connection object
     // even if error occurred during creation. 
     // in such a case the creation will be retried in the main thread
@@ -671,9 +683,10 @@ void shutdown() {
 }
 
 connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl,
-        boost::optional<const std::string&> ca_location) {
+        boost::optional<const std::string&> ca_location,
+        boost::optional<const std::string&> mechanism) {
   if (!s_manager) return nullptr;
-  return s_manager->connect(url, use_ssl, verify_ssl, ca_location);
+  return s_manager->connect(url, use_ssl, verify_ssl, ca_location, mechanism);
 }
 
 int publish(connection_ptr_t& conn, 
diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h
index 81f02106d78a2c7727c458477d787fb01274ed9c..6d3dcad4a004ce388f26756d84b9efd8d34b7a9d 100644
@@ -31,7 +31,7 @@ bool init(CephContext* cct);
 void shutdown();
 
 // connect to a kafka endpoint
-connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location);
+connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism);
 
 // publish a message over a connection that was already created
 int publish(connection_ptr_t& conn,
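
For completeness, a hypothetical caller of the extended interface (sketch only: the kafka:: qualification mirrors the call sites in rgw_pubsub_push.cc, and the URL, credentials and mechanism string are illustrative):

#include <string>
#include <boost/optional.hpp>
#include "rgw_kafka.h"

bool kafka_scram_example(CephContext* cct) {
  if (!kafka::init(cct)) {
    return false;
  }
  // The SASL mechanism parameter is optional; passing boost::none keeps the
  // previous behavior of defaulting to PLAIN.
  const std::string mechanism = "SCRAM-SHA-512";
  auto conn = kafka::connect("kafka://user:password@localhost:9092",
                             true /*use_ssl*/, true /*verify_ssl*/,
                             boost::none /*ca_location*/, mechanism);
  const bool connected = !!conn;
  // messages would be published over 'conn' via kafka::publish() here
  kafka::shutdown();
  return connected;
}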