"of RGW instances under heavy use. If you would like "
"to turn off cache expiry, set this value to zero."),
-
Option("rgw_inject_notify_timeout_probability", Option::TYPE_FLOAT,
Option::LEVEL_DEV)
.set_default(0)
"do not set it in a production cluster, as it "
"actively causes failures. Set this to a floating "
"point value between 0 and 1."),
+ Option("rgw_max_notify_retries", Option::TYPE_UINT,
+ Option::LEVEL_ADVANCED)
+ .set_default(3) // counts the first attempt too: robust_notify starts at tries = 1 and loops while tries < this value
+ .add_tag("error recovery")
+ .add_service("rgw")
+ .set_description("Number of attempts to notify peers before giving up.")
+ .set_long_description("The number of times we will attempt to update "
+ "a peer's cache in the event of error before giving "
+ "up. This is unlikely to be an issue unless your "
+ "cluster is very heavily loaded. Beware that "
+ "increasing this value may cause some operations to "
+ "take longer in exceptional cases and thus may, "
+ "rarely, cause clients to time out."),
});
}
#include <sys/types.h>
#include <boost/algorithm/string.hpp>
+#include <boost/container/flat_set.hpp>
#include <boost/format.hpp>
#include <boost/optional.hpp>
#include <boost/utility/in_place_factory.hpp>
inject_notify_timeout_probability =
cct->_conf.get_val<double>("rgw_inject_notify_timeout_probability");
+ max_notify_retries = cct->_conf.get_val<uint64_t>("rgw_max_notify_retries");
ret = init_rados();
if (ret < 0)
pick_control_oid(key, notify_oid);
ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
- return control_pool_ctx.notify2(notify_oid, bl, 0, NULL);
+ return robust_notify(notify_oid, bl);
+}
+
+/**
+ * Send a notification on the control pool and retry it on failure.
+ *
+ * The first notify2() call is made unconditionally. If it returns a
+ * negative value, the acks contained in its reply are remembered and the
+ * notify is re-sent until either a call succeeds, every peer that timed
+ * out in the latest call is known to have acked an earlier one, or
+ * max_notify_retries total attempts (the first one included) were made.
+ *
+ * The reply buffer is decoded here as: a uint32 ack count, then per ack
+ * an id pair, a uint32 payload length and the payload (skipped); then a
+ * uint32 timeout count followed by one id pair per timeout.
+ *
+ * @param notify_oid  object the notification is sent on
+ * @param bl          notification payload, passed to every attempt
+ * @return the result of the last notify2() call, or 0 when a failed
+ *         retry parsed cleanly and left no un-acked timeouts
+ */
+int RGWRados::robust_notify(const string& notify_oid, bufferlist& bl)
+{
+  // The reply of every machine that acks goes in here.
+  boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
+  bufferlist rbl;
+
+  // First, try to send, without being fancy about it.
+  auto r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl);
+
+  // If that doesn't work, get serious.
+  if (r < 0) {
+    ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
+                  << cpp_strerror(-r) << dendl;
+
+    auto p = rbl.cbegin();
+    // Gather up the replies to the first attempt.
+    try {
+      uint32_t num_acks;
+      decode(num_acks, p);
+      // Doing this ourselves since we don't care about the payload;
+      for (auto i = 0u; i < num_acks; ++i) {
+        std::pair<uint64_t, uint64_t> id;
+        decode(id, p);
+        acks.insert(id);
+        ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+        uint32_t blen;
+        decode(blen, p);
+        p.advance(blen);
+      }
+    } catch (const buffer::error& e) {
+      ldout(cct, 0) << "robust_notify: notify response parse failed: "
+                    << e.what() << dendl;
+      acks.clear(); // Throw away junk on failed parse.
+    }
+
+    // Every machine that fails to reply and hasn't acked a previous
+    // attempt goes in here.
+    boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
+
+    // The attempt above counts as the first try.
+    auto tries = 1u;
+    while (r < 0 && tries < max_notify_retries) {
+      ++tries;
+      rbl.clear();
+      // Reset the timeouts, we're only concerned with new ones.
+      timeouts.clear();
+      r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl);
+      if (r < 0) {
+        ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
+                      << cpp_strerror(-r) << dendl;
+        p = rbl.cbegin();
+        try {
+          uint32_t num_acks;
+          decode(num_acks, p);
+          // Not only do we not care about the payload, but we don't
+          // want to empty the container; we just want to augment it
+          // with any new members.
+          for (auto i = 0u; i < num_acks; ++i) {
+            std::pair<uint64_t, uint64_t> id;
+            decode(id, p);
+            auto ir = acks.insert(id);
+            if (ir.second) {
+              ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+            }
+            uint32_t blen;
+            decode(blen, p);
+            p.advance(blen);
+          }
+
+          uint32_t num_timeouts;
+          decode(num_timeouts, p);
+          for (auto i = 0u; i < num_timeouts; ++i) {
+            std::pair<uint64_t, uint64_t> id;
+            decode(id, p);
+            // Only track timeouts from hosts that haven't acked previously.
+            // A host already present in acks received the update in an
+            // earlier attempt, so its timeout now is harmless.
+            if (acks.find(id) == acks.cend()) {
+              ldout(cct, 20) << "robust_notify: " << id << " timed out."
+                             << dendl;
+              timeouts.insert(id);
+            }
+          }
+        } catch (const buffer::error& e) {
+          ldout(cct, 0) << "robust_notify: notify response parse failed: "
+                        << e.what() << dendl;
+          // r is still negative, so the loop retries if attempts remain.
+          continue;
+        }
+        // If we got a good parse and timeouts is empty, that means
+        // everyone who timed out in one call received the update in a
+        // previous one.
+        if (timeouts.empty()) {
+          r = 0;
+        }
+      }
+    }
+  }
+  return r;
}
int RGWRados::pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx)