From d5dce601f65974a21c83c4b1add036030a75c3c4 Mon Sep 17 00:00:00 2001 From: huynp1999 Date: Mon, 19 Sep 2022 14:55:00 +0700 Subject: [PATCH] src/rgw: SASL mechanism implementation Currently, RGW only supports Kafka endpoint with SASL mechanism PLAIN. On the Kafka cluster side, there are types of mechanisms. So we need to support more options. If passed, it should be backported to all active versions. Fixes: https://tracker.ceph.com/issues/57608 Signed-off-by: Huy Nguyen --- doc/radosgw/notifications.rst | 9 +++++++ src/rgw/driver/rados/rgw_pubsub_push.cc | 2 +- src/rgw/rgw_kafka.cc | 31 ++++++++++++++++++------- src/rgw/rgw_kafka.h | 2 +- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index 94d3a25312f7f..f7244f63503c4 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -156,6 +156,7 @@ updating, use the name of an existing topic and different endpoint values). [&Attributes.entry.8.key=push-endpoint&Attributes.entry.8.value=] [&Attributes.entry.9.key=persistent&Attributes.entry.9.value=true|false] [&Attributes.entry.10.key=cloudevents&Attributes.entry.10.value=true|false] + [&Attributes.entry.11.key=mechanism&Attributes.entry.11.value=] Request parameters: @@ -215,6 +216,14 @@ Request parameters: requests will otherwise be rejected. - user/password: This must be provided along with ``use-ssl``. Connections to the broker will otherwise fail. + - mechanism: may be provided together with user/password (default: ``PLAIN``). The supported SASL mechanisms are: + + - PLAIN + - SCRAM-SHA-256 + - SCRAM-SHA-512 + - GSSAPI + - OAUTHBEARER + - port: This defaults to 9092. - kafka-ack-level: No end2end acking is required. Messages may persist in the broker before being delivered to their final destinations. Two ack methods diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index 2f734c21df838..9f1d7c57553b4 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -309,7 +309,7 @@ public: CephContext* _cct) : cct(_cct), topic(_topic), - conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"))) , + conn(kafka::connect(_endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), args.get_optional("ca-location"), args.get_optional("mechanism"))) , ack_level(get_ack_level(args)) { if (!conn) { throw configuration_error("Kafka: failed to create connection to: " + _endpoint); diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 954f8b19e0ebf..26934c303a198 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -73,6 +73,7 @@ struct connection_t { const boost::optional ca_location; const std::string user; const std::string password; + const boost::optional mechanism; utime_t timestamp = ceph_clock_now(); // cleanup of all internal connection resource @@ -107,8 +108,8 @@ struct connection_t { // ctor for setting immutable values connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl, const boost::optional& _ca_location, - const std::string& _user, const std::string& _password) : - cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {} + const std::string& _user, const std::string& _password, const boost::optional& _mechanism) : + cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism) {} // dtor also destroys the internals ~connection_t() { @@ -124,6 +125,7 @@ std::string to_string(const connection_ptr_t& conn) { str += "\nBroker: " + conn->broker; str += conn->use_ssl ? "\nUse SSL" : ""; str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : ""; + str += conn->mechanism ? "\nSASL Mechanism: " + *(conn->mechanism) : ""; return str; } // these are required interfaces so that connection_t could be used inside boost::intrusive_ptr @@ -220,10 +222,18 @@ connection_ptr_t& create_connection(connection_ptr_t& conn) { if (!conn->user.empty()) { // use SSL+SASL if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || - rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl; + + if (conn->mechanism) { + if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl; + } else { + if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl; + } + } else { // use only SSL if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; @@ -286,9 +296,10 @@ connection_ptr_t create_new_connection(const std::string& broker, CephContext* c bool verify_ssl, boost::optional ca_location, const std::string& user, - const std::string& password) { + const std::string& password, + boost::optional mechanism) { // create connection state - connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password)); + connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)); return create_connection(conn); } @@ -529,7 +540,8 @@ public: connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, - boost::optional ca_location) { + boost::optional ca_location, + boost::optional mechanism) { if (stopped) { // TODO: increment counter ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl; @@ -568,7 +580,7 @@ public: ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl; return nullptr; } - const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password); + const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password, mechanism); // create_new_connection must always return a connection object // even if error occurred during creation. // in such a case the creation will be retried in the main thread @@ -671,9 +683,10 @@ void shutdown() { } connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, - boost::optional ca_location) { + boost::optional ca_location, + boost::optional mechanism) { if (!s_manager) return nullptr; - return s_manager->connect(url, use_ssl, verify_ssl, ca_location); + return s_manager->connect(url, use_ssl, verify_ssl, ca_location, mechanism); } int publish(connection_ptr_t& conn, diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h index 81f02106d78a2..6d3dcad4a004c 100644 --- a/src/rgw/rgw_kafka.h +++ b/src/rgw/rgw_kafka.h @@ -31,7 +31,7 @@ bool init(CephContext* cct); void shutdown(); // connect to a kafka endpoint -connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location); +connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location, boost::optional mechanism); // publish a message over a connection that was already created int publish(connection_ptr_t& conn, -- 2.39.5