while (!stopped) {
// publish all messages in the queue
- auto event_count = 0U;
- const auto count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
- dequeued += count;
+ auto reply_count = 0U;
+ const auto send_count = messages.consume_all(std::bind(&Manager::publish_internal, this, std::placeholders::_1));
+ dequeued += send_count;
ConnectionList::iterator conn_it;
ConnectionList::const_iterator end_it;
{
INCREMENT_AND_CONTINUE(conn_it);
}
- event_count += rd_kafka_poll(conn->producer, read_timeout_ms);
+ reply_count += rd_kafka_poll(conn->producer, read_timeout_ms);
// just increment the iterator
++conn_it;
}
// if no messages were received or published
// across all connection, sleep for 100ms
- if (count == 0 && event_count) {
+ if (send_count == 0 && reply_count == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
# configure logging for the tests module
log = logging.getLogger(__name__)
-skip_push_tests = False
+skip_push_tests = True
####################################
# utility functions for pubsub tests