cct(_cct),
topic(_topic),
ack_level(get_ack_level(args)) {
- if (!kafka::connect(conn_name, _endpoint, get_bool(args, "use-ssl", false), get_bool(args, "verify-ssl", true),
- args.get_optional("ca-location"), args.get_optional("mechanism"))) {
+ if (!kafka::connect(conn_name, _endpoint,
+ get_bool(args, "use-ssl", false),
+ get_bool(args, "verify-ssl", true),
+ args.get_optional("ca-location"),
+ args.get_optional("mechanism"),
+ args.get_optional("user-name"),
+ args.get_optional("password"))) {
throw configuration_error("Kafka: failed to create connection to: " + _endpoint);
}
}
bool use_ssl,
bool verify_ssl,
boost::optional<const std::string&> ca_location,
- boost::optional<const std::string&> mechanism) {
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> topic_user_name,
+ boost::optional<const std::string&> topic_password) {
if (stopped) {
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
    return false;
}
  // check if a username/password was already supplied via topic attributes;
  // if credentials were also provided as part of the endpoint URL, issue a
  // warning and prefer the topic-attribute values
+ if (topic_user_name.has_value()) {
+ if (!user.empty()) {
+ ldout(cct, 5) << "Kafka connect: username provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
+ }
+ user = topic_user_name.get();
+ }
+ if (topic_password.has_value()) {
+ if (!password.empty()) {
+ ldout(cct, 5) << "Kafka connect: password provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
+ }
+ password = topic_password.get();
+ }
+
// this should be validated by the regex in parse_url()
ceph_assert(user.empty() == password.empty());
bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
boost::optional<const std::string&> ca_location,
- boost::optional<const std::string&> mechanism) {
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> user_name,
+ boost::optional<const std::string&> password) {
if (!s_manager) return false;
- return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
+ return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
}
int publish(const std::string& conn_name,
void shutdown();
// connect to a kafka endpoint
-bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl, boost::optional<const std::string&> ca_location, boost::optional<const std::string&> mechanism);
+bool connect(std::string& broker,
+ const std::string& url,
+ bool use_ssl,
+ bool verify_ssl,
+ boost::optional<const std::string&> ca_location,
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> user_name,
+ boost::optional<const std::string&> password);
// publish a message over a connection that was already created
int publish(const std::string& conn_name,
};
// recognized endpoint attribute names; a matching attribute's value is
// replaced (see the find/replace check below). "user-name"/"password"
// allow supplying kafka credentials via topic attributes instead of
// embedding them in the endpoint URL
static constexpr std::initializer_list<const char*> args = {
    "verify-ssl", "use-ssl", "ca-location", "amqp-ack-level",
    "amqp-exchange", "kafka-ack-level", "mechanism", "cloudevents",
    "user-name", "password"};
if (std::find(args.begin(), args.end(), attribute_name) != args.end()) {
replace_str(attribute_name, s->info.args.get("AttributeValue"));
return 0;
conn2.delete_bucket(bucket_name)
-def kafka_security(security_type, mechanism='PLAIN'):
+def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
    """ test pushing kafka s3 notification securely to master """
conn = connection()
zonegroup = get_config_zonegroup()
topic_name = bucket_name+'_topic'
# create s3 topic
if security_type == 'SASL_SSL':
- endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+ if not use_topic_attrs_for_creds:
+ endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+ else:
+ endpoint_address = 'kafka://' + kafka_server + ':9094'
elif security_type == 'SSL':
endpoint_address = 'kafka://' + kafka_server + ':9093'
elif security_type == 'SASL_PLAINTEXT':
elif security_type == 'SASL_SSL':
KAFKA_DIR = os.environ['KAFKA_DIR']
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism
+ if use_topic_attrs_for_creds:
+ endpoint_args += '&user-name=alice&password=alice-secret'
else:
KAFKA_DIR = os.environ['KAFKA_DIR']
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
kafka_security('SASL_SSL')
@attr('kafka_security_test')
def test_ps_s3_notification_push_kafka_security_ssl_sasl_attrs():
    """ test pushing kafka s3 notification securely to master,
    passing the SASL credentials via topic attributes
    instead of embedding them in the endpoint URL """
    kafka_security('SASL_SSL', use_topic_attrs_for_creds=True)


@attr('kafka_security_test')
def test_ps_s3_notification_push_kafka_security_sasl():
    """ test pushing kafka s3 notification securely (SASL_PLAINTEXT) to master """
    kafka_security('SASL_PLAINTEXT')