// vim: ts=8 sw=2 smarttab
#include "rgw_amqp.h"
+#include <atomic>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
MessageQueue messages;
std::atomic<size_t> queued;
std::atomic<size_t> dequeued;
+ CephContext* const cct;
mutable std::mutex connections_lock;
std::thread runner;
for (;conn_it != end_it;) {
auto& conn = conn_it->second;
- ldout(conn->cct, 20) << "AMQP run: start processing connection" << dendl;
// delete the connection if marked for deletion
if (conn->marked_for_deletion) {
ldout(conn->cct, 10) << "AMQP run: connection is deleted" << dendl;
if (rc == AMQP_STATUS_TIMEOUT) {
// TODO mark connection as idle
- ldout(conn->cct, 20) << "AMQP run: connection is idle" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
}
Manager(size_t _max_connections,
size_t _max_inflight,
size_t _max_queue,
- long _usec_timeout) :
+ long _usec_timeout,
+ CephContext* _cct) :
max_connections(_max_connections),
max_inflight(_max_inflight),
max_queue(_max_queue),
messages(max_queue),
queued(0),
dequeued(0),
+ cct(_cct),
runner(&Manager::run, this) {
// The hashmap has "max connections" as the initial number of buckets,
// and allows for 10 collisions per bucket before rehash.
}
// connect to a broker, or reuse an existing connection if already connected
- connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) {
+ connection_ptr_t connect(const std::string& url, const std::string& exchange) {
if (stopped) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
+// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
-class SingletonManager {
- std::mutex manager_creation_lock;
- public:
- SingletonManager(CephContext* cct=nullptr) {
- // TODO get parameters from conf
- std::lock_guard<std::mutex> l(manager_creation_lock);
- if (s_manager == nullptr) {
- s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100);
- }
- }
-};
+bool init(CephContext* cct) {
+ if (s_manager) {
+ return false;
+ }
+ // TODO: take conf from CephContext
+ s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
+ return true;
+}
+
+void shutdown() {
+ delete s_manager;
+ s_manager = nullptr;
+}
-connection_ptr_t connect(const std::string& url, const std::string& exchange, CephContext* cct) {
- static const SingletonManager singleton(cct);
- return s_manager->connect(url, exchange, cct);
+connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+ if (!s_manager) return nullptr;
+ return s_manager->connect(url, exchange);
}
int publish(connection_ptr_t& conn,
const std::string& topic,
const std::string& message) {
- if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
return s_manager->publish(conn, topic, message);
}
const std::string& topic,
const std::string& message,
reply_callback_t cb) {
- if (s_manager == nullptr) return RGW_AMQP_STATUS_MANAGER_STOPPED;
+ if (!s_manager) return RGW_AMQP_STATUS_MANAGER_STOPPED;
return s_manager->publish_with_confirm(conn, topic, message, cb);
}
size_t get_connection_count() {
- if (s_manager == nullptr) return 0;
+ if (!s_manager) return 0;
return s_manager->get_connection_count();
}
size_t get_inflight() {
- if (s_manager == nullptr) return 0;
+ if (!s_manager) return 0;
return s_manager->get_inflight();
}
size_t get_queued() {
- if (s_manager == nullptr) return 0;
+ if (!s_manager) return 0;
return s_manager->get_queued();
}
size_t get_dequeued() {
- if (s_manager == nullptr) return 0;
+ if (!s_manager) return 0;
return s_manager->get_dequeued();
}
size_t get_max_connections() {
- if (s_manager == nullptr) return MAX_CONNECTIONS_DEFAULT;
+ if (!s_manager) return MAX_CONNECTIONS_DEFAULT;
return s_manager->max_connections;
}
size_t get_max_inflight() {
- if (s_manager == nullptr) return MAX_INFLIGHT_DEFAULT;
+ if (!s_manager) return MAX_INFLIGHT_DEFAULT;
return s_manager->max_inflight;
}
size_t get_max_queue() {
- if (s_manager == nullptr) return MAX_QUEUE_DEFAULT;
+ if (!s_manager) return MAX_QUEUE_DEFAULT;
return s_manager->max_queue;
}
bool disconnect(connection_ptr_t& conn) {
- if (s_manager == nullptr) return false;
+ if (!s_manager) return false;
return s_manager->disconnect(conn);
}
const std::chrono::milliseconds wait_time(300);
+class CctCleaner {
+ CephContext* cct;
+public:
+ CctCleaner(CephContext* _cct) : cct(_cct) {}
+ ~CctCleaner() {
+#ifdef WITH_SEASTAR
+ delete cct;
+#else
+ cct->put();
+#endif
+ }
+};
+
auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT);
-TEST(AMQP_Connection, ConnectionOK)
+CctCleaner cleaner(cct);
+
+class TestAMQP : public ::testing::Test {
+protected:
+ void SetUp() override {
+ ASSERT_TRUE(amqp::init(cct));
+ }
+
+ void TearDown() override {
+ amqp::shutdown();
+ }
+};
+
+TEST_F(TestAMQP, ConnectionOK)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_EQ(rc, 0);
}
-TEST(AMQP_Connection, ConnectionReuse)
+TEST_F(TestAMQP, ConnectionReuse)
{
+ amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1");
+ EXPECT_TRUE(conn1);
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://localhost", "ex1", cct);
- EXPECT_TRUE(conn);
+ amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1");
+ EXPECT_TRUE(conn2);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
- auto rc = amqp::publish(conn, "topic", "message");
+ auto rc = amqp::publish(conn1, "topic", "message");
EXPECT_EQ(rc, 0);
}
-TEST(AMQP_Connection, NameResolutionFail)
+TEST_F(TestAMQP, NameResolutionFail)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://kaboom", "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://kaboom", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
}
-TEST(AMQP_Connection, InvalidPort)
+TEST_F(TestAMQP, InvalidPort)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:1234", "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:1234", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
}
-TEST(AMQP_Connection, InvalidHost)
+TEST_F(TestAMQP, InvalidHost)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://0.0.0.1", "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://0.0.0.1", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
}
-TEST(AMQP_Connection, InvalidVhost)
+TEST_F(TestAMQP, InvalidVhost)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://localhost/kaboom", "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://localhost/kaboom", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
}
-TEST(AMQP_Connection, UserPassword)
+TEST_F(TestAMQP, UserPassword)
{
amqp_mock::set_valid_host("127.0.0.1");
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_host("127.0.0.2");
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_host("localhost");
}
-TEST(AMQP_Connection, URLParseError)
+TEST_F(TestAMQP, URLParseError)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex1");
EXPECT_FALSE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
}
-TEST(AMQP_Connection, ExchangeMismatch)
+TEST_F(TestAMQP, ExchangeMismatch)
{
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex2", cct);
+ amqp::connection_ptr_t conn = amqp::connect("http://localhost", "ex2");
EXPECT_FALSE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
}
-TEST(AMQP_Connection, MaxConnections)
+TEST_F(TestAMQP, MaxConnections)
{
// fill up all connections
std::vector<amqp::connection_ptr_t> connections;
while (remaining_connections > 0) {
const auto host = "127.10.0." + std::to_string(remaining_connections);
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_EQ(rc, 0);
{
const std::string host = "toomany";
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_FALSE(conn);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
callback_invoked = true;
}
-TEST(AMQP_PublishAndWait, ReceiveAck)
+TEST_F(TestAMQP, ReceiveAck)
{
callback_invoked = false;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
amqp_mock::set_valid_host("localhost");
}
-TEST(AMQP_PublishAndWait, ReceiveNack)
+TEST_F(TestAMQP, ReceiveNack)
{
callback_invoked = false;
amqp_mock::REPLY_ACK = false;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
amqp_mock::set_valid_host("localhost");
}
-TEST(AMQP_PublishAndWait, FailWrite)
+TEST_F(TestAMQP, FailWrite)
{
callback_invoked = false;
amqp_mock::FAIL_NEXT_WRITE = true;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
amqp_mock::set_valid_host("localhost");
}
-TEST(AMQP_PublishAndWait, ClosedConnection)
+TEST_F(TestAMQP, ClosedConnection)
{
callback_invoked = false;
const auto current_connections = amqp::get_connection_count();
const std::string host("localhost3");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
EXPECT_TRUE(amqp::disconnect(conn));
amqp_mock::set_valid_host("localhost");
}
-TEST(AMQP_ConnectionRetry, InvalidHost)
+TEST_F(TestAMQP, RetryInvalidHost)
{
const std::string host = "192.168.0.1";
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://"+host, "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://"+host, "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_host("localhost");
}
-TEST(AMQP_ConnectionRetry, InvalidPort)
+TEST_F(TestAMQP, RetryInvalidPort)
{
const int port = 9999;
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1");
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_port(5672);
}
-TEST(AMQP_ConnectionRetry, FailWrite)
+TEST_F(TestAMQP, RetryFailWrite)
{
callback_invoked = false;
amqp_mock::FAIL_NEXT_WRITE = true;
const std::string host("localhost4");
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", cct);
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);