]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka: do not destroy the connection on errors 56033/head
authorYuval Lifshitz <ylifshit@ibm.com>
Thu, 7 Mar 2024 11:49:10 +0000 (11:49 +0000)
committerYuval Lifshitz <ylifshit@ibm.com>
Wed, 22 May 2024 18:29:45 +0000 (18:29 +0000)
as well as other simplifications:
* do not store temporary configuration in the connection object. just
  use as a local variable
* do not create a connection without a producer

other improvements:
* copy to a local list before publishing
* convert internal error codes to errno

Fixes: https://tracker.ceph.com/issues/66017
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/rgw_kafka.cc

index ce1b273d8b78b42c87649187711814fe13ffd926..9a356d9c6f04e59009fda77b74a99a93a850698b 100644 (file)
 
 #define dout_subsys ceph_subsys_rgw_notification
 
-// TODO investigation, not necessarily issues:
-// (1) in case of single threaded writer context use spsc_queue
-// (2) check performance of emptying queue to local list, and go over the list and publish
-// (3) use std::shared_mutex (c++17) or equivalent for the connections lock
-
 // comparison operator between topic pointer and name
 bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
     return name == std::string_view(rd_kafka_topic_name(rkt)); 
 }
 
+// this is the inverse of rd_kafka_errno2err
+// see: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.c
+inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) {
+  if (err == 0) return 0;
+  switch (err) {
+    case RD_KAFKA_RESP_ERR__INVALID_ARG:
+    return EINVAL;
+  case RD_KAFKA_RESP_ERR__CONFLICT:
+    return EBUSY;
+  case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
+    return ENOENT;
+  case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:
+    return ESRCH;
+  case RD_KAFKA_RESP_ERR__TIMED_OUT:
+  case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:
+    return ETIMEDOUT;
+  case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
+    return EMSGSIZE;
+  case RD_KAFKA_RESP_ERR__QUEUE_FULL:
+    return ENOBUFS;                                                                                                                                                                                                                           
+  default:
+    return EIO;
+  }
+}
+
 namespace rgw::kafka {
 
-// status codes for publishing
-static const int STATUS_CONNECTION_CLOSED =      -0x1002;
-static const int STATUS_QUEUE_FULL =             -0x1003;
-static const int STATUS_MAX_INFLIGHT =           -0x1004;
-static const int STATUS_MANAGER_STOPPED =        -0x1005;
-static const int STATUS_CONNECTION_IDLE =        -0x1006;
-// status code for connection opening
-static const int STATUS_CONF_ALLOC_FAILED      = -0x2001;
-static const int STATUS_CONF_REPLCACE          = -0x2002;
+enum Status {
+  STATUS_CONNECTION_CLOSED = -0x1002,
+  STATUS_CONNECTION_IDLE   = -0x1006,
+  STATUS_CONF_ALLOC_FAILED = -0x2001,
+};
 
-static const int STATUS_OK =                     0x0;
+// convert int status to string - both RGW and librdkafka values
+inline std::string status_to_string(int s) {
+  switch (s) {
+    case STATUS_CONNECTION_CLOSED:
+      return "Kafka connection closed";
+    case STATUS_CONF_ALLOC_FAILED:
+      return "Kafka configuration allocation failed";
+    case STATUS_CONNECTION_IDLE:
+      return "Kafka connection idle";
+    default:
+      return std::string(rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(s)));
+  }
+}
+
+// convert int status to errno - both RGW and librdkafka values
+inline int status_to_errno(int s) {
+  if (s == 0) return 0;
+  switch (s) {
+    case STATUS_CONNECTION_CLOSED:
+      return -EIO;
+    case STATUS_CONF_ALLOC_FAILED:
+      return -ENOMEM;
+    case STATUS_CONNECTION_IDLE:
+      return -EIO;
+    default:
+      return -rd_kafka_err2errno(static_cast<rd_kafka_resp_err_t>(s));
+  }
+}
 
 // struct for holding the callback and its tag in the callback list
 struct reply_callback_with_tag_t {
@@ -56,15 +99,11 @@ struct reply_callback_with_tag_t {
 
 typedef std::vector<reply_callback_with_tag_t> CallbackList;
 
-// struct for holding the connection state object as well as list of topics
-// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
-// since references to deleted objects may still exist in the calling code
 struct connection_t {
   rd_kafka_t* producer = nullptr;
-  rd_kafka_conf_t* temp_conf = nullptr;
   std::vector<rd_kafka_topic_t*> topics;
   uint64_t delivery_tag = 1;
-  int status = STATUS_OK;
+  int status = 0;
   CephContext* const cct;
   CallbackList callbacks;
   const std::string broker;
@@ -79,40 +118,31 @@ struct connection_t {
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
   // resources created again on successful reconnection
-  void destroy(int s) {
-    status = s;
-    // destroy temporary conf (if connection was never established)
-    if (temp_conf) {
-        rd_kafka_conf_destroy(temp_conf);
-        return;
-    }
-    if (!is_ok()) {
+  void destroy() {
+    if (!producer) {
       // no producer, nothing to destroy
       return;
     }
-    // wait for all remaining acks/nacks
-    rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
+    // wait for 500ms to try and handle pending callbacks
+    rd_kafka_flush(producer, 500);
     // destroy all topics
     std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
+    topics.clear();
     // destroy producer
     rd_kafka_destroy(producer);
     producer = nullptr;
     // fire all remaining callbacks (if not fired by rd_kafka_flush)
     std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
-        cb_tag.cb(status);
-        ldout(cct, 20) << "Kafka destroy: invoking callback with tag="
+        ldout(cct, 1) << "Kafka destroy: invoking callback with tag: "
                        << cb_tag.tag << " for: " << broker
-                       << " with status: " << status << dendl;
+                       << " with status: " << status_to_string(status) << dendl;
+        cb_tag.cb(status_to_errno(status));
       });
     callbacks.clear();
     delivery_tag = 1;
     ldout(cct, 20) << "Kafka destroy: complete for: " << broker << dendl;
   }
 
-  bool is_ok() const {
-    return (producer != nullptr);
-  }
-
   // ctor for setting immutable values
   connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl, 
           const boost::optional<const std::string&>& _ca_location,
@@ -121,41 +151,26 @@ struct connection_t {
 
   // dtor also destroys the internals
   ~connection_t() {
-    destroy(status);
+    destroy();
   }
 };
 
-// convert int status to string - including RGW specific values
-std::string status_to_string(int s) {
-  switch (s) {
-    case STATUS_OK:
-        return "STATUS_OK";
-    case STATUS_CONNECTION_CLOSED:
-      return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
-    case STATUS_QUEUE_FULL:
-      return "RGW_KAFKA_STATUS_QUEUE_FULL";
-    case STATUS_MAX_INFLIGHT:
-      return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
-    case STATUS_MANAGER_STOPPED:
-      return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
-    case STATUS_CONF_ALLOC_FAILED:
-      return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
-    case STATUS_CONF_REPLCACE:
-      return "RGW_KAFKA_STATUS_CONF_REPLCACE";
-    case STATUS_CONNECTION_IDLE:
-      return "RGW_KAFKA_STATUS_CONNECTION_IDLE";
-  }
-  return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
-}
-
 void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
   ceph_assert(opaque);
 
   const auto conn = reinterpret_cast<connection_t*>(opaque);
   const auto result = rkmessage->err;
 
+  if (rkmessage->err == 0) {
+      ldout(conn->cct, 20) << "Kafka run: ack received with result=" << 
+        rd_kafka_err2str(result) << dendl;
+  } else {
+      ldout(conn->cct, 1) << "Kafka run: nack received with result=" << 
+        rd_kafka_err2str(result) << dendl;
+  }
+
   if (!rkmessage->_private) {
-    ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl;
+    ldout(conn->cct, 20) << "Kafka run: n/ack received without a callback" << dendl;
     return;  
   }
 
@@ -165,8 +180,8 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void*
   const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
   if (tag_it != callbacks_end) {
       ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" << 
-          *tag << " and result=" << rd_kafka_err2str(result) << dendl;
-      tag_it->cb(result);
+          *tag << dendl;
+      tag_it->cb(-rd_kafka_err2errno(result));
       conn->callbacks.erase(tag_it);
   } else {
     // TODO add counter for acks with no callback
@@ -201,88 +216,88 @@ using connection_t_ptr = std::unique_ptr<connection_t>;
 // utility function to create a producer, when the connection object already exists
 bool new_producer(connection_t* conn) {
   // reset all status codes
-  conn->status = STATUS_OK; 
-  char errstr[512] = {0};
+  conn->status = 0;
+  ceph_assert(!conn->producer);
 
-  conn->temp_conf = rd_kafka_conf_new();
-  if (!conn->temp_conf) {
+  auto kafka_conf_deleter = [](rd_kafka_conf_t* conf) {rd_kafka_conf_destroy(conf);};
+
+  std::unique_ptr<rd_kafka_conf_t, decltype(kafka_conf_deleter)> conf(rd_kafka_conf_new(), kafka_conf_deleter);
+  if (!conf) {
+    ldout(conn->cct, 1) << "Kafka connect: failed to allocate configuration" << dendl;
     conn->status = STATUS_CONF_ALLOC_FAILED;
     return false;
   }
 
+  char errstr[512] = {0};
+
   // set message timeout
   // according to documentation, value of zero will expire the message based on retries.
   // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
   constexpr std::uint64_t min_message_timeout = 1;
   const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
-  if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms", 
+  if (rd_kafka_conf_set(conf.get(), "message.timeout.ms", 
         std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
   // get list of brokers based on the bootstrap broker
-  if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
-  
+  if (rd_kafka_conf_set(conf.get(), "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
   if (conn->use_ssl) {
     if (!conn->user.empty()) {
       // use SSL+SASL
-      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
-              rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
-              rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      if (rd_kafka_conf_set(conf.get(), "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conf.get(), "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conf.get(), "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
       ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
 
       if (conn->mechanism) {
-        if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+        if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
         ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl;
       } else {
-        if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+        if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
         ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl;
       }
 
     } else {
       // use only SSL
-      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      if (rd_kafka_conf_set(conf.get(), "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
       ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl;
     }
     if (conn->ca_location) {
-      if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      if (rd_kafka_conf_set(conf.get(), "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
       ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl;
     } else {
       ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
     }
     // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
-    // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+    // if (rd_kafka_conf_set(conn->conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
 
     ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
   } else if (!conn->user.empty()) {
       // use SASL+PLAINTEXT
-      if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
-              rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
-              rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+      if (rd_kafka_conf_set(conf.get(), "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conf.get(), "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+              rd_kafka_conf_set(conf.get(), "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
       ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL_PLAINTEXT" << dendl;
 
       if (conn->mechanism) {
-        if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+        if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
         ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl;
       } else {
-        if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+        if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
         ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl;
       }
   }
 
   // set the global callback for delivery success/fail
-  rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
-
+  rd_kafka_conf_set_dr_msg_cb(conf.get(), message_callback);
   // set the global opaque pointer to be the connection itself
-  rd_kafka_conf_set_opaque(conn->temp_conf, conn);
-
+  rd_kafka_conf_set_opaque(conf.get(), conn);
   // redirect kafka logs to RGW
-  rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
+  rd_kafka_conf_set_log_cb(conf.get(), log_callback);
   // define poll callback to allow reconnect
-  rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback);
-  // create the producer
-  if (conn->producer) {
-    ldout(conn->cct, 5) << "Kafka connect: producer already exists. detroying the existing before creating a new one" << dendl;
-    conn->destroy(STATUS_CONF_REPLCACE);
-  }
-  conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
+  rd_kafka_conf_set_error_cb(conf.get(), poll_err_callback);
+  // create the producer and move conf ownership to it
+  conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), errstr, sizeof(errstr));
   if (!conn->producer) {
     conn->status = rd_kafka_last_error();
     ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
@@ -302,8 +317,6 @@ bool new_producer(connection_t* conn) {
       rd_kafka_set_log_level(conn->producer, 7);
   }
 
-  // conf ownership passed to producer
-  conn->temp_conf = nullptr;
   return true;
 
 conf_error:
@@ -350,9 +363,9 @@ private:
     const std::unique_ptr<message_wrapper_t> msg_deleter(message);
     const auto conn_it = connections.find(message->conn_name);
     if (conn_it == connections.end()) {
-      ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue. error: " << STATUS_CONNECTION_CLOSED << dendl;
+      ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue" << dendl;
       if (message->cb) {
-        message->cb(STATUS_CONNECTION_CLOSED);
+        message->cb(status_to_errno(STATUS_CONNECTION_CLOSED));
       }
       return;
     }
@@ -360,12 +373,13 @@ private:
 
     conn->timestamp = ceph_clock_now(); 
 
-    if (!conn->is_ok()) {
+    ceph_assert(conn->producer);
+    if (conn->status != 0) {
       // connection had an issue while message was in the queue
       // TODO add error stats
-      ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. error: " << status_to_string(conn->status) << dendl;
+      ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. with status: " << status_to_string(conn->status) << dendl;
       if (message->cb) {
-        message->cb(conn->status);
+        message->cb(status_to_errno(conn->status));
       }
       return;
     }
@@ -377,11 +391,11 @@ private:
       topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
       if (!topic) {
         const auto err = rd_kafka_last_error();
-        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
+        ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " 
+          << rd_kafka_err2str(err) << "(" << err << ")" << dendl;
         if (message->cb) {
-          message->cb(err);
+          message->cb(-rd_kafka_err2errno(err));
         }
-        conn->destroy(err);
         return;
       }
       // TODO use the topics list as an LRU cache
@@ -411,13 +425,11 @@ private:
             tag);
     if (rc == -1) {
       const auto err = rd_kafka_last_error();
-      ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
-      // TODO: dont error on full queue, and don't destroy connection, retry instead
+      ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
       // immediatly invoke callback on error if needed
       if (message->cb) {
-        message->cb(err);
+        message->cb(-rd_kafka_err2errno(err));
       }
-      conn->destroy(err);
       delete tag;
       return;
     }
@@ -432,7 +444,7 @@ private:
       } else {
         // immediately invoke callback with error - this is not a connection error
         ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
-        message->cb(STATUS_MAX_INFLIGHT);
+        message->cb(-EBUSY);
         // tag will be deleted when the global callback is invoked
       }
     } else {
@@ -441,18 +453,12 @@ private:
     // coverity[leaked_storage:SUPPRESS]
   }
 
-  // the managers thread:
-  // (1) empty the queue of messages to be published
-  // (2) loop over all connections and read acks
-  // (3) manages deleted connections
-  // (4) TODO reconnect on connection errors
-  // (5) TODO cleanup timedout callbacks
   void run() noexcept {
     while (!stopped) {
 
       // publish all messages in the queue
       auto reply_count = 0U;
-      const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
+      const auto send_count = messages.consume_all([this](auto message){this->publish_internal(message);});
       dequeued += send_count;
       ConnectionList::iterator conn_it;
       ConnectionList::const_iterator end_it;
@@ -472,7 +478,8 @@ private:
 
         // Checking the connection idleness
         if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
-          ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+          ldout(conn->cct, 20) << "kafka run: deleting a connection that was idle for: " << 
+            conn->cct->_conf->rgw_kafka_connection_idle << " seconds. last activity was at: " << conn->timestamp << dendl;
           std::lock_guard lock(connections_lock);
           conn->status = STATUS_CONNECTION_IDLE;
           conn_it = connections.erase(conn_it);
@@ -480,22 +487,6 @@ private:
           continue;
         }
 
-        // try to reconnect the connection if it has an error
-        if (!conn->is_ok()) {
-          ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
-          const auto& broker = conn_it->first;
-          ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
-          if (new_producer(conn.get()) == false) {
-            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
-            // TODO: add error counter for failed retries
-            // TODO: add exponential backoff for retries
-          } else {
-            ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successful" << dendl;
-          }
-          ++conn_it;
-          continue;
-        }
-
         reply_count += rd_kafka_poll(conn->producer, read_timeout);
 
         // just increment the iterator
@@ -504,6 +495,7 @@ private:
       // sleep if no messages were received or published across all connection
       if (send_count == 0 && reply_count == 0) {
         std::this_thread::sleep_for(std::chrono::milliseconds(read_timeout*3));
+        // TODO: add exponential backoff to sleep time
       }
     }
   }
@@ -608,15 +600,16 @@ public:
       ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
       return false;
     }
-    // create_connection must always return a connection object
-    // even if error occurred during creation. 
-    // in such a case the creation will be retried in the main thread
+
+    auto conn = std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism);
+    if (!new_producer(conn.get())) {
+      ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl;
+      return false;
+    }
     ++connection_count;
+    connections.emplace(broker, std::move(conn));
+
     ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
-    auto conn = connections.emplace(broker, std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
-    if (!new_producer(conn)) {
-      ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl;
-    }
     return true;
   }
 
@@ -625,15 +618,15 @@ public:
     const std::string& topic,
     const std::string& message) {
     if (stopped) {
-      return STATUS_MANAGER_STOPPED;
+      return -ESRCH;
     }
     auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, nullptr);
     if (messages.push(message_wrapper.get())) {
       std::ignore = message_wrapper.release();
       ++queued;
-      return STATUS_OK;
+      return 0;
     }
-    return STATUS_QUEUE_FULL;
+    return -EBUSY;
   }
   
   int publish_with_confirm(const std::string& conn_name, 
@@ -641,15 +634,15 @@ public:
     const std::string& message,
     reply_callback_t cb) {
     if (stopped) {
-      return STATUS_MANAGER_STOPPED;
+      return -ESRCH;
     }
     auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, cb);
     if (messages.push(message_wrapper.get())) {
       std::ignore = message_wrapper.release();
       ++queued;
-      return STATUS_OK;
+      return 0;
     }
-    return STATUS_QUEUE_FULL;
+    return -EBUSY;
   }
 
   // dtor wait for thread to stop
@@ -658,6 +651,9 @@ public:
     stopped = true;
     runner.join();
     messages.consume_all(delete_message);
+    std::for_each(connections.begin(), connections.end(), [](auto& conn_pair) {
+        conn_pair.second->status = STATUS_CONNECTION_CLOSED;
+      });
   }
 
   // get the number of connections
@@ -725,7 +721,7 @@ int publish(const std::string& conn_name,
     const std::string& topic,
     const std::string& message) {
   std::shared_lock lock(s_manager_mutex);
-  if (!s_manager) return STATUS_MANAGER_STOPPED;
+  if (!s_manager) return -ESRCH;
   return s_manager->publish(conn_name, topic, message);
 }
 
@@ -734,7 +730,7 @@ int publish_with_confirm(const std::string& conn_name,
     const std::string& message,
     reply_callback_t cb) {
   std::shared_lock lock(s_manager_mutex);
-  if (!s_manager) return STATUS_MANAGER_STOPPED;
+  if (!s_manager) return -ESRCH;
   return s_manager->publish_with_confirm(conn_name, topic, message, cb);
 }