#define dout_subsys ceph_subsys_rgw_notification
-// TODO investigation, not necessarily issues:
-// (1) in case of single threaded writer context use spsc_queue
-// (2) check performance of emptying queue to local list, and go over the list and publish
-// (3) use std::shared_mutex (c++17) or equivalent for the connections lock
-
// comparison operator between topic pointer and name
bool operator==(const rd_kafka_topic_t* rkt, const std::string& name) {
// compare through a string_view of the topic's name to avoid allocating a temporary std::string
return name == std::string_view(rd_kafka_topic_name(rkt));
}
+// this is the inverse of rd_kafka_errno2err
+// see: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.c
+// maps a librdkafka response error code to a POSITIVE POSIX errno value:
+// 0 for "no error", EIO for any code with no specific mapping.
+// callers that follow the negative-errno convention negate the result.
+inline int rd_kafka_err2errno(rd_kafka_resp_err_t err) {
+ if (err == 0) return 0;
+ switch (err) {
+ case RD_KAFKA_RESP_ERR__INVALID_ARG:
+ return EINVAL;
+ case RD_KAFKA_RESP_ERR__CONFLICT:
+ return EBUSY;
+ case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
+ return ENOENT;
+ case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:
+ return ESRCH;
+ case RD_KAFKA_RESP_ERR__TIMED_OUT:
+ case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT:
+ return ETIMEDOUT;
+ case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE:
+ return EMSGSIZE;
+ case RD_KAFKA_RESP_ERR__QUEUE_FULL:
+ return ENOBUFS;
+ default:
+ // any other librdkafka error is reported as a generic I/O error
+ return EIO;
+ }
+}
+
namespace rgw::kafka {
-// status codes for publishing
-static const int STATUS_CONNECTION_CLOSED = -0x1002;
-static const int STATUS_QUEUE_FULL = -0x1003;
-static const int STATUS_MAX_INFLIGHT = -0x1004;
-static const int STATUS_MANAGER_STOPPED = -0x1005;
-static const int STATUS_CONNECTION_IDLE = -0x1006;
-// status code for connection opening
-static const int STATUS_CONF_ALLOC_FAILED = -0x2001;
-static const int STATUS_CONF_REPLCACE = -0x2002;
+// RGW-internal status codes for the kafka endpoint.
+// values are outside the librdkafka error range so a single int can carry
+// either kind; see status_to_string()/status_to_errno() for the decoding.
+enum Status {
+ STATUS_CONNECTION_CLOSED = -0x1002, // connection deleted/manager stopped while messages were pending
+ STATUS_CONNECTION_IDLE = -0x1006, // connection removed after exceeding rgw_kafka_connection_idle
+ STATUS_CONF_ALLOC_FAILED = -0x2001, // rd_kafka_conf_new() returned null
+};
-static const int STATUS_OK = 0x0;
+// convert int status to string - both RGW and librdkafka values
+// any value that is not one of the RGW Status codes is assumed to be a
+// rd_kafka_resp_err_t and is formatted via rd_kafka_err2str()
+inline std::string status_to_string(int s) {
+ switch (s) {
+ case STATUS_CONNECTION_CLOSED:
+ return "Kafka connection closed";
+ case STATUS_CONF_ALLOC_FAILED:
+ return "Kafka configuration allocation failed";
+ case STATUS_CONNECTION_IDLE:
+ return "Kafka connection idle";
+ default:
+ return std::string(rd_kafka_err2str(static_cast<rd_kafka_resp_err_t>(s)));
+ }
+}
+
+// convert int status to errno - both RGW and librdkafka values
+// returns 0 on success, otherwise a NEGATIVE errno value suitable for
+// passing directly to the publish callbacks.
+// non-RGW values are assumed to be rd_kafka_resp_err_t and are mapped
+// (then negated) via rd_kafka_err2errno()
+inline int status_to_errno(int s) {
+ if (s == 0) return 0;
+ switch (s) {
+ case STATUS_CONNECTION_CLOSED:
+ return -EIO;
+ case STATUS_CONF_ALLOC_FAILED:
+ return -ENOMEM;
+ case STATUS_CONNECTION_IDLE:
+ return -EIO;
+ default:
+ return -rd_kafka_err2errno(static_cast<rd_kafka_resp_err_t>(s));
+ }
+}
// struct for holding the callback and its tag in the callback list
struct reply_callback_with_tag_t {
typedef std::vector<reply_callback_with_tag_t> CallbackList;
-// struct for holding the connection state object as well as list of topics
-// it is used inside an intrusive ref counted pointer (boost::intrusive_ptr)
-// since references to deleted objects may still exist in the calling code
struct connection_t {
rd_kafka_t* producer = nullptr;
- rd_kafka_conf_t* temp_conf = nullptr;
std::vector<rd_kafka_topic_t*> topics;
uint64_t delivery_tag = 1;
- int status = STATUS_OK;
+ int status = 0;
CephContext* const cct;
CallbackList callbacks;
const std::string broker;
// cleanup of all internal connection resource
// the object can still remain, and internal connection
// resources created again on successful reconnection
// NOTE(review): with the new signature, callers are expected to set
// this->status to the destruction reason BEFORE calling destroy();
// that status is converted to a negative errno and handed to any
// callbacks that were not fired by rd_kafka_flush()
- void destroy(int s) {
- status = s;
- // destroy temporary conf (if connection was never established)
- if (temp_conf) {
- rd_kafka_conf_destroy(temp_conf);
- return;
- }
- if (!is_ok()) {
+ void destroy() {
+ if (!producer) {
// no producer, nothing to destroy
return;
}
- // wait for all remaining acks/nacks
- rd_kafka_flush(producer, 5*1000 /* wait for max 5 seconds */);
+ // wait for 500ms to try and handle pending callbacks
+ rd_kafka_flush(producer, 500);
// destroy all topics
std::for_each(topics.begin(), topics.end(), [](auto topic) {rd_kafka_topic_destroy(topic);});
+ topics.clear();
// destroy producer
rd_kafka_destroy(producer);
producer = nullptr;
// fire all remaining callbacks (if not fired by rd_kafka_flush)
std::for_each(callbacks.begin(), callbacks.end(), [this](auto& cb_tag) {
- cb_tag.cb(status);
- ldout(cct, 20) << "Kafka destroy: invoking callback with tag="
+ ldout(cct, 1) << "Kafka destroy: invoking callback with tag: "
<< cb_tag.tag << " for: " << broker
- << " with status: " << status << dendl;
+ << " with status: " << status_to_string(status) << dendl;
+ cb_tag.cb(status_to_errno(status));
});
callbacks.clear();
delivery_tag = 1;
ldout(cct, 20) << "Kafka destroy: complete for: " << broker << dendl;
}
- bool is_ok() const {
- return (producer != nullptr);
- }
-
// ctor for setting immutable values
connection_t(CephContext* _cct, const std::string& _broker, bool _use_ssl, bool _verify_ssl,
const boost::optional<const std::string&>& _ca_location,
// dtor also destroys the internals: flushes pending messages, frees
// topics/producer and fires remaining callbacks (see destroy())
~connection_t() {
- destroy(status);
+ destroy();
}
};
-// convert int status to string - including RGW specific values
-std::string status_to_string(int s) {
- switch (s) {
- case STATUS_OK:
- return "STATUS_OK";
- case STATUS_CONNECTION_CLOSED:
- return "RGW_KAFKA_STATUS_CONNECTION_CLOSED";
- case STATUS_QUEUE_FULL:
- return "RGW_KAFKA_STATUS_QUEUE_FULL";
- case STATUS_MAX_INFLIGHT:
- return "RGW_KAFKA_STATUS_MAX_INFLIGHT";
- case STATUS_MANAGER_STOPPED:
- return "RGW_KAFKA_STATUS_MANAGER_STOPPED";
- case STATUS_CONF_ALLOC_FAILED:
- return "RGW_KAFKA_STATUS_CONF_ALLOC_FAILED";
- case STATUS_CONF_REPLCACE:
- return "RGW_KAFKA_STATUS_CONF_REPLCACE";
- case STATUS_CONNECTION_IDLE:
- return "RGW_KAFKA_STATUS_CONNECTION_IDLE";
- }
- return std::string(rd_kafka_err2str((rd_kafka_resp_err_t)s));
-}
-
void message_callback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
ceph_assert(opaque);
const auto conn = reinterpret_cast<connection_t*>(opaque);
const auto result = rkmessage->err;
+ if (rkmessage->err == 0) {
+ ldout(conn->cct, 20) << "Kafka run: ack received with result=" <<
+ rd_kafka_err2str(result) << dendl;
+ } else {
+ ldout(conn->cct, 1) << "Kafka run: nack received with result=" <<
+ rd_kafka_err2str(result) << dendl;
+ }
+
if (!rkmessage->_private) {
- ldout(conn->cct, 20) << "Kafka run: n/ack received, (no callback) with result=" << result << dendl;
+ ldout(conn->cct, 20) << "Kafka run: n/ack received without a callback" << dendl;
return;
}
const auto tag_it = std::find(callbacks_begin, callbacks_end, *tag);
if (tag_it != callbacks_end) {
ldout(conn->cct, 20) << "Kafka run: n/ack received, invoking callback with tag=" <<
- *tag << " and result=" << rd_kafka_err2str(result) << dendl;
- tag_it->cb(result);
+ *tag << dendl;
+ tag_it->cb(-rd_kafka_err2errno(result));
conn->callbacks.erase(tag_it);
} else {
// TODO add counter for acks with no callback
// utility function to create a producer, when the connection object already exists
bool new_producer(connection_t* conn) {
// reset all status codes
- conn->status = STATUS_OK;
- char errstr[512] = {0};
+ conn->status = 0;
+ ceph_assert(!conn->producer);
- conn->temp_conf = rd_kafka_conf_new();
- if (!conn->temp_conf) {
+ auto kafka_conf_deleter = [](rd_kafka_conf_t* conf) {rd_kafka_conf_destroy(conf);};
+
+ std::unique_ptr<rd_kafka_conf_t, decltype(kafka_conf_deleter)> conf(rd_kafka_conf_new(), kafka_conf_deleter);
+ if (!conf) {
+ ldout(conn->cct, 1) << "Kafka connect: failed to allocate configuration" << dendl;
conn->status = STATUS_CONF_ALLOC_FAILED;
return false;
}
+ char errstr[512] = {0};
+
// set message timeout
// according to documentation, value of zero will expire the message based on retries.
// however, testing with librdkafka v1.6.1 did not expire the message in that case. hence, a value of zero is changed to 1ms
constexpr std::uint64_t min_message_timeout = 1;
const auto message_timeout = std::max(min_message_timeout, conn->cct->_conf->rgw_kafka_message_timeout);
- if (rd_kafka_conf_set(conn->temp_conf, "message.timeout.ms",
+ if (rd_kafka_conf_set(conf.get(), "message.timeout.ms",
std::to_string(message_timeout).c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
// get list of brokers based on the bootstrap broker
- if (rd_kafka_conf_set(conn->temp_conf, "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
-
+ if (rd_kafka_conf_set(conf.get(), "bootstrap.servers", conn->broker.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+
if (conn->use_ssl) {
if (!conn->user.empty()) {
// use SSL+SASL
- if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ if (rd_kafka_conf_set(conf.get(), "security.protocol", "SASL_SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conf.get(), "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conf.get(), "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL+SASL security" << dendl;
if (conn->mechanism) {
- if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl;
} else {
- if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl;
}
} else {
// use only SSL
- if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ if (rd_kafka_conf_set(conf.get(), "security.protocol", "SSL", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: successfully configured SSL security" << dendl;
}
if (conn->ca_location) {
- if (rd_kafka_conf_set(conn->temp_conf, "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ if (rd_kafka_conf_set(conf.get(), "ssl.ca.location", conn->ca_location->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: successfully configured CA location" << dendl;
} else {
ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
}
// Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
- // if (rd_kafka_conf_set(conn->temp_conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ // if (rd_kafka_conf_set(conn->conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
} else if (!conn->user.empty()) {
// use SASL+PLAINTEXT
- if (rd_kafka_conf_set(conn->temp_conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- rd_kafka_conf_set(conn->temp_conf, "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
- rd_kafka_conf_set(conn->temp_conf, "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ if (rd_kafka_conf_set(conf.get(), "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conf.get(), "sasl.username", conn->user.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
+ rd_kafka_conf_set(conf.get(), "sasl.password", conn->password.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL_PLAINTEXT" << dendl;
if (conn->mechanism) {
- if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", conn->mechanism->c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: successfully configured SASL mechanism" << dendl;
} else {
- if (rd_kafka_conf_set(conn->temp_conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ if (rd_kafka_conf_set(conf.get(), "sasl.mechanism", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
ldout(conn->cct, 20) << "Kafka connect: using default SASL mechanism" << dendl;
}
}
// set the global callback for delivery success/fail
- rd_kafka_conf_set_dr_msg_cb(conn->temp_conf, message_callback);
-
+ rd_kafka_conf_set_dr_msg_cb(conf.get(), message_callback);
// set the global opaque pointer to be the connection itself
- rd_kafka_conf_set_opaque(conn->temp_conf, conn);
-
+ rd_kafka_conf_set_opaque(conf.get(), conn);
// redirect kafka logs to RGW
- rd_kafka_conf_set_log_cb(conn->temp_conf, log_callback);
+ rd_kafka_conf_set_log_cb(conf.get(), log_callback);
// define poll callback to allow reconnect
- rd_kafka_conf_set_error_cb(conn->temp_conf, poll_err_callback);
- // create the producer
- if (conn->producer) {
- ldout(conn->cct, 5) << "Kafka connect: producer already exists. detroying the existing before creating a new one" << dendl;
- conn->destroy(STATUS_CONF_REPLCACE);
- }
- conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conn->temp_conf, errstr, sizeof(errstr));
+ rd_kafka_conf_set_error_cb(conf.get(), poll_err_callback);
+ // create the producer and move conf ownership to it
+ conn->producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), errstr, sizeof(errstr));
if (!conn->producer) {
conn->status = rd_kafka_last_error();
ldout(conn->cct, 1) << "Kafka connect: failed to create producer: " << errstr << dendl;
rd_kafka_set_log_level(conn->producer, 7);
}
- // conf ownership passed to producer
- conn->temp_conf = nullptr;
return true;
conf_error:
const std::unique_ptr<message_wrapper_t> msg_deleter(message);
const auto conn_it = connections.find(message->conn_name);
if (conn_it == connections.end()) {
- ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue. error: " << STATUS_CONNECTION_CLOSED << dendl;
+ ldout(cct, 1) << "Kafka publish: connection was deleted while message was in the queue" << dendl;
if (message->cb) {
- message->cb(STATUS_CONNECTION_CLOSED);
+ message->cb(status_to_errno(STATUS_CONNECTION_CLOSED));
}
return;
}
conn->timestamp = ceph_clock_now();
- if (!conn->is_ok()) {
+ ceph_assert(conn->producer);
+ if (conn->status != 0) {
// connection had an issue while message was in the queue
// TODO add error stats
- ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. error: " << status_to_string(conn->status) << dendl;
+ ldout(conn->cct, 1) << "Kafka publish: producer was closed while message was in the queue. with status: " << status_to_string(conn->status) << dendl;
if (message->cb) {
- message->cb(conn->status);
+ message->cb(status_to_errno(conn->status));
}
return;
}
topic = rd_kafka_topic_new(conn->producer, message->topic.c_str(), nullptr);
if (!topic) {
const auto err = rd_kafka_last_error();
- ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: " << status_to_string(err) << dendl;
+ ldout(conn->cct, 1) << "Kafka publish: failed to create topic: " << message->topic << " error: "
+ << rd_kafka_err2str(err) << "(" << err << ")" << dendl;
if (message->cb) {
- message->cb(err);
+ message->cb(-rd_kafka_err2errno(err));
}
- conn->destroy(err);
return;
}
// TODO use the topics list as an LRU cache
tag);
if (rc == -1) {
const auto err = rd_kafka_last_error();
- ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
- // TODO: dont error on full queue, and don't destroy connection, retry instead
+ ldout(conn->cct, 1) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
// immediatly invoke callback on error if needed
if (message->cb) {
- message->cb(err);
+ message->cb(-rd_kafka_err2errno(err));
}
- conn->destroy(err);
delete tag;
return;
}
} else {
// immediately invoke callback with error - this is not a connection error
ldout(conn->cct, 1) << "Kafka publish (with callback): failed with error: callback queue full" << dendl;
- message->cb(STATUS_MAX_INFLIGHT);
+ message->cb(-EBUSY);
// tag will be deleted when the global callback is invoked
}
} else {
// coverity[leaked_storage:SUPPRESS]
}
- // the managers thread:
- // (1) empty the queue of messages to be published
- // (2) loop over all connections and read acks
- // (3) manages deleted connections
- // (4) TODO reconnect on connection errors
- // (5) TODO cleanup timedout callbacks
void run() noexcept {
while (!stopped) {
// publish all messages in the queue
auto reply_count = 0U;
- const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
+ const auto send_count = messages.consume_all([this](auto message){this->publish_internal(message);});
dequeued += send_count;
ConnectionList::iterator conn_it;
ConnectionList::const_iterator end_it;
// Checking the connection idleness
if(conn->timestamp.sec() + conn->cct->_conf->rgw_kafka_connection_idle < ceph_clock_now()) {
- ldout(conn->cct, 20) << "kafka run: deleting a connection due to idle behaviour: " << ceph_clock_now() << dendl;
+ ldout(conn->cct, 20) << "kafka run: deleting a connection that was idle for: " <<
+ conn->cct->_conf->rgw_kafka_connection_idle << " seconds. last activity was at: " << conn->timestamp << dendl;
std::lock_guard lock(connections_lock);
conn->status = STATUS_CONNECTION_IDLE;
conn_it = connections.erase(conn_it);
continue;
}
- // try to reconnect the connection if it has an error
- if (!conn->is_ok()) {
- ldout(conn->cct, 10) << "Kafka run: connection status is: " << status_to_string(conn->status) << dendl;
- const auto& broker = conn_it->first;
- ldout(conn->cct, 20) << "Kafka run: retry connection" << dendl;
- if (new_producer(conn.get()) == false) {
- ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry failed" << dendl;
- // TODO: add error counter for failed retries
- // TODO: add exponential backoff for retries
- } else {
- ldout(conn->cct, 10) << "Kafka run: connection (" << broker << ") retry successful" << dendl;
- }
- ++conn_it;
- continue;
- }
-
reply_count += rd_kafka_poll(conn->producer, read_timeout);
// just increment the iterator
// sleep if no messages were received or published across all connection
if (send_count == 0 && reply_count == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(read_timeout*3));
+ // TODO: add exponential backoff to sleep time
}
}
}
ldout(cct, 1) << "Kafka connect: max connections exceeded" << dendl;
return false;
}
- // create_connection must always return a connection object
- // even if error occurred during creation.
- // in such a case the creation will be retried in the main thread
+
+ auto conn = std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism);
+ if (!new_producer(conn.get())) {
+ ldout(cct, 10) << "Kafka connect: producer creation failed in new connection" << dendl;
+ return false;
+ }
++connection_count;
+ connections.emplace(broker, std::move(conn));
+
ldout(cct, 10) << "Kafka connect: new connection is created. Total connections: " << connection_count << dendl;
- auto conn = connections.emplace(broker, std::make_unique<connection_t>(cct, broker, use_ssl, verify_ssl, ca_location, user, password, mechanism)).first->second.get();
- if (!new_producer(conn)) {
- ldout(cct, 10) << "Kafka connect: new connection is created. But producer creation failed. will retry" << dendl;
- }
return true;
}
const std::string& topic,
const std::string& message) {
if (stopped) {
- return STATUS_MANAGER_STOPPED;
+ return -ESRCH;
}
auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, nullptr);
if (messages.push(message_wrapper.get())) {
std::ignore = message_wrapper.release();
++queued;
- return STATUS_OK;
+ return 0;
}
- return STATUS_QUEUE_FULL;
+ return -EBUSY;
}
int publish_with_confirm(const std::string& conn_name,
const std::string& message,
reply_callback_t cb) {
if (stopped) {
- return STATUS_MANAGER_STOPPED;
+ return -ESRCH;
}
auto message_wrapper = std::make_unique<message_wrapper_t>(conn_name, topic, message, cb);
if (messages.push(message_wrapper.get())) {
std::ignore = message_wrapper.release();
++queued;
- return STATUS_OK;
+ return 0;
}
- return STATUS_QUEUE_FULL;
+ return -EBUSY;
}
// dtor wait for thread to stop
stopped = true;
runner.join();
messages.consume_all(delete_message);
+ std::for_each(connections.begin(), connections.end(), [](auto& conn_pair) {
+ conn_pair.second->status = STATUS_CONNECTION_CLOSED;
+ });
}
// get the number of connections
const std::string& topic,
const std::string& message) {
std::shared_lock lock(s_manager_mutex);
- if (!s_manager) return STATUS_MANAGER_STOPPED;
+ if (!s_manager) return -ESRCH;
return s_manager->publish(conn_name, topic, message);
}
const std::string& message,
reply_callback_t cb) {
std::shared_lock lock(s_manager_mutex);
- if (!s_manager) return STATUS_MANAGER_STOPPED;
+ if (!s_manager) return -ESRCH;
return s_manager->publish_with_confirm(conn_name, topic, message, cb);
}