From: igomon Date: Tue, 16 Apr 2024 18:56:01 +0000 (-0400) Subject: rgw/s3-notifications: added user-name/password topic attributes for use with SASL... X-Git-Tag: v19.2.3~277^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5f4cbdd82d5f9cf7e891f005f5c683539deded71;p=ceph.git rgw/s3-notifications: added user-name/password topic attributes for use with SASL authentication with Kafka broker Signed-off-by: Igor Gomon (cherry picked from commit 84744428e6345a2e71fcd933bfc0d31c4988c280) Conflicts: src/rgw/rgw_kafka.cc - a bit of an overlap between my changes and later changes (mostly code refactoring/cleanup around Kafka error handling and usage of raw pointers). --- diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index 2d569d1b0c99..f1a1e8c9546b 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -311,8 +311,13 @@ public: const RGWHTTPArgs& args) : topic(_topic), ack_level(get_ack_level(args)) { - if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), - args.get_optional("ca-location"), args.get_optional("mechanism"))) { + if (!kafka::connect(conn_name, _endpoint, + get_bool(args, "use-ssl", false), + get_bool(args, "verify-ssl", true), + args.get_optional("ca-location"), + args.get_optional("mechanism"), + args.get_optional("user-name"), + args.get_optional("password"))) { throw configuration_error("Kafka: failed to create connection to: " + _endpoint); } } diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index d2d9b7e07090..63bf80b3a7b6 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -42,7 +42,7 @@ inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) { case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE: return EMSGSIZE; case RD_KAFKA_RESP_ERR__QUEUE_FULL: - return ENOBUFS; + return ENOBUFS; default: return EIO; } @@ -155,6 +155,29 @@ struct connection_t { } };
+// convert int status to string - including RGW specific values +std::string status_to_string(int s) { + switch (s) { + case STATUS_OK: + return "STATUS_OK"; + case STATUS_CONNECTION_CLOSED: + return "RGW_KAFKA_STATUS_CONNECTION_CLOSED"; + case STATUS_QUEUE_FULL: + return "RGW_KAFKA_STATUS_QUEUE_FULL"; + case STATUS_MAX_INFLIGHT: + return "RGW_KAFKA_STATUS_MAX_INFLIGHT"; + case STATUS_MANAGER_STOPPED: + return "RGW_KAFKA_STATUS_MANAGER_STOPPED"; + case STATUS_CONF_ALLOC_FAILED: + return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED"; + case STATUS_CONF_REPLCACE: + return "RGW_KAFKA_STATUS_CONF_REPLCACE"; + case STATUS_CONNECTION_IDLE: + return "RGW_KAFKA_STATUS_CONNECTION_IDLE"; + } + return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s)); +} + void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) { ceph_assert(opaque); @@ -162,10 +185,10 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* const auto result = rkmessage->err; if (rkmessage->err == 0) { - ldout(conn->cct, 20) << "Kafka run: ack received with result=" << + ldout(conn->cct, 20) << "Kafka run: ack received with result=" << rd_kafka_err2str(result) << dendl; } else { - ldout(conn->cct, 1) << "Kafka run: nack received with result=" << + ldout(conn->cct, 1) << "Kafka run: nack received with result=" << rd_kafka_err2str(result) << dendl; } @@ -235,7 +258,7 @@ bool new_producer(connection_t* conn) { // however, testing with librdkafka v1.6.1 did not expire the message in that case. 
hence, a value of zero is changed to 1ms constexpr std::uint64_t min_message_timeout = 1; const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout); - if (rd_kafka_conf_set(conf.get(), "message.timeout.ms", + if (rd_kafka_conf_set(conf.get(), "message.timeout.ms", std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; // get list of brokers based on the bootstrap broker @@ -391,7 +414,7 @@ private: topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr); if (!topic) { const auto err = rd_kafka_last_error(); - ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " + ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << rd_kafka_err2str(err) << "(" << err << ")" << dendl; if (message->cb) { message->cb(-rd_kafka_err2errno(err)); @@ -478,7 +501,7 @@ private: // Checking the connection idleness if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) { - ldout(conn->cct, 20) << "kafka run: deleting a connection that was idle for: " << + ldout(conn->cct, 20) << "kafka run: deleting a connection that was idle for: " << conn->cct->_conf->rgw_kafka_connection_idle << " seconds. 
last activity was at: " << conn->timestamp << dendl; std::lock_guard lock(connections_lock); conn->status = STATUS_CONNECTION_IDLE; @@ -549,7 +572,9 @@ public: bool use_ssl, bool verify_ssl, boost::optional ca_location, - boost::optional mechanism) { + boost::optional mechanism, + boost::optional topic_user_name, + boost::optional topic_password) { if (stopped) { ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl; return false; @@ -563,6 +588,21 @@ public: return false; } + // check if username/password was already supplied via topic attributes + // and if also provided as part of the endpoint URL issue a warning + if (topic_user_name.has_value()) { + if (!user.empty()) { + ldout(cct, 5) << "Kafka connect: username provided via both topic attributes and endpoint URL: using topic attributes" << dendl; + } + user = topic_user_name.get(); + } + if (topic_password.has_value()) { + if (!password.empty()) { + ldout(cct, 5) << "Kafka connect: password provided via both topic attributes and endpoint URL: using topic attributes" << dendl; + } + password = topic_password.get(); + } + // this should be validated by the regex in parse_url() ceph_assert(user.empty() == password.empty()); @@ -695,10 +735,12 @@ void shutdown() { bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location, - boost::optional mechanism) { + boost::optional mechanism, + boost::optional user_name, + boost::optional password) { std::shared_lock lock(s_manager_mutex); if (!s_manager) return false; - return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism); + return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password); } int publish(const std::string& conn_name, diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h index 813fda32969b..a6a38ed81ab8 100644 --- a/src/rgw/rgw_kafka.h +++ b/src/rgw/rgw_kafka.h @@ -22,7 +22,14 @@ bool init(CephContext* cct); void 
shutdown(); // connect to a kafka endpoint -bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location, boost::optional mechanism); +bool connect(std::string& broker, + const std::string& url, + bool use_ssl, + bool verify_ssl, + boost::optional ca_location, + boost::optional mechanism, + boost::optional user_name, + boost::optional password); // publish a message over a connection that was already created int publish(const std::string& conn_name, diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 7a93c7114e2c..9ec6a6234f79 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -757,7 +757,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp { }; static constexpr std::initializer_list args = { "verify-ssl", "use-ssl", "ca-location", "amqp-ack-level", - "amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents"}; + "amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents", + "user-name", "password"}; if (std::find(args.begin(), args.end(), attribute_name) != args.end()) { replace_str(attribute_name, s->info.args.get("AttributeValue")); return 0; diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 3ae5d5445501..768e4a4838d5 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -4652,7 +4652,7 @@ def test_ps_s3_topic_no_permissions(): conn2.delete_bucket(bucket_name) -def kafka_security(security_type, mechanism='PLAIN'): +def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False): """ test pushing kafka s3 notification securly to master """ conn = connection() zonegroup = get_config_zonegroup() @@ -4663,7 +4663,10 @@ def kafka_security(security_type, mechanism='PLAIN'): topic_name = bucket_name+'_topic' # create s3 topic if security_type == 'SASL_SSL': - endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + 
':9094' + if not use_topic_attrs_for_creds: + endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094' + else: + endpoint_address = 'kafka://' + kafka_server + ':9094' elif security_type == 'SSL': endpoint_address = 'kafka://' + kafka_server + ':9093' elif security_type == 'SASL_PLAINTEXT': @@ -4676,6 +4679,8 @@ def kafka_security(security_type, mechanism='PLAIN'): elif security_type == 'SASL_SSL': KAFKA_DIR = os.environ['KAFKA_DIR'] endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism + if use_topic_attrs_for_creds: + endpoint_args += '&user-name=alice&password=alice-secret' else: KAFKA_DIR = os.environ['KAFKA_DIR'] endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt' @@ -4748,6 +4753,11 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl(): kafka_security('SASL_SSL') +@attr('kafka_security_test') +def test_ps_s3_notification_push_kafka_security_ssl_sasl_attrs(): + kafka_security('SASL_SSL', use_topic_attrs_for_creds=True) + + @attr('kafka_security_test') def test_ps_s3_notification_push_kafka_security_sasl(): kafka_security('SASL_PLAINTEXT')