From: Kalpesh Pandya Date: Wed, 18 Aug 2021 07:07:38 +0000 (+0530) Subject: src/rgw: Deletion of marked_for_deletion parameter X-Git-Tag: v17.1.0~949^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fc5aeda85cc2bbbc7a66cb1154f465816569e49a;p=ceph.git src/rgw: Deletion of marked_for_deletion parameter Signed-off-by: Kalpesh Pandya --- diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index 55960971144b..3847453551f5 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -62,7 +62,6 @@ struct connection_t { rd_kafka_t* producer = nullptr; rd_kafka_conf_t* temp_conf = nullptr; std::vector topics; - bool marked_for_deletion = false; uint64_t delivery_tag = 1; int status = STATUS_OK; mutable std::atomic ref_count = 0; @@ -101,7 +100,7 @@ struct connection_t { } bool is_ok() const { - return (producer != nullptr && !marked_for_deletion); + return (producer != nullptr); } // ctor for setting immutable values @@ -187,7 +186,7 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* // utility function to create a connection, when the connection object already exists connection_ptr_t& create_connection(connection_ptr_t& conn) { // pointer must be valid and not marked for deletion - ceph_assert(conn && !conn->marked_for_deletion); + ceph_assert(conn); // reset all status codes conn->status = STATUS_OK; @@ -416,15 +415,6 @@ private: for (;conn_it != end_it;) { auto& conn = conn_it->second; - // delete the connection if marked for deletion - if (conn->marked_for_deletion) { - ldout(conn->cct, 10) << "Kafka run: connection is deleted" << dendl; - conn->destroy(STATUS_CONNECTION_CLOSED); - std::lock_guard lock(connections_lock); - // erase is safe - does not invalidate any other iterator - // lock so no insertion happens at the same time - ERASE_AND_CONTINUE(conn_it, connections); - } // try to reconnect the connection if it has an error if (!conn->is_ok()) { @@ -496,15 +486,6 @@ public: stopped = true; } - // disconnect from a broker - bool disconnect(connection_ptr_t& conn) { - if (!conn || stopped) { - return false; - } - conn->marked_for_deletion = true; - return true; - } - // connect to a broker, or reuse an existing connection if already connected connection_ptr_t connect(const std::string& url, bool use_ssl, @@ -537,11 +518,6 @@ public: const auto it = connections.find(broker); // note that ssl vs. non-ssl connection to the same host are two separate conenctions if (it != connections.end()) { - if (it->second->marked_for_deletion) { - // TODO: increment counter - ldout(cct, 1) << "Kafka connect: endpoint marked for deletion" << dendl; - return nullptr; - } // connection found - return even if non-ok ldout(cct, 20) << "Kafka connect: connection found" << dendl; return it->second; @@ -711,10 +687,5 @@ size_t get_max_queue() { return s_manager->max_queue; } -bool disconnect(connection_ptr_t& conn) { - if (!s_manager) return false; - return s_manager->disconnect(conn); -} - } // namespace kafka