static const int STATUS_QUEUE_FULL = -0x1003;
static const int STATUS_MAX_INFLIGHT = -0x1004;
static const int STATUS_MANAGER_STOPPED = -0x1005;
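+// status code for a connection that is removed after being idle for too long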
+static const int STATUS_CONNECTION_IDLE = -0x1006;
// status code for connection opening
static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
rd_kafka_conf_destroy(temp_conf);
return;
}
+ if (!is_ok()) {
+ // no producer, nothing to destroy
+ return;
+ }
// wait for all remaining acks/nacks
rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
// destroy all topics
std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
// destroy producer
rd_kafka_destroy(producer);
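+ // reset the handle so is_ok() reports the connection as closed and destroy() is not run twice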
+ producer = nullptr;
// fire all remaining callbacks (if not fired by rd_kafka_flush)
std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
cb_tag.cb(status);
// dtor also destroys the internals
~connection_t() {
- destroy(STATUS_CONNECTION_CLOSED);
+ destroy(status);
}
friend void intrusive_ptr_add_ref(const connection_t* p);
return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
case STATUS_CONF_ALLOC_FAILED:
return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
+ case STATUS_CONNECTION_IDLE:
+ return "RGW_KAFKA_STATUS_CONNECTION_IDLE";
}
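// not one of the internal status codes, assume it is a librdkafka error code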
return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
}
ldout(conn->cct, 20) << "RDKAFKA-" << level << "-" << fac << ": " << rd_kafka_name(rk) << ": " << buf << dendl;
}
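+// error callback registered with rd_kafka_conf_set_error_cb();
+// librdkafka invokes it from rd_kafka_poll() when a broker or connection error is detected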
+void poll_err_callback(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+ const auto conn = reinterpret_cast<connection_t*>(rd_kafka_opaque(rk));
+ ldout(conn->cct, 10) << "Kafka run: poll error(" << err << "): " << reason << dendl;
+}
+
// utility function to create a connection when the connection object already exists
connection_ptr_t& create_connection(connection_ptr_t& conn) {
// pointer must be valid and not marked for deletion
// redirect kafka logs to RGW
rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
+ // define the error callback used during polling to allow reconnect
+ rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback);
// create the producer
conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
if (!conn->producer) {
typedef std::unordered_map<std::string, connection_ptr_t> ConnectionList;
typedef boost::lockfree::queue<message_wrapper_t*, boost::lockfree::fixed_sized<true>> MessageQueue;
-// macros used inside a loop where an iterator is either incremented or erased
-#define INCREMENT_AND_CONTINUE(IT) \
- ++IT; \
- continue;
-
-#define ERASE_AND_CONTINUE(IT,CONTAINER) \
- IT=CONTAINER.erase(IT); \
- --connection_count; \
- continue;
-
class Manager {
public:
const size_t max_connections;
// check if the connection has been idle for too long
if(conn->timestamp.sec() + max_idle_time < ceph_clock_now()) {
- ldout(conn->cct, 20) << "Time for deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
- ERASE_AND_CONTINUE(conn_it, connections);
+ ldout(conn->cct, 20) << "Kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+ conn->destroy(STATUS_CONNECTION_IDLE);
+ conn_it = connections.erase(conn_it);
+ --connection_count;
+ continue;
}
// try to reconnect the connection if it has an error
} else {
ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successful" << dendl;
}
- INCREMENT_AND_CONTINUE(conn_it);
+ ++conn_it;
+ continue;
}
reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);