]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/amqp: fix the "routable" delivery mode 34376/head
authorYuval Lifshitz <yuvalif@yahoo.com>
Thu, 2 Apr 2020 13:32:06 +0000 (16:32 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Sun, 19 Apr 2020 07:25:56 +0000 (10:25 +0300)
this option was not exposed to the configuration API
however, it was still set, as hardcoded value in the code
(details:
https://www.rabbitmq.com/confirms.html#publisher-confirms)

Fixes: https://tracker.ceph.com/issues/44915
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
src/rgw/rgw_amqp.cc
src/rgw/rgw_amqp.h
src/rgw/rgw_pubsub_push.cc
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/test_rgw_amqp.cc

index 4c47c211394e97c0055bccd9e8b0c1a892e79dac..4101b8dd826e47965ef2089e91d82a6d384f5702 100644 (file)
@@ -67,7 +67,7 @@ To update a topic, use the same command used for topic creation, with the topic
    &Name=<topic-name>
    &push-endpoint=<endpoint>
    [&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
-   [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker]
+   [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker|routable]
    [&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
    [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
    [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
@@ -93,10 +93,11 @@ Request parameters:
  - port defaults to: 5672
  - vhost defaults to: "/"
  - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
 
   - "none": message is considered "delivered" if sent to broker
   - "broker": message is considered "delivered" if acked by broker (default)
+  - "routable": message is considered "delivered" if broker can route to a consumer
 
 - Kafka endpoint 
 
index 61cd4def207ff93c54b66c97216a18c1f92dec08..fd3b9f021e64e48e0313bcac1906e16305f60cfb 100644 (file)
@@ -150,7 +150,7 @@ To update a topic, use the same command used for topic creation, with the topic
 
 ::
 
-   PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
+   PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
 
 Request parameters:
 
@@ -173,10 +173,11 @@ The endpoint URI may include parameters depending with the type of endpoint:
  - port defaults to: 5672
  - vhost defaults to: "/"
  - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
 
   - "none": message is considered "delivered" if sent to broker
   - "broker": message is considered "delivered" if acked by broker (default)
+  - "routable": message is considered "delivered" if broker can route to a consumer
 
 - Kafka endpoint 
 
@@ -348,7 +349,7 @@ Creates a new subscription.
 
 ::
 
-   PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
+   PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
 
 Request parameters:
 
@@ -370,10 +371,11 @@ The endpoint URI may include parameters depending with the type of endpoint:
  - port defaults to: 5672
  - vhost defaults to: "/"
  - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
+ - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:
 
   - "none": message is considered "delivered" if sent to broker
   - "broker": message is considered "delivered" if acked by broker (default)
+  - "routable": message is considered "delivered" if broker can route to a consumer
 
 - Kafka endpoint 
 
index a5f6ca00d84f411f257607bcf6980ea01864fa22..78446e88f68e667443406700a2659aafdabb57cf 100644 (file)
@@ -72,7 +72,7 @@ struct connection_id_t {
 };
 
 std::string to_string(const connection_id_t& id) {
-    return id.host+":"+std::to_string(id.port)+"/"+id.vhost;
+    return id.host+":"+std::to_string(id.port)+id.vhost;
 }
 
 // connection_t state cleaner
@@ -124,6 +124,8 @@ struct connection_t {
   mutable std::atomic<int> ref_count;
   CephContext* cct;
   CallbackList callbacks;
+  ceph::coarse_real_clock::time_point next_reconnect;
+  bool mandatory;
 
   // default ctor
   connection_t() :
@@ -135,7 +137,10 @@ struct connection_t {
     reply_type(AMQP_RESPONSE_NORMAL),
     reply_code(RGW_AMQP_NO_REPLY_CODE),
     ref_count(0),
-    cct(nullptr) {}
+    cct(nullptr),
+    next_reconnect(ceph::coarse_real_clock::now()),
+    mandatory(false)
+  {}
 
   // cleanup of all internal connection resource
   // the object can still remain, and internal connection
@@ -489,12 +494,13 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio
 
 // utility function to create a new connection
 connection_ptr_t create_new_connection(const amqp_connection_info& info, 
-    const std::string& exchange, CephContext* cct) { 
+    const std::string& exchange, bool mandatory_delivery, CephContext* cct) { 
   // create connection state
   connection_ptr_t conn = new connection_t;
   conn->exchange = exchange;
   conn->user.assign(info.user);
   conn->password.assign(info.password);
+  conn->mandatory = mandatory_delivery;
   conn->cct = cct;
   return create_connection(conn, info);
 }
@@ -542,6 +548,8 @@ private:
   CephContext* const cct;
   mutable std::mutex connections_lock;
   std::thread runner;
+  const ceph::coarse_real_clock::duration idle_time;
+  const ceph::coarse_real_clock::duration reconnect_time;
 
   void publish_internal(message_wrapper_t* message) {
     const std::unique_ptr<message_wrapper_t> msg_owner(message);
@@ -563,9 +571,9 @@ private:
         CHANNEL_ID,
         amqp_cstring_bytes(conn->exchange.c_str()),
         amqp_cstring_bytes(message->topic.c_str()),
-        1, // mandatory, TODO: take from conf
+        0, // does not have to be routable
         0, // not immediate
-        nullptr,
+        nullptr, // no properties needed
         amqp_cstring_bytes(message->message.c_str()));
       if (rc == AMQP_STATUS_OK) {
         ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
@@ -589,7 +597,7 @@ private:
       CONFIRMING_CHANNEL_ID,
       amqp_cstring_bytes(conn->exchange.c_str()),
       amqp_cstring_bytes(message->topic.c_str()),
-      1, // mandatory, TODO: take from conf
+      conn->mandatory,
       0, // not immediate
       &props,
       amqp_cstring_bytes(message->message.c_str()));
@@ -653,21 +661,26 @@ private:
 
         // try to reconnect the connection if it has an error
         if (!conn->is_ok()) {
-          // pointers are used temporarily inside the amqp_connection_info object
-          // as read-only values, hence the assignment, and const_cast are safe here
-          amqp_connection_info info;
-          info.host = const_cast<char*>(conn_it->first.host.c_str());
-          info.port = conn_it->first.port;
-          info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
-          info.user = const_cast<char*>(conn->user.c_str());
-          info.password = const_cast<char*>(conn->password.c_str());
-          ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
-          if (create_connection(conn, info)->is_ok() == false) {
-            ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
-            // TODO: add error counter for failed retries
-            // TODO: add exponential backoff for retries
-          } else {
-            ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
+          const auto now = ceph::coarse_real_clock::now();
+          if (now >= conn->next_reconnect) {
+            // pointers are used temporarily inside the amqp_connection_info object
+            // as read-only values, hence the assignment, and const_cast are safe here
+            amqp_connection_info info;
+            info.host = const_cast<char*>(conn_it->first.host.c_str());
+            info.port = conn_it->first.port;
+            info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
+            info.user = const_cast<char*>(conn->user.c_str());
+            info.password = const_cast<char*>(conn->password.c_str());
+            ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
+            if (create_connection(conn, info)->is_ok() == false) {
+              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed. error: " <<
+                status_to_string(conn->status) << " (" << conn->reply_code << ")"  << dendl;
+              // TODO: add error counter for failed retries
+              // TODO: add exponential backoff for retries
+              conn->next_reconnect = now + reconnect_time;
+            } else {
+              ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
+            }
           }
           INCREMENT_AND_CONTINUE(conn_it);
         }
@@ -693,9 +706,9 @@ private:
         }
 
         if (frame.frame_type != AMQP_FRAME_METHOD) {
-          ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl;
+          ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: " 
+            << unsigned(frame.frame_type) << dendl;
           // handler is for publish confirmation only - handle only method frames
-          // TODO: add a counter
           INCREMENT_AND_CONTINUE(conn_it);
         }
 
@@ -722,6 +735,14 @@ private:
               multiple = nack->multiple;
               break;
             }
+          case AMQP_BASIC_REJECT_METHOD:                                                   
+            {                                                                              
+              result = RGW_AMQP_STATUS_BROKER_NACK;                                        
+              const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;      
+              tag = reject->delivery_tag;                                                  
+              multiple = false;                                                            
+              break;
+            }
           case AMQP_CONNECTION_CLOSE_METHOD:
             // TODO on channel close, no need to reopen the connection
           case AMQP_CHANNEL_CLOSE_METHOD:
@@ -733,13 +754,11 @@ private:
             }
           case AMQP_BASIC_RETURN_METHOD:
             // message was not delivered, returned to sender
-            // TODO: add a counter
-            ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl;
+            ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
             break;
           default:
             // unexpected method
-            // TODO: add a counter
             ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
             INCREMENT_AND_CONTINUE(conn_it);
         }
@@ -764,7 +783,6 @@ private:
             conn->callbacks.erase(tag_it);
           }
         } else {
-          // TODO add counter for acks with no callback
           ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
         }
         // just increment the iterator
@@ -772,7 +790,7 @@ private:
       }
       // if no messages were received or published, sleep for 100ms
       if (count == 0 && !incoming_message) {
-        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        std::this_thread::sleep_for(idle_time);
       }
     }
   }
@@ -787,6 +805,8 @@ public:
       size_t _max_inflight,
       size_t _max_queue, 
       long _usec_timeout,
+      unsigned reconnect_time_ms,
+      unsigned idle_time_ms,
       CephContext* _cct) : 
     max_connections(_max_connections),
     max_inflight(_max_inflight),
@@ -799,7 +819,9 @@ public:
     queued(0),
     dequeued(0),
     cct(_cct),
-    runner(&Manager::run, this) {
+    runner(&Manager::run, this),
+    idle_time(std::chrono::milliseconds(idle_time_ms)),
+    reconnect_time(std::chrono::milliseconds(reconnect_time_ms)) {
       // The hashmap has "max connections" as the initial number of buckets, 
       // and allows for 10 collisions per bucket before rehash.
       // This is to prevent rehashing so that iterators are not invalidated 
@@ -829,9 +851,8 @@ public:
   }
 
   // connect to a broker, or reuse an existing connection if already connected
-  connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+  connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
     if (stopped) {
-      // TODO: increment counter
       ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
       return nullptr;
     }
@@ -840,7 +861,6 @@ public:
     // cache the URL so that parsing could happen in-place
     std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
     if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
-      // TODO: increment counter
       ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
       return nullptr;
     }
@@ -850,11 +870,9 @@ public:
     const auto it = connections.find(id);
     if (it != connections.end()) {
       if (it->second->marked_for_deletion) {
-        // TODO: increment counter
         ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
         return nullptr;
       } else if (it->second->exchange != exchange) {
-        // TODO: increment counter
         ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
         return nullptr;
       }
@@ -865,11 +883,14 @@ public:
 
     // connection not found, creating a new one
     if (connection_count >= max_connections) {
-      // TODO: increment counter
       ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
       return nullptr;
     }
-    const auto conn = create_new_connection(info, exchange, cct);
+    const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct);
+    if (!conn->is_ok()) {
+      ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
+              status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
+    }
     // create_new_connection must always return a connection object
     // even if error occurred during creation. 
     // in such a case the creation will be retried in the main thread
@@ -885,15 +906,18 @@ public:
     const std::string& topic,
     const std::string& message) {
     if (stopped) {
+      ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
       return RGW_AMQP_STATUS_MANAGER_STOPPED;
     }
     if (!conn || !conn->is_ok()) {
+      ldout(cct, 1) << "AMQP publish: no connection" << dendl;
       return RGW_AMQP_STATUS_CONNECTION_CLOSED;
     }
     if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
       ++queued;
       return AMQP_STATUS_OK;
     }
+    ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
     return RGW_AMQP_STATUS_QUEUE_FULL;
   }
   
@@ -902,15 +926,18 @@ public:
     const std::string& message,
     reply_callback_t cb) {
     if (stopped) {
+      ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
       return RGW_AMQP_STATUS_MANAGER_STOPPED;
     }
     if (!conn || !conn->is_ok()) {
+      ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
       return RGW_AMQP_STATUS_CONNECTION_CLOSED;
     }
     if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
       ++queued;
       return AMQP_STATUS_OK;
     }
+    ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
     return RGW_AMQP_STATUS_QUEUE_FULL;
   }
 
@@ -956,13 +983,17 @@ static Manager* s_manager = nullptr;
 static const size_t MAX_CONNECTIONS_DEFAULT = 256;
 static const size_t MAX_INFLIGHT_DEFAULT = 8192; 
 static const size_t MAX_QUEUE_DEFAULT = 8192;
+static const long READ_TIMEOUT_USEC = 100;
+static const unsigned IDLE_TIME_MS = 100;
+static const unsigned RECONNECT_TIME_MS = 100;
 
 bool init(CephContext* cct) {
   if (s_manager) {
     return false;
   }
   // TODO: take conf from CephContext
-  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
+  s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 
+      READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
   return true;
 }
 
@@ -971,9 +1002,9 @@ void shutdown() {
   s_manager = nullptr;
 }
 
-connection_ptr_t connect(const std::string& url, const std::string& exchange) {
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
   if (!s_manager) return nullptr;
-  return s_manager->connect(url, exchange);
+  return s_manager->connect(url, exchange, mandatory_delivery);
 }
 
 int publish(connection_ptr_t& conn, 
index bbfce2d5dcd66a293699233131ba7dd3e97fcf0b..eaf97ed9dc01acbeb64ebe2957b52d8008babed3 100644 (file)
@@ -30,7 +30,7 @@ bool init(CephContext* cct);
 void shutdown();
 
 // connect to an amqp endpoint
-connection_ptr_t connect(const std::string& url, const std::string& exchange);
+connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery);
 
 // publish a message over a connection that was already created
 int publish(connection_ptr_t& conn,
index 6230330d4a6a32440a0bfe485b6b1c40670e6e7e..0f927e4f46fc3f4dfd1b865960dcb914b2128d42 100644 (file)
@@ -170,11 +170,10 @@ private:
   const std::string endpoint;
   const std::string topic;
   const std::string exchange;
-  amqp::connection_ptr_t conn;
   ack_level_t ack_level;
-  std::string str_ack_level;
+  amqp::connection_ptr_t conn;
 
-  static std::string get_exchange(const RGWHTTPArgs& args) {
+  std::string get_exchange(const RGWHTTPArgs& args) {
     bool exists;
     const auto exchange = args.get("amqp-exchange", &exists);
     if (!exists) {
@@ -183,6 +182,22 @@ private:
     return exchange;
   }
 
+  ack_level_t get_ack_level(const RGWHTTPArgs& args) {
+    bool exists;
+    const auto& str_ack_level = args.get("amqp-ack-level", &exists);
+    if (!exists || str_ack_level == "broker") {
+      // "broker" is default
+      return ack_level_t::Broker;
+    }
+    if (str_ack_level == "none") {
+      return ack_level_t::None;
+    }
+    if (str_ack_level == "routable") {
+      return ack_level_t::Routable;
+    }
+    throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
+  }
+
   // NoAckPublishCR implements async amqp publishing via coroutine
   // This coroutine ends when it send the message and does not wait for an ack
   class NoAckPublishCR : public RGWCoroutine {
@@ -220,16 +235,14 @@ private:
     const std::string topic;
     amqp::connection_ptr_t conn;
     const std::string message;
-    [[maybe_unused]] const ack_level_t ack_level; // TODO not used for now
 
   public:
     AckPublishCR(CephContext* cct,
               const std::string& _topic,
               amqp::connection_ptr_t& _conn,
-              const std::string& _message,
-              ack_level_t _ack_level) :
+              const std::string& _message) :
       RGWCoroutine(cct),
-      topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {}
+      topic(_topic), conn(_conn), message(_message) {}
 
     // send message to endpoint, waiting for reply
     int operate() override {
@@ -273,7 +286,7 @@ private:
       return nullptr;
     }
   };
-
+  
 public:
   RGWPubSubAMQPEndpoint(const std::string& _endpoint,
       const std::string& _topic,
@@ -283,23 +296,11 @@ public:
         endpoint(_endpoint), 
         topic(_topic),
         exchange(get_exchange(args)),
-        conn(amqp::connect(endpoint, exchange)) {
+        ack_level(get_ack_level(args)),
+        conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker))) {
     if (!conn) { 
       throw configuration_error("AMQP: failed to create connection to: " + endpoint);
     }
-    bool exists;
-    // get ack level
-    str_ack_level = args.get("amqp-ack-level", &exists);
-    if (!exists || str_ack_level == "broker") {
-      // "broker" is default
-      ack_level = ack_level_t::Broker;
-    } else if (str_ack_level == "none") {
-      ack_level = ack_level_t::None;
-    } else if (str_ack_level == "routable") {
-      ack_level = ack_level_t::Routable;
-    } else {
-      throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level);
-    }
   }
 
   RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override {
@@ -307,9 +308,7 @@ public:
     if (ack_level == ack_level_t::None) {
       return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
     } else {
-      // TODO: currently broker and routable are the same - this will require different flags
-      // but the same mechanism
-      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level);
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
     }
   }
   
@@ -318,9 +317,7 @@ public:
     if (ack_level == ack_level_t::None) {
       return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
     } else {
-      // TODO: currently broker and routable are the same - this will require different flags
-      // but the same mechanism
-      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level);
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
     }
   }
 
@@ -405,7 +402,6 @@ public:
     str += "\nURI: " + endpoint;
     str += "\nTopic: " + topic;
     str += "\nExchange: " + exchange;
-    str += "\nAck Level: " + str_ack_level;
     return str;
   }
 };
@@ -428,7 +424,7 @@ private:
   kafka::connection_ptr_t conn;
   const ack_level_t ack_level;
 
-  static bool get_verify_ssl(const RGWHTTPArgs& args) {
+  bool get_verify_ssl(const RGWHTTPArgs& args) {
     bool exists;
     auto str_verify_ssl = args.get("verify-ssl", &exists);
     if (!exists) {
@@ -445,7 +441,7 @@ private:
     throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl);
   }
 
-  static bool get_use_ssl(const RGWHTTPArgs& args) {
+  bool get_use_ssl(const RGWHTTPArgs& args) {
     bool exists;
     auto str_use_ssl = args.get("use-ssl", &exists);
     if (!exists) {
@@ -462,7 +458,7 @@ private:
     throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl);
   }
 
-  static ack_level_t get_ack_level(const RGWHTTPArgs& args) {
+  ack_level_t get_ack_level(const RGWHTTPArgs& args) {
     bool exists;
     // get ack level
     const auto str_ack_level = args.get("kafka-ack-level", &exists);
index b3aead9c38c1664ac4bf7ac70ff801ef35bfaefd..065ab15b54d44e7e1e4df597bfd1084f08dacf91 100644 (file)
@@ -1371,7 +1371,7 @@ def test_ps_s3_notification_push_amqp_on_master():
     topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn1 = topic_conf1.set_config()
     # without acks from broker
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
     topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn2 = topic_conf2.set_config()
     # create s3 notification
@@ -1724,7 +1724,7 @@ def test_ps_s3_notification_multi_delete_on_master():
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
     hostname = get_ip()
-    zones, _  = init_env(require_ps=False)
+    master_zone, _  = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     
@@ -1737,13 +1737,13 @@ def test_ps_s3_notification_multi_delete_on_master():
     
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
     endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1751,7 +1751,7 @@ def test_ps_s3_notification_multi_delete_on_master():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectRemoved:*']
                        }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1769,7 +1769,7 @@ def test_ps_s3_notification_multi_delete_on_master():
     keys = list(bucket.list())
 
     start_time = time.time()
-    delete_all_objects(zones[0].conn, bucket_name)
+    delete_all_objects(master_zone.conn, bucket_name)
     time_diff = time.time() - start_time
     print('average time for deletion + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
@@ -1783,7 +1783,7 @@ def test_ps_s3_notification_multi_delete_on_master():
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
@@ -2873,7 +2873,7 @@ def test_ps_s3_metadata_on_master():
 
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
     topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -2974,7 +2974,7 @@ def test_ps_s3_tags_on_master():
 
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
+    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable'
     topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
index 0f11b817e8d19e97b0c844c183e98f423a5d1aa2..13bab823f02cf198ff0c8f7ed1b674879051a0c8 100644 (file)
@@ -61,7 +61,7 @@ protected:
 TEST_F(TestAMQP, ConnectionOK)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost", "ex1");
+  conn = amqp::connect("amqp://localhost", "ex1", false);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -70,10 +70,10 @@ TEST_F(TestAMQP, ConnectionOK)
 
 TEST_F(TestAMQP, ConnectionReuse)
 {
-  amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1");
+  amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false);
   EXPECT_TRUE(conn1);
   const auto connection_number = amqp::get_connection_count();
-  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1");
+  amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false);
   EXPECT_TRUE(conn2);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn1, "topic", "message");
@@ -83,7 +83,7 @@ TEST_F(TestAMQP, ConnectionReuse)
 TEST_F(TestAMQP, NameResolutionFail)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://kaboom", "ex1");
+  conn = amqp::connect("amqp://kaboom", "ex1", false);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -93,7 +93,7 @@ TEST_F(TestAMQP, NameResolutionFail)
 TEST_F(TestAMQP, InvalidPort)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost:1234", "ex1");
+  conn = amqp::connect("amqp://localhost:1234", "ex1", false);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -103,7 +103,7 @@ TEST_F(TestAMQP, InvalidPort)
 TEST_F(TestAMQP, InvalidHost)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://0.0.0.1", "ex1");
+  conn = amqp::connect("amqp://0.0.0.1", "ex1", false);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -113,7 +113,7 @@ TEST_F(TestAMQP, InvalidHost)
 TEST_F(TestAMQP, InvalidVhost)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost/kaboom", "ex1");
+  conn = amqp::connect("amqp://localhost/kaboom", "ex1", false);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -125,7 +125,7 @@ TEST_F(TestAMQP, UserPassword)
   amqp_mock::set_valid_host("127.0.0.1");
   {
     const auto connection_number = amqp::get_connection_count();
-    conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1");
+    conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false);
     EXPECT_TRUE(conn);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
     auto rc = amqp::publish(conn, "topic", "message");
@@ -135,7 +135,7 @@ TEST_F(TestAMQP, UserPassword)
   amqp_mock::set_valid_host("127.0.0.2");
   {
     const auto connection_number = amqp::get_connection_count();
-    conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1");
+    conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false);
     EXPECT_TRUE(conn);
     EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
     auto rc = amqp::publish(conn, "topic", "message");
@@ -147,7 +147,7 @@ TEST_F(TestAMQP, UserPassword)
 TEST_F(TestAMQP, URLParseError)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("http://localhost", "ex1");
+  conn = amqp::connect("http://localhost", "ex1", false);
   EXPECT_FALSE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -157,7 +157,7 @@ TEST_F(TestAMQP, URLParseError)
 TEST_F(TestAMQP, ExchangeMismatch)
 {
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("http://localhost", "ex2");
+  conn = amqp::connect("http://localhost", "ex2", false);
   EXPECT_FALSE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -172,7 +172,7 @@ TEST_F(TestAMQP, MaxConnections)
   while (remaining_connections > 0) {
     const auto host = "127.10.0." + std::to_string(remaining_connections);
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
     EXPECT_TRUE(conn);
     auto rc = amqp::publish(conn, "topic", "message");
     EXPECT_EQ(rc, 0);
@@ -184,7 +184,7 @@ TEST_F(TestAMQP, MaxConnections)
   {
     const std::string host = "toomany";
     amqp_mock::set_valid_host(host);
-    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1");
+    amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false);
     EXPECT_FALSE(conn);
     auto rc = amqp::publish(conn, "topic", "message");
     EXPECT_LT(rc, 0);
@@ -246,7 +246,7 @@ TEST_F(TestAMQP, ReceiveAck)
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1", false);
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack);
   EXPECT_EQ(rc, 0);
@@ -260,7 +260,7 @@ TEST_F(TestAMQP, ImplicitConnectionClose)
   callback_invoked = false;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1", false);
   EXPECT_TRUE(conn);
   const auto NUMBER_OF_CALLS = 2000;
   for (auto i = 0; i < NUMBER_OF_CALLS; ++i) {
@@ -278,7 +278,7 @@ TEST_F(TestAMQP, ReceiveMultipleAck)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1", false);
   EXPECT_TRUE(conn);
   const auto NUMBER_OF_CALLS = 100;
   for (auto i=0; i < NUMBER_OF_CALLS; ++i) {
@@ -296,7 +296,7 @@ TEST_F(TestAMQP, ReceiveAckForMultiple)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1", false);
   EXPECT_TRUE(conn);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
@@ -315,7 +315,7 @@ TEST_F(TestAMQP, DynamicCallback)
   callbacks_invoked = 0;
   const std::string host("localhost1");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1", false);
   EXPECT_TRUE(conn);
   amqp_mock::set_multiple(59);
   const auto NUMBER_OF_CALLS = 100;
@@ -336,7 +336,7 @@ TEST_F(TestAMQP, ReceiveNack)
   amqp_mock::REPLY_ACK = false;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1", false);
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
@@ -353,7 +353,7 @@ TEST_F(TestAMQP, FailWrite)
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost2");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1", false);
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);
@@ -370,7 +370,7 @@ TEST_F(TestAMQP, ClosedConnection)
   const auto current_connections = amqp::get_connection_count();
   const std::string host("localhost3");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1", false);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), current_connections + 1);
   EXPECT_TRUE(amqp::disconnect(conn));
@@ -389,7 +389,7 @@ TEST_F(TestAMQP, RetryInvalidHost)
 {
   const std::string host = "192.168.0.1";
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://"+host, "ex1");
+  conn = amqp::connect("amqp://"+host, "ex1", false);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -406,7 +406,7 @@ TEST_F(TestAMQP, RetryInvalidPort)
 {
   const int port = 9999;
   const auto connection_number = amqp::get_connection_count();
-  conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1");
+  conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false);
   EXPECT_TRUE(conn);
   EXPECT_EQ(amqp::get_connection_count(), connection_number + 1);
   auto rc = amqp::publish(conn, "topic", "message");
@@ -425,7 +425,7 @@ TEST_F(TestAMQP, RetryFailWrite)
   amqp_mock::FAIL_NEXT_WRITE = true;
   const std::string host("localhost4");
   amqp_mock::set_valid_host(host);
-  conn = amqp::connect("amqp://" + host, "ex1");
+  conn = amqp::connect("amqp://" + host, "ex1", false);
   EXPECT_TRUE(conn);
   auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack);
   EXPECT_EQ(rc, 0);