From: igomon Date: Tue, 16 Apr 2024 18:56:01 +0000 (-0400) Subject: rgw/s3-notifications: added user-name/password topic attributes for use with SASL... X-Git-Tag: v20.0.0~1982^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=84744428e6345a2e71fcd933bfc0d31c4988c280;p=ceph.git rgw/s3-notifications: added user-name/password topic attributes for use with SASL authentication with Kakfa broker Signed-off-by: Igor Gomon --- diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index 52bee2a16d31..3a1fed164381 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -303,8 +303,13 @@ public: cct(_cct), topic(_topic), ack_level(get_ack_level(args)) { - if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true), - args.get_optional("ca-location"), args.get_optional("mechanism"))) { + if (!kafka::connect(conn_name, _endpoint, + get_bool(args, "use-ssl", false), + get_bool(args, "verify-ssl", true), + args.get_optional("ca-location"), + args.get_optional("mechanism"), + args.get_optional("user-name"), + args.get_optional("password"))) { throw configuration_error("Kafka: failed to create connection to: " + _endpoint); } } diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 8669d431d359..ce496e414a7d 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -554,7 +554,9 @@ public: bool use_ssl, bool verify_ssl, boost::optional ca_location, - boost::optional mechanism) { + boost::optional mechanism, + boost::optional topic_user_name, + boost::optional topic_password) { if (stopped) { ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl; return false; @@ -568,6 +570,21 @@ public: return false; } + // check if username/password was already supplied via topic attributes + // and if also provided as part of the endpoint URL issue a warning + if (topic_user_name.has_value()) { + if (!user.empty()) { + ldout(cct, 5) << "Kafka connect: username provided via both topic attributes and endpoint URL: using topic attributes" << dendl; + } + user = topic_user_name.get(); + } + if (topic_password.has_value()) { + if (!password.empty()) { + ldout(cct, 5) << "Kafka connect: password provided via both topic attributes and endpoint URL: using topic attributes" << dendl; + } + password = topic_password.get(); + } + // this should be validated by the regex in parse_url() ceph_assert(user.empty() == password.empty()); @@ -694,9 +711,11 @@ void shutdown() { bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location, - boost::optional mechanism) { + boost::optional mechanism, + boost::optional user_name, + boost::optional password) { if (!s_manager) return false; - return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism); + return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password); } int publish(const std::string& conn_name, diff --git a/src/rgw/rgw_kafka.h b/src/rgw/rgw_kafka.h index 813fda32969b..a6a38ed81ab8 100644 --- a/src/rgw/rgw_kafka.h +++ b/src/rgw/rgw_kafka.h @@ -22,7 +22,14 @@ bool init(CephContext* cct); void shutdown(); // connect to a kafka endpoint -bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional ca_location, boost::optional mechanism); +bool connect(std::string& broker, + const std::string& url, + bool use_ssl, + bool verify_ssl, + boost::optional ca_location, + boost::optional mechanism, + boost::optional user_name, + boost::optional password); // publish a message over a connection that was already created int publish(const std::string& conn_name, diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index bf72baac13ea..894c5a0a21f4 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -755,7 +755,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp { }; static constexpr std::initializer_list args = { "verify-ssl", "use-ssl", "ca-location", "amqp-ack-level", - "amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents"}; + "amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents", + "user-name", "password"}; if (std::find(args.begin(), args.end(), attribute_name) != args.end()) { replace_str(attribute_name, s->info.args.get("AttributeValue")); return 0; diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index fda0cf29173d..4d4b5c85e941 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -4526,7 +4526,7 @@ def test_ps_s3_topic_no_permissions(): conn2.delete_bucket(bucket_name) -def kafka_security(security_type, mechanism='PLAIN'): +def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False): """ test pushing kafka s3 notification securly to master """ conn = connection() zonegroup = get_config_zonegroup() @@ -4537,7 +4537,10 @@ def kafka_security(security_type, mechanism='PLAIN'): topic_name = bucket_name+'_topic' # create s3 topic if security_type == 'SASL_SSL': - endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094' + if not use_topic_attrs_for_creds: + endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094' + else: + endpoint_address = 'kafka://' + kafka_server + ':9094' elif security_type == 'SSL': endpoint_address = 'kafka://' + kafka_server + ':9093' elif security_type == 'SASL_PLAINTEXT': @@ -4550,6 +4553,8 @@ def kafka_security(security_type, mechanism='PLAIN'): elif security_type == 'SASL_SSL': KAFKA_DIR = os.environ['KAFKA_DIR'] endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism + if use_topic_attrs_for_creds: + endpoint_args += '&user-name=alice&password=alice-secret' else: KAFKA_DIR = os.environ['KAFKA_DIR'] endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt' @@ -4622,6 +4627,11 @@ def test_ps_s3_notification_push_kafka_security_ssl_sasl(): kafka_security('SASL_SSL') +@attr('kafka_security_test') +def test_ps_s3_notification_push_kafka_security_ssl_sasl_attrs(): + kafka_security('SASL_SSL', use_topic_attrs_for_creds=True) + + @attr('kafka_security_test') def test_ps_s3_notification_push_kafka_security_sasl(): kafka_security('SASL_PLAINTEXT')