From: Yuval Lifshitz Date: Thu, 7 Mar 2024 11:49:10 +0000 (+0000) Subject: rgw/kafka: do not destroy the connection on errors X-Git-Tag: v20.0.0~1866^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1b6e8501565972bc833118a3fe34ef2f43f400ad;p=ceph.git rgw/kafka: do not destroy the connection on errors as well as other simplifications: * do not store temporary configuration in the connection object. just use as a local variable * do not create a connection without a producer other improvements: * copy to a local list before publishing * convert internal error codes to errno Fixes: https://tracker.ceph.com/issues/66017 Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/rgw_kafka.cc b/src/rgw/rgw_kafka.cc index ce1b273d8b78b..9a356d9c6f04e 100644 --- a/src/rgw/rgw_kafka.cc +++ b/src/rgw/rgw_kafka.cc @@ -18,29 +18,72 @@ #define dout_subsys ceph_subsys_rgw_notification -// TODO investigation, not necessarily issues: -// (1) in case of single threaded writer context use spsc_queue -// (2) check performance of emptying queue to local list, and go over the list and publish -// (3) use std::shared_mutex (c++17) or equivalent for the connections lock - // comparison operator between topic pointer and name bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) { return name == std::string_view(rd_kafka_topic_name(rkt)); } +// this is the inverse of rd_kafka_errno2err +// see: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.c +inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) { + if (err == 0) return 0; + switch (err) { + case RD_KAFKA_RESP_ERR__INVALID_ARG: + return EINVAL; + case RD_KAFKA_RESP_ERR__CONFLICT: + return EBUSY; + case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC: + return ENOENT; + case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION: + return ESRCH; + case RD_KAFKA_RESP_ERR__TIMED_OUT: + case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT: + return ETIMEDOUT; + case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE: + return EMSGSIZE; + case RD_KAFKA_RESP_ERR__QUEUE_FULL: + return ENOBUFS; + default: + return EIO; + } +} + namespace rgw::kafka { -// status codes for publishing -static const int STATUS_CONNECTION_CLOSED = -0x1002; -static const int STATUS_QUEUE_FULL = -0x1003; -static const int STATUS_MAX_INFLIGHT = -0x1004; -static const int STATUS_MANAGER_STOPPED = -0x1005; -static const int STATUS_CONNECTION_IDLE = -0x1006; -// status code for connection opening -static const int STATUS_CONF_ALLOC_FAILED = -0x2001; -static const int STATUS_CONF_REPLCACE = -0x2002; +enum Status { + STATUS_CONNECTION_CLOSED = -0x1002, + STATUS_CONNECTION_IDLE = -0x1006, + STATUS_CONF_ALLOC_FAILED = -0x2001, +}; -static const int STATUS_OK = 0x0; +// convert int status to string - both RGW and librdkafka values +inline std::string status_to_string(int s) { + switch (s) { + case STATUS_CONNECTION_CLOSED: + return "Kafka connection closed"; + case STATUS_CONF_ALLOC_FAILED: + return "Kafka configuration allocation failed"; + case STATUS_CONNECTION_IDLE: + return "Kafka connection idle"; + default: + return std::string(rd_kafka_err2str(static_cast(s))); + } +} + +// convert int status to errno - both RGW and librdkafka values +inline int status_to_errno(int s) { + if (s == 0) return 0; + switch (s) { + case STATUS_CONNECTION_CLOSED: + return -EIO; + case STATUS_CONF_ALLOC_FAILED: + return -ENOMEM; + case STATUS_CONNECTION_IDLE: + return -EIO; + default: + return -rd_kafka_err2errno(static_cast(s)); + } +} // struct for holding the callback and its tag in the callback list struct reply_callback_with_tag_t { @@ -56,15 +99,11 @@ struct reply_callback_with_tag_t { typedef std::vector CallbackList; -// struct for holding the connection state object as well as list of topics -// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr) -// since references to deleted objects may still exist in the calling code struct connection_t { rd_kafka_t* producer = nullptr; - rd_kafka_conf_t* temp_conf = nullptr; std::vector topics; uint64_t delivery_tag = 1; - int status = STATUS_OK; + int status = 0; CephContext* const cct; CallbackList callbacks; const std::string broker; @@ -79,40 +118,31 @@ struct connection_t { // cleanup of all internal connection resource // the object can still remain, and internal connection // resources created again on successful reconnection - void destroy(int s) { - status = s; - // destroy temporary conf (if connection was never established) - if (temp_conf) { - rd_kafka_conf_destroy(temp_conf); - return; - } - if (!is_ok()) { + void destroy() { + if (!producer) { // no producer, nothing to destroy return; } - // wait for all remaining acks/nacks - rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */); + // wait for 500ms to try and handle pending callbacks + rd_kafka_flush(producer, 500); // destroy all topics std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);}); + topics.clear(); // destroy producer rd_kafka_destroy(producer); producer = nullptr; // fire all remaining callbacks (if not fired by rd_kafka_flush) std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) { - cb_tag.cb(status); - ldout(cct, 20) << "Kafka destroy: invoking callback with tag=" + ldout(cct, 1) << "Kafka destroy: invoking callback with tag: " << cb_tag.tag << " for: " << broker - << " with status: " << status << dendl; + << " with status: " << status_to_string(status) << dendl; + cb_tag.cb(status_to_errno(status)); }); callbacks.clear(); delivery_tag = 1; ldout(cct, 20) << "Kafka destroy: complete for: " << broker << dendl; } - bool is_ok() const { - return (producer != nullptr); - } - // ctor for setting immutable values connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl, const boost::optional& _ca_location, @@ -121,41 +151,26 @@ struct connection_t { // dtor also destroys the internals ~connection_t() { - destroy(status); + destroy(); } }; -// convert int status to string - including RGW specific values -std::string status_to_string(int s) { - switch (s) { - case STATUS_OK: - return "STATUS_OK"; - case STATUS_CONNECTION_CLOSED: - return "RGW_KAFKA_STATUS_CONNECTION_CLOSED"; - case STATUS_QUEUE_FULL: - return "RGW_KAFKA_STATUS_QUEUE_FULL"; - case STATUS_MAX_INFLIGHT: - return "RGW_KAFKA_STATUS_MAX_INFLIGHT"; - case STATUS_MANAGER_STOPPED: - return "RGW_KAFKA_STATUS_MANAGER_STOPPED"; - case STATUS_CONF_ALLOC_FAILED: - return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED"; - case STATUS_CONF_REPLCACE: - return "RGW_KAFKA_STATUS_CONF_REPLCACE"; - case STATUS_CONNECTION_IDLE: - return "RGW_KAFKA_STATUS_CONNECTION_IDLE"; - } - return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s)); -} - void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) { ceph_assert(opaque); const auto conn = reinterpret_cast(opaque); const auto result = rkmessage->err; + if (rkmessage->err == 0) { + ldout(conn->cct, 20) << "Kafka run: ack received with result=" << + rd_kafka_err2str(result) << dendl; + } else { + ldout(conn->cct, 1) << "Kafka run: nack received with result=" << + rd_kafka_err2str(result) << dendl; + } + if (!rkmessage->_private) { - ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl; + ldout(conn->cct, 20) << "Kafka run: n/ack received without a callback" << dendl; return; } @@ -165,8 +180,8 @@ void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag); if (tag_it != callbacks_end) { ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" << - *tag << " and result=" << rd_kafka_err2str(result) << dendl; - tag_it->cb(result); + *tag << dendl; + tag_it->cb(-rd_kafka_err2errno(result)); conn->callbacks.erase(tag_it); } else { // TODO add counter for acks with no callback @@ -201,88 +216,88 @@ using connection_t_ptr = std::unique_ptr; // utility function to create a producer, when the connection object already exists bool new_producer(connection_t* conn) { // reset all status codes - conn->status = STATUS_OK; - char errstr[512] = {0}; + conn->status = 0; + ceph_assert(!conn->producer); - conn->temp_conf = rd_kafka_conf_new(); - if (!conn->temp_conf) { + auto kafka_conf_deleter = [](rd_kafka_conf_t* conf) {rd_kafka_conf_destroy(conf);}; + + std::unique_ptr conf(rd_kafka_conf_new(), kafka_conf_deleter); + if (!conf) { + ldout(conn->cct, 1) << "Kafka connect: failed to allocate configuration" << dendl; conn->status = STATUS_CONF_ALLOC_FAILED; return false; } + char errstr[512] = {0}; + // set message timeout // according to documentation, value of zero will expire the message based on retries. // however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms constexpr std::uint64_t min_message_timeout = 1; const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout); - if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms", + if (rd_kafka_conf_set(conf.get(), "message.timeout.ms", std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + // get list of brokers based on the bootstrap broker - if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; - + if (rd_kafka_conf_set(conf.get(), "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (conn->use_ssl) { if (!conn->user.empty()) { // use SSL+SASL - if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || - rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || - rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (rd_kafka_conf_set(conf.get(), "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conf.get(), "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conf.get(), "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl; if (conn->mechanism) { - if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl; } else { - if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl; } } else { // use only SSL - if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (rd_kafka_conf_set(conf.get(), "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl; } if (conn->ca_location) { - if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (rd_kafka_conf_set(conf.get(), "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl; } else { ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl; } // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call - // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + // if (rd_kafka_conf_set(conn->conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl; } else if (!conn->user.empty()) { // use SASL+PLAINTEXT - if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || - rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || - rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (rd_kafka_conf_set(conf.get(), "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conf.get(), "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK || + rd_kafka_conf_set(conf.get(), "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL_PLAINTEXT" << dendl; if (conn->mechanism) { - if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl; } else { - if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; + if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error; ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl; } } // set the global callback for delivery success/fail - rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback); - + rd_kafka_conf_set_dr_msg_cb(conf.get(), message_callback); // set the global opaque pointer to be the connection itself - rd_kafka_conf_set_opaque(conn->temp_conf, conn); - + rd_kafka_conf_set_opaque(conf.get(), conn); // redirect kafka logs to RGW - rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback); + rd_kafka_conf_set_log_cb(conf.get(), log_callback); // define poll callback to allow reconnect - rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback); - // create the producer - if (conn->producer) { - ldout(conn->cct, 5) << "Kafka connect: producer already exists. detroying the existing before creating a new one" << dendl; - conn->destroy(STATUS_CONF_REPLCACE); - } - conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr)); + rd_kafka_conf_set_error_cb(conf.get(), poll_err_callback); + // create the producer and move conf ownership to it + conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), errstr, sizeof(errstr)); if (!conn->producer) { conn->status = rd_kafka_last_error(); ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl; @@ -302,8 +317,6 @@ bool new_producer(connection_t* conn) { rd_kafka_set_log_level(conn->producer, 7); } - // conf ownership passed to producer - conn->temp_conf = nullptr; return true; conf_error: @@ -350,9 +363,9 @@ private: const std::unique_ptr msg_deleter(message); const auto conn_it = connections.find(message->conn_name); if (conn_it == connections.end()) { - ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue. error: " << STATUS_CONNECTION_CLOSED << dendl; + ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue" << dendl; if (message->cb) { - message->cb(STATUS_CONNECTION_CLOSED); + message->cb(status_to_errno(STATUS_CONNECTION_CLOSED)); } return; } @@ -360,12 +373,13 @@ private: conn->timestamp = ceph_clock_now(); - if (!conn->is_ok()) { + ceph_assert(conn->producer); + if (conn->status != 0) { // connection had an issue while message was in the queue // TODO add error stats - ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. error: " << status_to_string(conn->status) << dendl; + ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. with status: " << status_to_string(conn->status) << dendl; if (message->cb) { - message->cb(conn->status); + message->cb(status_to_errno(conn->status)); } return; } @@ -377,11 +391,11 @@ private: topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr); if (!topic) { const auto err = rd_kafka_last_error(); - ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl; + ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " + << rd_kafka_err2str(err) << "(" << err << ")" << dendl; if (message->cb) { - message->cb(err); + message->cb(-rd_kafka_err2errno(err)); } - conn->destroy(err); return; } // TODO use the topics list as an LRU cache @@ -411,13 +425,11 @@ private: tag); if (rc == -1) { const auto err = rd_kafka_last_error(); - ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl; - // TODO: dont error on full queue, and don't destroy connection, retry instead + ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl; // immediatly invoke callback on error if needed if (message->cb) { - message->cb(err); + message->cb(-rd_kafka_err2errno(err)); } - conn->destroy(err); delete tag; return; } @@ -432,7 +444,7 @@ private: } else { // immediately invoke callback with error - this is not a connection error ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl; - message->cb(STATUS_MAX_INFLIGHT); + message->cb(-EBUSY); // tag will be deleted when the global callback is invoked } } else { @@ -441,18 +453,12 @@ private: // coverity[leaked_storage:SUPPRESS] } - // the managers thread: - // (1) empty the queue of messages to be published - // (2) loop over all connections and read acks - // (3) manages deleted connections - // (4) TODO reconnect on connection errors - // (5) TODO cleanup timedout callbacks void run() noexcept { while (!stopped) { // publish all messages in the queue auto reply_count = 0U; - const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1)); + const auto send_count = messages.consume_all([this](auto message){this->publish_internal(message);}); dequeued += send_count; ConnectionList::iterator conn_it; ConnectionList::const_iterator end_it; @@ -472,7 +478,8 @@ private: // Checking the connection idleness if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) { - ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl; + ldout(conn->cct, 20) << "kafka run: deleting a connection that was idle for: " << + conn->cct->_conf->rgw_kafka_connection_idle << " seconds. last activity was at: " << conn->timestamp << dendl; std::lock_guard lock(connections_lock); conn->status = STATUS_CONNECTION_IDLE; conn_it = connections.erase(conn_it); @@ -480,22 +487,6 @@ private: continue; } - // try to reconnect the connection if it has an error - if (!conn->is_ok()) { - ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl; - const auto& broker = conn_it->first; - ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl; - if (new_producer(conn.get()) == false) { - ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl; - // TODO: add error counter for failed retries - // TODO: add exponential backoff for retries - } else { - ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successful" << dendl; - } - ++conn_it; - continue; - } - reply_count += rd_kafka_poll(conn->producer, read_timeout); // just increment the iterator @@ -504,6 +495,7 @@ private: // sleep if no messages were received or published across all connection if (send_count == 0 && reply_count == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(read_timeout*3)); + // TODO: add exponential backoff to sleep time } } } @@ -608,15 +600,16 @@ public: ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl; return false; } - // create_connection must always return a connection object - // even if error occurred during creation. - // in such a case the creation will be retried in the main thread + + auto conn = std::make_unique(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism); + if (!new_producer(conn.get())) { + ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl; + return false; + } ++connection_count; + connections.emplace(broker, std::move(conn)); + ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl; - auto conn = connections.emplace(broker, std::make_unique(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get(); - if (!new_producer(conn)) { - ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl; - } return true; } @@ -625,15 +618,15 @@ public: const std::string& topic, const std::string& message) { if (stopped) { - return STATUS_MANAGER_STOPPED; + return -ESRCH; } auto message_wrapper = std::make_unique(conn_name, topic, message, nullptr); if (messages.push(message_wrapper.get())) { std::ignore = message_wrapper.release(); ++queued; - return STATUS_OK; + return 0; } - return STATUS_QUEUE_FULL; + return -EBUSY; } int publish_with_confirm(const std::string& conn_name, @@ -641,15 +634,15 @@ public: const std::string& message, reply_callback_t cb) { if (stopped) { - return STATUS_MANAGER_STOPPED; + return -ESRCH; } auto message_wrapper = std::make_unique(conn_name, topic, message, cb); if (messages.push(message_wrapper.get())) { std::ignore = message_wrapper.release(); ++queued; - return STATUS_OK; + return 0; } - return STATUS_QUEUE_FULL; + return -EBUSY; } // dtor wait for thread to stop @@ -658,6 +651,9 @@ public: stopped = true; runner.join(); messages.consume_all(delete_message); + std::for_each(connections.begin(), connections.end(), [](auto& conn_pair) { + conn_pair.second->status = STATUS_CONNECTION_CLOSED; + }); } // get the number of connections @@ -725,7 +721,7 @@ int publish(const std::string& conn_name, const std::string& topic, const std::string& message) { std::shared_lock lock(s_manager_mutex); - if (!s_manager) return STATUS_MANAGER_STOPPED; + if (!s_manager) return -ESRCH; return s_manager->publish(conn_name, topic, message); } @@ -734,7 +730,7 @@ int publish_with_confirm(const std::string& conn_name, const std::string& message, reply_callback_t cb) { std::shared_lock lock(s_manager_mutex); - if (!s_manager) return STATUS_MANAGER_STOPPED; + if (!s_manager) return -ESRCH; return s_manager->publish_with_confirm(conn_name, topic, message, cb); }