namespace rgw::kafka {
// status codes for publishing
-// TODO: use the actual error code (when conn exists) instead of STATUS_CONNECTION_CLOSED when replying to client
static const int STATUS_CONNECTION_CLOSED = -0x1002;
static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
// status code for connection opening
static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
+static const int STATUS_CONF_REPLCACE = -0x2002;
static const int STATUS_OK = 0x0;
std::vector<rd_kafka_topic_t*> topics;
uint64_t delivery_tag = 1;
int status = STATUS_OK;
- mutable std::atomic<int> ref_count = 0;
CephContext* const cct;
CallbackList callbacks;
const std::string broker;
~connection_t() {
destroy(STATUS_CONNECTION_CLOSED);
}
-
- friend void intrusive_ptr_add_ref(const connection_t* p);
- friend void intrusive_ptr_release(const connection_t* p);
};
-std::string to_string(const connection_ptr_t& conn) {
- std::string str;
- str += "\nBroker: " + conn->broker;
- str += conn->use_ssl ? "\nUse SSL" : "";
- str += conn->ca_location ? "\nCA Location: " + *(conn->ca_location) : "";
- str += conn->mechanism ? "\nSASL Mechanism: " + *(conn->mechanism) : "";
- return str;
-}
-// these are required interfaces so that connection_t could be used inside boost::intrusive_ptr
-void intrusive_ptr_add_ref(const connection_t* p) {
- ++p->ref_count;
-}
-void intrusive_ptr_release(const connection_t* p) {
- if (--p->ref_count == 0) {
- delete p;
- }
-}
-
// convert int status to string - including RGW specific values
std::string status_to_string(int s) {
switch (s) {
return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
case STATUS_CONF_ALLOC_FAILED:
return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
+ case STATUS_CONF_REPLCACE:
+ return "RGW_KAFKA_STATUS_CONF_REPLCACE";
}
return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
}
ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
}
-// utility function to create a connection, when the connection object already exists
-connection_ptr_t& create_connection(connection_ptr_t& conn) {
- // pointer must be valid and not marked for deletion
- ceph_assert(conn);
-
+// log a librdkafka poll-level error on behalf of the owning connection.
+// the connection_t* is recovered via rd_kafka_opaque() from the global
+// opaque pointer set with rd_kafka_conf_set_opaque() at producer creation;
+// the 'opaque' parameter (per-message opaque) is unused here
+void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+  const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
+  ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl;
+}
+
+// owning pointer type for the connection entries stored in the ConnectionList map
+using connection_t_ptr = std::unique_ptr<connection_t>;
+
+// utility function to create a producer, when the connection object already exists
+bool new_producer(connection_t* conn) {
// reset all status codes
conn->status = STATUS_OK;
char errstr[512] = {0};
conn->temp_conf = rd_kafka_conf_new();
if (!conn->temp_conf) {
conn->status = STATUS_CONF_ALLOC_FAILED;
- return conn;
+ return false;
}
// get list of brokers based on the bootsrap broker
rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
// set the global opaque pointer to be the connection itself
- rd_kafka_conf_set_opaque(conn->temp_conf, conn.get());
+ rd_kafka_conf_set_opaque(conn->temp_conf, conn);
// redirect kafka logs to RGW
rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
// create the producer
+ if (conn->producer) {
+ ldout(conn->cct, 5) << "Kafka connect: producer already exists. detroying the existing before creating a new one" << dendl;
+ conn->destroy(STATUS_CONF_REPLCACE);
+ }
conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
if (!conn->producer) {
conn->status = rd_kafka_last_error();
ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
- return conn;
+ return false;
}
ldout(conn->cct, 20) << "Kafka connect: successfully created new producer" << dendl;
{
// conf ownership passed to producer
conn->temp_conf = nullptr;
- return conn;
+ return true;
conf_error:
conn->status = rd_kafka_last_error();
ldout(conn->cct, 1) << "Kafka connect: configuration failed: " << errstr << dendl;
- return conn;
-}
-
-// utility function to create a new connection
-connection_ptr_t create_new_connection(const std::string& broker, CephContext* cct,
- bool use_ssl,
- bool verify_ssl,
- boost::optional<const std::string&> ca_location,
- const std::string& user,
- const std::string& password,
- boost::optional<const std::string&> mechanism) {
- // create connection state
- connection_ptr_t conn(new connection_t(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism));
- return create_connection(conn);
+ return false;
}
-/// struct used for holding messages in the message queue
+// struct used for holding messages in the message queue
struct message_wrapper_t {
- connection_ptr_t conn;
+ std::string conn_name;
std::string topic;
std::string message;
- reply_callback_t cb;
+ const reply_callback_t cb;
- message_wrapper_t(connection_ptr_t& _conn,
+ message_wrapper_t(const std::string& _conn_name,
const std::string& _topic,
const std::string& _message,
- reply_callback_t _cb) : conn(_conn), topic(_topic), message(_message), cb(_cb) {}
+ reply_callback_t _cb) : conn_name(_conn_name), topic(_topic), message(_message), cb(_cb) {}
};
-typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
+typedef std::unordered_map<std::string, connection_t_ptr> ConnectionList;
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
// macros used inside a loop where an iterator is either incremented or erased
// TODO use rd_kafka_produce_batch for better performance
void publish_internal(message_wrapper_t* message) {
- const std::unique_ptr<message_wrapper_t> msg_owner(message);
- auto& conn = message->conn;
+ const std::unique_ptr<message_wrapper_t> msg_deleter(message);
+ const auto conn_it = connections.find(message->conn_name);
+ if (conn_it == connections.end()) {
+ ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue. error: " << STATUS_CONNECTION_CLOSED << dendl;
+ if (message->cb) {
+ message->cb(STATUS_CONNECTION_CLOSED);
+ }
+ return;
+ }
+ auto& conn = conn_it->second;
conn->timestamp = ceph_clock_now();
if (!conn->is_ok()) {
// connection had an issue while message was in the queue
// TODO add error stats
- ldout(conn->cct, 1) << "Kafka publish: connection had an issue while message was in the queue. error: " << status_to_string(conn->status) << dendl;
+ ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. error: " << status_to_string(conn->status) << dendl;
if (message->cb) {
message->cb(conn->status);
}
ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
const auto& broker = conn_it->first;
ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
- if (create_connection(conn)->is_ok() == false) {
+ if (new_producer(conn.get()) == false) {
ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
}
// connect to a broker, or reuse an existing connection if already connected
- connection_ptr_t connect(const std::string& url,
+ bool connect(std::string& broker,
+ const std::string& url,
bool use_ssl,
bool verify_ssl,
boost::optional<const std::string&> ca_location,
boost::optional<const std::string&> mechanism) {
if (stopped) {
- // TODO: increment counter
ldout(cct, 1) << "Kafka connect: manager is stopped" << dendl;
- return nullptr;
+ return false;
}
- std::string broker;
- std::string user;
- std::string password;
+ std::string user;
+ std::string password;
if (!parse_url_authority(url, broker, user, password)) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: URL parsing failed" << dendl;
- return nullptr;
+ return false;
}
// this should be validated by the regex in parse_url()
ceph_assert(user.empty() == password.empty());
- if (!user.empty() && !use_ssl && !g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
+ if (!user.empty() && !use_ssl && !g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
ldout(cct, 1) << "Kafka connect: user/password are only allowed over secure connection" << dendl;
- return nullptr;
- }
+ return false;
+ }
std::lock_guard lock(connections_lock);
const auto it = connections.find(broker);
if (it != connections.end()) {
// connection found - return even if non-ok
ldout(cct, 20) << "Kafka connect: connection found" << dendl;
- return it->second;
+ return it->second.get();
}
// connection not found, creating a new one
if (connection_count >= max_connections) {
// TODO: increment counter
ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
- return nullptr;
+ return false;
}
- const auto conn = create_new_connection(broker, cct, use_ssl, verify_ssl, ca_location, user, password, mechanism);
- // create_new_connection must always return a connection object
+  // a connection object is always created and stored in the list,
// even if error occurred during creation.
// in such a case the creation will be retried in the main thread
- ceph_assert(conn);
++connection_count;
ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
- return connections.emplace(broker, conn).first->second;
+ auto conn = connections.emplace(broker, std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
+ if (!new_producer(conn)) {
+ ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl;
+ }
+ return true;
}
// TODO publish with confirm is needed in "none" case as well, cb should be invoked publish is ok (no ack)
- int publish(connection_ptr_t& conn,
+ int publish(const std::string& conn_name,
const std::string& topic,
const std::string& message) {
if (stopped) {
return STATUS_MANAGER_STOPPED;
}
- if (!conn || !conn->is_ok()) {
- return STATUS_CONNECTION_CLOSED;
- }
- if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
+ if (messages.push(new message_wrapper_t(conn_name, topic, message, nullptr))) {
++queued;
return STATUS_OK;
}
return STATUS_QUEUE_FULL;
}
- int publish_with_confirm(connection_ptr_t& conn,
+ int publish_with_confirm(const std::string& conn_name,
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
if (stopped) {
return STATUS_MANAGER_STOPPED;
}
- if (!conn || !conn->is_ok()) {
- return STATUS_CONNECTION_CLOSED;
- }
- if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
+ if (messages.push(new message_wrapper_t(conn_name, topic, message, cb))) {
++queued;
return STATUS_OK;
}
s_manager = nullptr;
}
-connection_ptr_t connect(const std::string& url, bool use_ssl, bool verify_ssl,
+bool connect(std::string& broker, const std::string& url, bool use_ssl, bool verify_ssl,
boost::optional<const std::string&> ca_location,
boost::optional<const std::string&> mechanism) {
- if (!s_manager) return nullptr;
- return s_manager->connect(url, use_ssl, verify_ssl, ca_location, mechanism);
+ if (!s_manager) return false;
+ return s_manager->connect(broker, url, use_ssl, verify_ssl, ca_location, mechanism);
}
-int publish(connection_ptr_t& conn,
+int publish(const std::string& conn_name,
const std::string& topic,
const std::string& message) {
if (!s_manager) return STATUS_MANAGER_STOPPED;
- return s_manager->publish(conn, topic, message);
+ return s_manager->publish(conn_name, topic, message);
}
-int publish_with_confirm(connection_ptr_t& conn,
+int publish_with_confirm(const std::string& conn_name,
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
if (!s_manager) return STATUS_MANAGER_STOPPED;
- return s_manager->publish_with_confirm(conn, topic, message, cb);
+ return s_manager->publish_with_confirm(conn_name, topic, message, cb);
}
size_t get_connection_count() {