rd_kafka_t* producer = nullptr;
rd_kafka_conf_t* temp_conf = nullptr;
std::vector<rd_kafka_topic_t*> topics;
- bool marked_for_deletion = false;
uint64_t delivery_tag = 1;
int status = STATUS_OK;
mutable std::atomic<int> ref_count = 0;
}
bool is_ok() const {
- return (producer != nullptr && !marked_for_deletion);
+ return (producer != nullptr);
}
// ctor for setting immutable values
// utility function to create a connection, when the connection object already exists
connection_ptr_t& create_connection(connection_ptr_t& conn) {
// pointer must be valid and not marked for deletion
- ceph_assert(conn && !conn->marked_for_deletion);
+ ceph_assert(conn);
// reset all status codes
conn->status = STATUS_OK;
for (;conn_it != end_it;) {
auto& conn = conn_it->second;
- // delete the connection if marked for deletion
- if (conn->marked_for_deletion) {
- ldout(conn->cct, 10) << "Kafka run: connection is deleted" << dendl;
- conn->destroy(STATUS_CONNECTION_CLOSED);
- std::lock_guard lock(connections_lock);
- // erase is safe - does not invalidate any other iterator
- // lock so no insertion happens at the same time
- ERASE_AND_CONTINUE(conn_it, connections);
- }
// try to reconnect the connection if it has an error
if (!conn->is_ok()) {
stopped = true;
}
- // disconnect from a broker
- bool disconnect(connection_ptr_t& conn) {
- if (!conn || stopped) {
- return false;
- }
- conn->marked_for_deletion = true;
- return true;
- }
-
// connect to a broker, or reuse an existing connection if already connected
connection_ptr_t connect(const std::string& url,
bool use_ssl,
const auto it = connections.find(broker);
// note that ssl vs. non-ssl connection to the same host are two separate conenctions
if (it != connections.end()) {
- if (it->second->marked_for_deletion) {
- // TODO: increment counter
- ldout(cct, 1) << "Kafka connect: endpoint marked for deletion" << dendl;
- return nullptr;
- }
// connection found - return even if non-ok
ldout(cct, 20) << "Kafka connect: connection found" << dendl;
return it->second;
return s_manager->max_queue;
}
-bool disconnect(connection_ptr_t& conn) {
- if (!s_manager) return false;
- return s_manager->disconnect(conn);
-}
-
} // namespace kafka