]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: Robustly notify
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 14 Mar 2018 18:47:15 +0000 (14:47 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Fri, 20 Jul 2018 16:25:07 +0000 (12:25 -0400)
If at first you don't succeed, try, try again.

Keep track of what you succeed with and what you don't.

If you succeeded with something on a previous try, don't worry if you
didn't get it this time.

After a reasonable number of tries, give up.

http://tracker.ceph.com/issues/24963

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/common/options.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 79e88c5f5fae2d85f9eeb99fd7a3388efb99ec39..46644d82064033032f2ea11b3515b5d2d376bf94 100644 (file)
@@ -6262,7 +6262,6 @@ std::vector<Option> get_rgw_options() {
                          "of RGW instances under heavy use. If you would like "
                          "to turn off cache expiry, set this value to zero."),
 
-
     Option("rgw_inject_notify_timeout_probability", Option::TYPE_FLOAT,
           Option::LEVEL_DEV)
     .set_default(0)
@@ -6278,6 +6277,19 @@ std::vector<Option> get_rgw_options() {
                          "do not set it in a production cluster, as it "
                          "actively causes failures. Set this to a floating "
                          "point value between 0 and 1."),
+    Option("rgw_max_notify_retries", Option::TYPE_UINT,
+          Option::LEVEL_ADVANCED)
+    .set_default(3)
+    .add_tag("error recovery")
+    .add_service("rgw")
+    .set_description("Number of attempts to notify peers before giving up.")
+    .set_long_description("The number of times we will attempt to update "
+                         "a peer's cache in the event of error before giving "
+                         "up. This is unlikely to be an issue unless your "
+                         "cluster is very heavily loaded. Beware that "
+                         "increasing this value may cause some operations to "
+                         "take longer in exceptional cases and thus may, "
+                         "rarely, cause clients to time out."),
   });
 }
 
index 33ba7c30309cc91b45d7aa99ebb4d4b223aeca85..b9a07a827cd24bcf2b3be3c1f6f241c57a362bb0 100644 (file)
@@ -7,6 +7,7 @@
 #include <sys/types.h>
 #include <boost/algorithm/string.hpp>
 
+#include <boost/container/flat_set.hpp>
 #include <boost/format.hpp>
 #include <boost/optional.hpp>
 #include <boost/utility/in_place_factory.hpp>
@@ -4755,6 +4756,7 @@ int RGWRados::initialize()
 
   inject_notify_timeout_probability =
     cct->_conf.get_val<double>("rgw_inject_notify_timeout_probability");
+  max_notify_retries = cct->_conf.get_val<uint64_t>("rgw_max_notify_retries");
 
   ret = init_rados();
   if (ret < 0)
@@ -12679,7 +12681,106 @@ int RGWRados::distribute(const string& key, bufferlist& bl)
   pick_control_oid(key, notify_oid);
 
   ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
-  return control_pool_ctx.notify2(notify_oid, bl, 0, NULL);
+  return robust_notify(notify_oid, bl);
+}
+
+int RGWRados::robust_notify(const string& notify_oid, bufferlist& bl)
+{
+  // The reply of every machine that acks goes in here.
+  boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
+  bufferlist rbl;
+
+  // First, try to send, without being fancy about it.
+  auto r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl);
+
+  // If that doesn't work, get serious.
+  if (r < 0) {
+    ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
+                 << cpp_strerror(-r) << dendl;
+
+
+    auto p = rbl.cbegin();
+    // Gather up the replies to the first attempt.
+    try {
+      uint32_t num_acks;
+      decode(num_acks, p);
+      // Doing this ourselves since we don't care about the payload;
+      for (auto i = 0u; i < num_acks; ++i) {
+       std::pair<uint64_t, uint64_t> id;
+       decode(id, p);
+       acks.insert(id);
+       ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+       uint32_t blen;
+       decode(blen, p);
+       p.advance(blen);
+      }
+    } catch (const buffer::error& e) {
+      ldout(cct, 0) << "robust_notify: notify response parse failed: "
+                   << e.what() << dendl;
+      acks.clear(); // Throw away junk on failed parse.
+    }
+
+
+    // Every machine that fails to reply and hasn't acked a previous
+    // attempt goes in here.
+    boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
+
+    auto tries = 1u;
+    while (r < 0 && tries < max_notify_retries) {
+      ++tries;
+      rbl.clear();
+      // Reset the timeouts, we're only concerned with new ones.
+      timeouts.clear();
+      r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl);
+      if (r < 0) {
+       ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
+                     << cpp_strerror(-r) << dendl;
+       p = rbl.begin();
+       try {
+         uint32_t num_acks;
+         decode(num_acks, p);
+         // Not only do we not care about the payload, but we don't
+         // want to empty the container; we just want to augment it
+         // with any new members.
+         for (auto i = 0u; i < num_acks; ++i) {
+           std::pair<uint64_t, uint64_t> id;
+           decode(id, p);
+           auto ir = acks.insert(id);
+           if (ir.second) {
+             ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+           }
+           uint32_t blen;
+           decode(blen, p);
+           p.advance(blen);
+         }
+
+         uint32_t num_timeouts;
+         decode(num_timeouts, p);
+         for (auto i = 0u; i < num_timeouts; ++i) {
+           std::pair<uint64_t, uint64_t> id;
+           decode(id, p);
+           // Only track timeouts from hosts that haven't acked previously.
+           if (acks.find(id) != acks.cend()) {
+             ldout(cct, 20) << "robust_notify: " << id << " timed out."
+                            << dendl;
+             timeouts.insert(id);
+           }
+         }
+       } catch (const buffer::error& e) {
+         ldout(cct, 0) << "robust_notify: notify response parse failed: "
+                       << e.what() << dendl;
+         continue;
+       }
+       // If we got a good parse and timeouts is empty, that means
+       // everyone who timed out in one call received the update in a
+       // previous one.
+       if (timeouts.empty()) {
+         r = 0;
+       }
+      }
+    }
+  }
+  return r;
 }
 
 int RGWRados::pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx)
index e997197ae4481620811e1f6ec7fbbc7432b2f37c..463a35446b535b821b4649d9f9348a80e7b0680c 100644 (file)
@@ -2321,6 +2321,7 @@ class RGWRados : public AdminSocketHook
   bool watch_initialized;
 
   double inject_notify_timeout_probability = 0;
+  unsigned max_notify_retries = 0;
 
   friend class RGWWatcher;
 
@@ -3435,6 +3436,9 @@ public:
   int init_watch();
   void finalize_watch();
   int distribute(const string& key, bufferlist& bl);
+private:
+  int robust_notify(const string& notify_oid, bufferlist& bl);
+public:
   virtual int watch_cb(uint64_t notify_id,
                       uint64_t cookie,
                       uint64_t notifier_id,