case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
return EMSGSIZE;
case RD_KAFKA_RESP_ERR__QUEUE_FULL:
- return ENOBUFS;
+ return ENOBUFS;
default:
return EIO;
}
}
};
+// convert int status to string - including RGW specific values
+// any value not matched by the switch is assumed to be a librdkafka
+// error code and is delegated to rd_kafka_err2str()
+std::string status_to_string(int s) {
+ switch (s) {
+ case STATUS_OK:
+ return "STATUS_OK";
+ case STATUS_CONNECTION_CLOSED:
+ return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
+ case STATUS_QUEUE_FULL:
+ return "RGW_KAFKA_STATUS_QUEUE_FULL";
+ case STATUS_MAX_INFLIGHT:
+ return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
+ case STATUS_MANAGER_STOPPED:
+ return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
+ case STATUS_CONF_ALLOC_FAILED:
+ return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
+ // NOTE(review): "REPLCACE" looks like a typo, but the string must
+ // mirror the spelling of the enum constant (declared elsewhere in
+ // this file) - if the enum is ever renamed, update both together
+ case STATUS_CONF_REPLCACE:
+ return "RGW_KAFKA_STATUS_CONF_REPLCACE";
+ case STATUS_CONNECTION_IDLE:
+ return "RGW_KAFKA_STATUS_CONNECTION_IDLE";
+ }
+ return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
+}
+
void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
ceph_assert(opaque);
const auto result = rkmessage->err;
if (rkmessage->err == 0) {
- ldout(conn->cct, 20) << "Kafka run: ack received with result=" <<
+ ldout(conn->cct, 20) << "Kafka run: ack received with result=" <<
rd_kafka_err2str(result) << dendl;
} else {
- ldout(conn->cct, 1) << "Kafka run: nack received with result=" <<
+ ldout(conn->cct, 1) << "Kafka run: nack received with result=" <<
rd_kafka_err2str(result) << dendl;
}
// however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
constexpr std::uint64_t min_message_timeout = 1;
const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
- if (rd_kafka_conf_set(conf.get(), "message.timeout.ms",
+ if (rd_kafka_conf_set(conf.get(), "message.timeout.ms",
std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
// get list of brokers based on the bootstrap broker
topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
if (!topic) {
const auto err = rd_kafka_last_error();
- ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: "
+ ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: "
<< rd_kafka_err2str(err) << "(" << err << ")" << dendl;
if (message->cb) {
message->cb(-rd_kafka_err2errno(err));
// Checking the connection idleness
if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
- ldout(conn->cct, 20) << "kafka run: deleting a connection that was idle for: " <<
+ ldout(conn->cct, 20) << "kafka run: deleting a connection that was idle for: " <<
conn->cct->_conf->rgw_kafka_connection_idle << " seconds. last activity was at: " << conn->timestamp << dendl;
std::lock_guard lock(connections_lock);
conn->status = STATUS_CONNECTION_IDLE;
bool use_ssl,
bool verify_ssl,
boost::optional<const std::string&> ca_location,
- boost::optional<const std::string&> mechanism) {
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> topic_user_name,
+ boost::optional<const std::string&> topic_password) {
if (stopped) {
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
return false;
return false;
}
+ // check if username/password was already supplied via topic attributes
+ // and if also provided as part of the endpoint URL issue a warning
+ if (topic_user_name.has_value()) {
+ if (!user.empty()) {
+ ldout(cct, 5) << "Kafka connect: username provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
+ }
+ user = topic_user_name.get();
+ }
+ if (topic_password.has_value()) {
+ if (!password.empty()) {
+ ldout(cct, 5) << "Kafka connect: password provided via both topic attributes and endpoint URL: using topic attributes" << dendl;
+ }
+ password = topic_password.get();
+ }
+
// this should be validated by the regex in parse_url()
ceph_assert(user.empty() == password.empty());
// Thread-safe free-function wrapper around the singleton manager's
// connect(): takes a shared lock on s_manager_mutex and returns false
// if the manager was not started. The optional user_name/password
// arguments are forwarded so credentials can come from topic
// attributes rather than the endpoint URL.
bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
boost::optional<const std::string&> ca_location,
- boost::optional<const std::string&> mechanism) {
+ boost::optional<const std::string&> mechanism,
+ boost::optional<const std::string&> user_name,
+ boost::optional<const std::string&> password) {
std::shared_lock lock(s_manager_mutex);
if (!s_manager) return false;
- return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
+ return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism, user_name, password);
}
int publish(const std::string& conn_name,
conn2.delete_bucket(bucket_name)
-def kafka_security(security_type, mechanism='PLAIN'):
+def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
""" test pushing kafka s3 notification securly to master """
conn = connection()
zonegroup = get_config_zonegroup()
topic_name = bucket_name+'_topic'
# create s3 topic
if security_type == 'SASL_SSL':
- endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+ if not use_topic_attrs_for_creds:
+ endpoint_address = 'kafka://alice:alice-secret@' + kafka_server + ':9094'
+ else:
+ endpoint_address = 'kafka://' + kafka_server + ':9094'
elif security_type == 'SSL':
endpoint_address = 'kafka://' + kafka_server + ':9093'
elif security_type == 'SASL_PLAINTEXT':
elif security_type == 'SASL_SSL':
KAFKA_DIR = os.environ['KAFKA_DIR']
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism
+ if use_topic_attrs_for_creds:
+ endpoint_args += '&user-name=alice&password=alice-secret'
else:
KAFKA_DIR = os.environ['KAFKA_DIR']
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
kafka_security('SASL_SSL')
+@attr('kafka_security_test')
+def test_ps_s3_notification_push_kafka_security_ssl_sasl_attrs():
+ """ test SASL_SSL s3 notifications with the username/password supplied via topic attributes instead of the endpoint URL """
+ kafka_security('SASL_SSL', use_topic_attrs_for_creds=True)
+
+
@attr('kafka_security_test')
def test_ps_s3_notification_push_kafka_security_sasl():
kafka_security('SASL_PLAINTEXT')