]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
src/rgw: Deletion of marked_for_deletion parameter
authorKalpesh Pandya <kapandya@redhat.com>
Wed, 18 Aug 2021 07:07:38 +0000 (12:37 +0530)
committerKalpesh Pandya <kapandya@redhat.com>
Thu, 26 Aug 2021 05:20:50 +0000 (10:50 +0530)
Signed-off-by: Kalpesh Pandya <kapandya@redhat.com>
src/rgw/rgw_kafka.cc

index 55960971144bfbc5c5f969465613b911c8c28e37..3847453551f5b5b319472663743c0ec62d74a94d 100644 (file)
@@ -62,7 +62,6 @@ struct connection_t {
   rd_kafka_t* producer = nullptr;
   rd_kafka_conf_t* temp_conf = nullptr;
   std::vector<rd_kafka_topic_t*> topics;
-  bool marked_for_deletion = false;
   uint64_t delivery_tag = 1;
   int status = STATUS_OK;
   mutable std::atomic<int> ref_count = 0;
@@ -101,7 +100,7 @@ struct connection_t {
   }
 
   bool is_ok() const {
-    return (producer != nullptr && !marked_for_deletion);
+    return (producer != nullptr);
   }
 
   // ctor for setting immutable values
@@ -187,7 +186,7 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
 // utility function to create a connection, when the connection object already exists
 connection_ptr_t& create_connection(connection_ptr_t& conn) {
   // pointer must be valid and not marked for deletion
-  ceph_assert(conn && !conn->marked_for_deletion);
+  ceph_assert(conn);
   
   // reset all status codes
   conn->status = STATUS_OK; 
@@ -416,15 +415,6 @@ private:
       for (;conn_it != end_it;) {
         
         auto& conn = conn_it->second;
-        // delete the connection if marked for deletion
-        if (conn->marked_for_deletion) {
-          ldout(conn->cct, 10) << "Kafka run: connection is deleted" << dendl;
-          conn->destroy(STATUS_CONNECTION_CLOSED);
-          std::lock_guard lock(connections_lock);
-          // erase is safe - does not invalidate any other iterator
-          // lock so no insertion happens at the same time
-          ERASE_AND_CONTINUE(conn_it, connections);
-        }
 
         // try to reconnect the connection if it has an error
         if (!conn->is_ok()) {
@@ -496,15 +486,6 @@ public:
     stopped = true;
   }
 
-  // disconnect from a broker
-  bool disconnect(connection_ptr_t& conn) {
-    if (!conn || stopped) {
-      return false;
-    }
-    conn->marked_for_deletion = true;
-    return true;
-  }
-
   // connect to a broker, or reuse an existing connection if already connected
   connection_ptr_t connect(const std::string& url, 
           bool use_ssl,
@@ -537,11 +518,6 @@ public:
     const auto it = connections.find(broker);
     // note that ssl vs. non-ssl connection to the same host are two separate conenctions
     if (it != connections.end()) {
-      if (it->second->marked_for_deletion) {
-        // TODO: increment counter
-        ldout(cct, 1) << "Kafka connect: endpoint marked for deletion" << dendl;
-        return nullptr;
-      }
       // connection found - return even if non-ok
       ldout(cct, 20) << "Kafka connect: connection found" << dendl;
       return it->second;
@@ -711,10 +687,5 @@ size_t get_max_queue() {
   return s_manager->max_queue;
 }
 
-bool disconnect(connection_ptr_t& conn) {
-  if (!s_manager) return false;
-  return s_manager->disconnect(conn);
-}
-
 } // namespace kafka