&Name=<topic-name>
&push-endpoint=<endpoint>
[&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
- [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker]
+ [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker|routable]
[&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
[&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
[&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
+ - "routable": message is considered "delivered" if broker can route to a consumer
- Kafka endpoint
::
- PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
+ PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
Request parameters:
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
+ - "routable": message is considered "delivered" if broker can route to a consumer
- Kafka endpoint
::
- PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
+ PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
Request parameters:
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
+ - "routable": message is considered "delivered" if broker can route to a consumer
- Kafka endpoint
};
std::string to_string(const connection_id_t& id) {
- return id.host+":"+std::to_string(id.port)+"/"+id.vhost;
+ return id.host+":"+std::to_string(id.port)+id.vhost;
}
// connection_t state cleaner
mutable std::atomic<int> ref_count;
CephContext* cct;
CallbackList callbacks;
+ ceph::coarse_real_clock::time_point next_reconnect;
+ bool mandatory;
// default ctor
connection_t() :
reply_type(AMQP_RESPONSE_NORMAL),
reply_code(RGW_AMQP_NO_REPLY_CODE),
ref_count(0),
- cct(nullptr) {}
+ cct(nullptr),
+ next_reconnect(ceph::coarse_real_clock::now()),
+ mandatory(false)
+ {}
// cleanup of all internal connection resource
// the object can still remain, and internal connection
// utility function to create a new connection
connection_ptr_t create_new_connection(const amqp_connection_info& info,
- const std::string& exchange, CephContext* cct) {
+ const std::string& exchange, bool mandatory_delivery, CephContext* cct) {
// create connection state
connection_ptr_t conn = new connection_t;
conn->exchange = exchange;
conn->user.assign(info.user);
conn->password.assign(info.password);
+ conn->mandatory = mandatory_delivery;
conn->cct = cct;
return create_connection(conn, info);
}
CephContext* const cct;
mutable std::mutex connections_lock;
std::thread runner;
+ const ceph::coarse_real_clock::duration idle_time;
+ const ceph::coarse_real_clock::duration reconnect_time;
void publish_internal(message_wrapper_t* message) {
const std::unique_ptr<message_wrapper_t> msg_owner(message);
CHANNEL_ID,
amqp_cstring_bytes(conn->exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
- 1, // mandatory, TODO: take from conf
+ 0, // does not have to be routable
0, // not immediate
- nullptr,
+ nullptr, // no properties needed
amqp_cstring_bytes(message->message.c_str()));
if (rc == AMQP_STATUS_OK) {
ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
CONFIRMING_CHANNEL_ID,
amqp_cstring_bytes(conn->exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
- 1, // mandatory, TODO: take from conf
+ conn->mandatory,
0, // not immediate
&props,
amqp_cstring_bytes(message->message.c_str()));
// try to reconnect the connection if it has an error
if (!conn->is_ok()) {
- // pointers are used temporarily inside the amqp_connection_info object
- // as read-only values, hence the assignment, and const_cast are safe here
- amqp_connection_info info;
- info.host = const_cast<char*>(conn_it->first.host.c_str());
- info.port = conn_it->first.port;
- info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
- info.user = const_cast<char*>(conn->user.c_str());
- info.password = const_cast<char*>(conn->password.c_str());
- ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
- if (create_connection(conn, info)->is_ok() == false) {
- ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
- // TODO: add error counter for failed retries
- // TODO: add exponential backoff for retries
- } else {
- ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
+ const auto now = ceph::coarse_real_clock::now();
+ if (now >= conn->next_reconnect) {
+ // pointers are used temporarily inside the amqp_connection_info object
+ // as read-only values, hence the assignment, and const_cast are safe here
+ amqp_connection_info info;
+ info.host = const_cast<char*>(conn_it->first.host.c_str());
+ info.port = conn_it->first.port;
+ info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
+ info.user = const_cast<char*>(conn->user.c_str());
+ info.password = const_cast<char*>(conn->password.c_str());
+ ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
+ if (create_connection(conn, info)->is_ok() == false) {
+ ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed. error: " <<
+ status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
+ // TODO: add error counter for failed retries
+ // TODO: add exponential backoff for retries
+ conn->next_reconnect = now + reconnect_time;
+ } else {
+ ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
+ }
}
INCREMENT_AND_CONTINUE(conn_it);
}
}
if (frame.frame_type != AMQP_FRAME_METHOD) {
- ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl;
+ ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
+ << unsigned(frame.frame_type) << dendl;
// handler is for publish confirmation only - handle only method frames
- // TODO: add a counter
INCREMENT_AND_CONTINUE(conn_it);
}
multiple = nack->multiple;
break;
}
+ case AMQP_BASIC_REJECT_METHOD:
+ {
+ result = RGW_AMQP_STATUS_BROKER_NACK;
+ const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
+ tag = reject->delivery_tag;
+ multiple = false;
+ break;
+ }
case AMQP_CONNECTION_CLOSE_METHOD:
// TODO on channel close, no need to reopen the connection
case AMQP_CHANNEL_CLOSE_METHOD:
}
case AMQP_BASIC_RETURN_METHOD:
// message was not delivered, returned to sender
- // TODO: add a counter
- ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl;
+ ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
break;
default:
// unexpected method
- // TODO: add a counter
ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
}
conn->callbacks.erase(tag_it);
}
} else {
- // TODO add counter for acks with no callback
ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
}
// just increment the iterator
}
// if no messages were received or published, sleep for 100ms
if (count == 0 && !incoming_message) {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ std::this_thread::sleep_for(idle_time);
}
}
}
size_t _max_inflight,
size_t _max_queue,
long _usec_timeout,
+ unsigned reconnect_time_ms,
+ unsigned idle_time_ms,
CephContext* _cct) :
max_connections(_max_connections),
max_inflight(_max_inflight),
queued(0),
dequeued(0),
cct(_cct),
- runner(&Manager::run, this) {
+ runner(&Manager::run, this),
+ idle_time(std::chrono::milliseconds(idle_time_ms)),
+ reconnect_time(std::chrono::milliseconds(reconnect_time_ms)) {
// The hashmap has "max connections" as the initial number of buckets,
// and allows for 10 collisions per bucket before rehash.
// This is to prevent rehashing so that iterators are not invalidated
}
// connect to a broker, or reuse an existing connection if already connected
- connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+ connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
if (stopped) {
- // TODO: increment counter
ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
return nullptr;
}
// cache the URL so that parsing could happen in-place
std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
- // TODO: increment counter
ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
return nullptr;
}
const auto it = connections.find(id);
if (it != connections.end()) {
if (it->second->marked_for_deletion) {
- // TODO: increment counter
ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
return nullptr;
} else if (it->second->exchange != exchange) {
- // TODO: increment counter
ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
return nullptr;
}
// connection not found, creating a new one
if (connection_count >= max_connections) {
- // TODO: increment counter
ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
return nullptr;
}
- const auto conn = create_new_connection(info, exchange, cct);
+ const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct);
+ if (!conn->is_ok()) {
+ ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
+ status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
+ }
// create_new_connection must always return a connection object
// even if error occurred during creation.
// in such a case the creation will be retried in the main thread
const std::string& topic,
const std::string& message) {
if (stopped) {
+ ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
if (!conn || !conn->is_ok()) {
+ ldout(cct, 1) << "AMQP publish: no connection" << dendl;
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
++queued;
return AMQP_STATUS_OK;
}
+ ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
return RGW_AMQP_STATUS_QUEUE_FULL;
}
const std::string& message,
reply_callback_t cb) {
if (stopped) {
+ ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
if (!conn || !conn->is_ok()) {
+ ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
++queued;
return AMQP_STATUS_OK;
}
+ ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
return RGW_AMQP_STATUS_QUEUE_FULL;
}
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
+static const long READ_TIMEOUT_USEC = 100;
+static const unsigned IDLE_TIME_MS = 100;
+static const unsigned RECONNECT_TIME_MS = 100;
bool init(CephContext* cct) {
if (s_manager) {
return false;
}
// TODO: take conf from CephContext
- s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
+ s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
+ READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
return true;
}
s_manager = nullptr;
}
-connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
if (!s_manager) return nullptr;
- return s_manager->connect(url, exchange);
+ return s_manager->connect(url, exchange, mandatory_delivery);
}
int publish(connection_ptr_t& conn,
void shutdown();
// connect to an amqp endpoint
-connection_ptr_t connect(const std::string& url, const std::string& exchange);
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery);
// publish a message over a connection that was already created
int publish(connection_ptr_t& conn,
const std::string endpoint;
const std::string topic;
const std::string exchange;
- amqp::connection_ptr_t conn;
ack_level_t ack_level;
- std::string str_ack_level;
+ amqp::connection_ptr_t conn;
- static std::string get_exchange(const RGWHTTPArgs& args) {
+ std::string get_exchange(const RGWHTTPArgs& args) {
bool exists;
const auto exchange = args.get("amqp-exchange", &exists);
if (!exists) {
return exchange;
}
+ ack_level_t get_ack_level(const RGWHTTPArgs& args) {
+ bool exists;
+ const auto& str_ack_level = args.get("amqp-ack-level", &exists);
+ if (!exists || str_ack_level == "broker") {
+ // "broker" is default
+ return ack_level_t::Broker;
+ }
+ if (str_ack_level == "none") {
+ return ack_level_t::None;
+ }
+ if (str_ack_level == "routable") {
+ return ack_level_t::Routable;
+ }
+ throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
+ }
+
// NoAckPublishCR implements async amqp publishing via coroutine
// This coroutine ends when it send the message and does not wait for an ack
class NoAckPublishCR : public RGWCoroutine {
const std::string topic;
amqp::connection_ptr_t conn;
const std::string message;
- [[maybe_unused]] const ack_level_t ack_level; // TODO not used for now
public:
AckPublishCR(CephContext* cct,
const std::string& _topic,
amqp::connection_ptr_t& _conn,
- const std::string& _message,
- ack_level_t _ack_level) :
+ const std::string& _message) :
RGWCoroutine(cct),
- topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
+ topic(_topic), conn(_conn), message(_message) {}
// send message to endpoint, waiting for reply
int operate() override {
return nullptr;
}
};
-
+
public:
RGWPubSubAMQPEndpoint(const std::string& _endpoint,
const std::string& _topic,
endpoint(_endpoint),
topic(_topic),
exchange(get_exchange(args)),
- conn(amqp::connect(endpoint, exchange)) {
+ ack_level(get_ack_level(args)),
+ conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker))) {
if (!conn) {
throw configuration_error("AMQP: failed to create connection to: " + endpoint);
}
- bool exists;
- // get ack level
- str_ack_level = args.get("amqp-ack-level", &exists);
- if (!exists || str_ack_level == "broker") {
- // "broker" is default
- ack_level = ack_level_t::Broker;
- } else if (str_ack_level == "none") {
- ack_level = ack_level_t::None;
- } else if (str_ack_level == "routable") {
- ack_level = ack_level_t::Routable;
- } else {
- throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
- }
}
RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
if (ack_level == ack_level_t::None) {
return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
} else {
- // TODO: currently broker and routable are the same - this will require different flags
- // but the same mechanism
- return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
}
}
if (ack_level == ack_level_t::None) {
return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
} else {
- // TODO: currently broker and routable are the same - this will require different flags
- // but the same mechanism
- return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
}
}
str += "\nURI: " + endpoint;
str += "\nTopic: " + topic;
str += "\nExchange: " + exchange;
- str += "\nAck Level: " + str_ack_level;
return str;
}
};
kafka::connection_ptr_t conn;
const ack_level_t ack_level;
- static bool get_verify_ssl(const RGWHTTPArgs& args) {
+ bool get_verify_ssl(const RGWHTTPArgs& args) {
bool exists;
auto str_verify_ssl = args.get("verify-ssl", &exists);
if (!exists) {
throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
}
- static bool get_use_ssl(const RGWHTTPArgs& args) {
+ bool get_use_ssl(const RGWHTTPArgs& args) {
bool exists;
auto str_use_ssl = args.get("use-ssl", &exists);
if (!exists) {
throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
}
- static ack_level_t get_ack_level(const RGWHTTPArgs& args) {
+ ack_level_t get_ack_level(const RGWHTTPArgs& args) {
bool exists;
// get ack level
const auto str_ack_level = args.get("kafka-ack-level", &exists);
topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
topic_arn1 = topic_conf1.set_config()
# without acks from broker
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
topic_arn2 = topic_conf2.set_config()
# create s3 notification
if skip_push_tests:
return SkipTest("PubSub push tests don't run in teuthology")
hostname = get_ip()
- zones, _ = init_env(require_ps=False)
+ master_zone, _ = init_env(require_ps=False)
realm = get_realm()
zonegroup = realm.master_zonegroup()
# create bucket
bucket_name = gen_bucket_name()
- bucket = zones[0].create_bucket(bucket_name)
+ bucket = master_zone.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX
# create s3 topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address
- topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+ topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
'TopicArn': topic_arn,
'Events': ['s3:ObjectRemoved:*']
}]
- s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+ s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)
keys = list(bucket.list())
start_time = time.time()
- delete_all_objects(zones[0].conn, bucket_name)
+ delete_all_objects(master_zone.conn, bucket_name)
time_diff = time.time() - start_time
print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
topic_conf.del_config()
s3_notification_conf.del_config(notification=notification_name)
# delete the bucket
- zones[0].delete_bucket(bucket_name)
+ master_zone.delete_bucket(bucket_name)
http_server.close()
# create s3 topic
endpoint_address = 'amqp://' + hostname
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
# create s3 topic
endpoint_address = 'amqp://' + hostname
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()
# create s3 notification
TEST_F(TestAMQP, ConnectionOK)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost", "ex1");
+ conn = amqp::connect("amqp://localhost", "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, ConnectionReuse)
{
- amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1");
+ amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false);
EXPECT_TRUE(conn1);
const auto connection_number = amqp::get_connection_count();
- amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1");
+ amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false);
EXPECT_TRUE(conn2);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn1, "topic", "message");
TEST_F(TestAMQP, NameResolutionFail)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://kaboom", "ex1");
+ conn = amqp::connect("amqp://kaboom", "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, InvalidPort)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost:1234", "ex1");
+ conn = amqp::connect("amqp://localhost:1234", "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, InvalidHost)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://0.0.0.1", "ex1");
+ conn = amqp::connect("amqp://0.0.0.1", "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, InvalidVhost)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost/kaboom", "ex1");
+ conn = amqp::connect("amqp://localhost/kaboom", "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_host("127.0.0.1");
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1");
+ conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::set_valid_host("127.0.0.2");
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1");
+ conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, URLParseError)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("http://localhost", "ex1");
+ conn = amqp::connect("http://localhost", "ex1", false);
EXPECT_FALSE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn, "topic", "message");
TEST_F(TestAMQP, ExchangeMismatch)
{
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("http://localhost", "ex2");
+ conn = amqp::connect("http://localhost", "ex2", false);
EXPECT_FALSE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number);
auto rc = amqp::publish(conn, "topic", "message");
while (remaining_connections > 0) {
const auto host = "127.10.0." + std::to_string(remaining_connections);
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_EQ(rc, 0);
{
const std::string host = "toomany";
amqp_mock::set_valid_host(host);
- amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+ amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_FALSE(conn);
auto rc = amqp::publish(conn, "topic", "message");
EXPECT_LT(rc, 0);
callback_invoked = false;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
EXPECT_EQ(rc, 0);
callback_invoked = false;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
const auto NUMBER_OF_CALLS = 2000;
for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
const auto NUMBER_OF_CALLS = 100;
for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
amqp_mock::set_multiple(59);
const auto NUMBER_OF_CALLS = 100;
callbacks_invoked = 0;
const std::string host("localhost1");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
amqp_mock::set_multiple(59);
const auto NUMBER_OF_CALLS = 100;
amqp_mock::REPLY_ACK = false;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
amqp_mock::FAIL_NEXT_WRITE = true;
const std::string host("localhost2");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);
const auto current_connections = amqp::get_connection_count();
const std::string host("localhost3");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
EXPECT_TRUE(amqp::disconnect(conn));
{
const std::string host = "192.168.0.1";
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://"+host, "ex1");
+ conn = amqp::connect("amqp://"+host, "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
{
const int port = 9999;
const auto connection_number = amqp::get_connection_count();
- conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1");
+ conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false);
EXPECT_TRUE(conn);
EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
auto rc = amqp::publish(conn, "topic", "message");
amqp_mock::FAIL_NEXT_WRITE = true;
const std::string host("localhost4");
amqp_mock::set_valid_host(host);
- conn = amqp::connect("amqp://" + host, "ex1");
+ conn = amqp::connect("amqp://" + host, "ex1", false);
EXPECT_TRUE(conn);
auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
EXPECT_EQ(rc, 0);