const boost::optional<std::string> ca_location;
const std::string user;
const std::string password;
+ const boost::optional<std::string> mechanism;
utime_t timestamp = ceph_clock_now();
// cleanup of all internal connection resource
// ctor for setting immutable values
connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
const boost::optional<const std::string&>& _ca_location,
- const std::string& _user, const std::string& _password) :
- cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password) {}
+ const std::string& _user, const std::string& _password, const boost::optional<const std::string&>& _mechanism) :
+ cct(_cct), broker(_broker), use_ssl(_use_ssl), verify_ssl(_verify_ssl), ca_location(_ca_location), user(_user), password(_password), mechanism(_mechanism) {}
// dtor also destroys the internals
~connection_t() {
str += "\nBroker: " + conn->broker;
str += conn->use_ssl ? "\nUse SSL" : "";
str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
+ str += conn->mechanism ? "\nSASL Mechanism: " + *(conn->mechanism) : "";
return str;
}
// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
if (!conn->user.empty()) {
// use SSL+SASL
if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
+
+ if (conn->mechanism) {
+ if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl;
+ } else {
+ if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl;
+ }
+
} else {
// use only SSL
if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
bool verify_ssl,
boost::optional<const std::string&> ca_location,
const std::string& user,
- const std::string& password) {
+ const std::string& password,
+ boost::optional<const std::string&> mechanism) {
// create connection state
- connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password));
+ connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism));
return create_connection(conn);
}
connection_ptr_t connect(const std::string& url,
bool use_ssl,
bool verify_ssl,
- boost::optional<const std::string&> ca_location) {
+ boost::optional<const std::string&> ca_location,
+ boost::optional<const std::string&> mechanism) {
if (stopped) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
return nullptr;
}
- const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password);
+ const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password, mechanism);
// create_new_connection must always return a connection object
// even if error occurred during creation.
// in such a case the creation will be retried in the main thread
}
connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl,
- boost::optional<const std::string&> ca_location) {
+ boost::optional<const std::string&> ca_location,
+ boost::optional<const std::string&> mechanism) {
if (!s_manager) return nullptr;
- return s_manager->connect(url, use_ssl, verify_ssl, ca_location);
+ return s_manager->connect(url, use_ssl, verify_ssl, ca_location, mechanism);
}
int publish(connection_ptr_t& conn,