const RGWCacheNotifyInfo& cni,
optional_yield y)
{
- // The reply of every machine that acks goes in here.
- boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
- bufferlist bl, rbl;
+ bufferlist bl;
encode(cni, bl);
// First, try to send, without being fancy about it.
- auto r = notify_obj.notify(dpp, bl, 0, &rbl, y);
+ auto r = notify_obj.notify(dpp, bl, 0, nullptr, y);
- // If that doesn't work, get serious.
if (r < 0) {
- ldpp_dout(dpp, 1) << "robust_notify: If at first you don't succeed: "
+ ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " Notify failed on object " << cni.obj << ": "
<< cpp_strerror(-r) << dendl;
+ }
-
- auto p = rbl.cbegin();
- // Gather up the replies to the first attempt.
- try {
- uint32_t num_acks;
- decode(num_acks, p);
- // Doing this ourselves since we don't care about the payload;
- for (auto i = 0u; i < num_acks; ++i) {
- std::pair<uint64_t, uint64_t> id;
- decode(id, p);
- acks.insert(id);
- ldpp_dout(dpp, 20) << "robust_notify: acked by " << id << dendl;
- uint32_t blen;
- decode(blen, p);
- p += blen;
- }
- } catch (const buffer::error& e) {
- ldpp_dout(dpp, 0) << "robust_notify: notify response parse failed: "
- << e.what() << dendl;
- acks.clear(); // Throw away junk on failed parse.
- }
-
-
- // Every machine that fails to reply and hasn't acked a previous
- // attempt goes in here.
- boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
-
- auto tries = 1u;
- while (r < 0 && tries < max_notify_retries) {
- ++tries;
- rbl.clear();
- // Reset the timeouts, we're only concerned with new ones.
- timeouts.clear();
- r = notify_obj.notify(dpp, bl, 0, &rbl, y);
+ // If we timed out, get serious.
+ if (r == -ETIMEDOUT) {
+ RGWCacheNotifyInfo info;
+ info.op = INVALIDATE_OBJ;
+ info.obj = cni.obj;
+ bufferlist retrybl;
+ encode(info, retrybl);
+
+ for (auto tries = 0u;
+ r == -ETIMEDOUT && tries < max_notify_retries;
+ ++tries) {
+ ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " Invalidating obj=" << info.obj << " tries="
+ << tries << dendl;
+ r = notify_obj.notify(dpp, bl, 0, nullptr, y);
if (r < 0) {
- ldpp_dout(dpp, 1) << "robust_notify: retry " << tries << " failed: "
- << cpp_strerror(-r) << dendl;
- p = rbl.begin();
- try {
- uint32_t num_acks;
- decode(num_acks, p);
- // Not only do we not care about the payload, but we don't
- // want to empty the container; we just want to augment it
- // with any new members.
- for (auto i = 0u; i < num_acks; ++i) {
- std::pair<uint64_t, uint64_t> id;
- decode(id, p);
- auto ir = acks.insert(id);
- if (ir.second) {
- ldpp_dout(dpp, 20) << "robust_notify: acked by " << id << dendl;
- }
- uint32_t blen;
- decode(blen, p);
- p += blen;
- }
-
- uint32_t num_timeouts;
- decode(num_timeouts, p);
- for (auto i = 0u; i < num_timeouts; ++i) {
- std::pair<uint64_t, uint64_t> id;
- decode(id, p);
- // Only track timeouts from hosts that haven't acked previously.
- if (acks.find(id) != acks.cend()) {
- ldpp_dout(dpp, 20) << "robust_notify: " << id << " timed out."
- << dendl;
- timeouts.insert(id);
- }
- }
- } catch (const buffer::error& e) {
- ldpp_dout(dpp, 0) << "robust_notify: notify response parse failed: "
- << e.what() << dendl;
- continue;
- }
- // If we got a good parse and timeouts is empty, that means
- // everyone who timed out in one call received the update in a
- // previous one.
- if (timeouts.empty()) {
- r = 0;
- }
+ ldpp_dout(dpp, 1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " invalidation attempt " << tries << " failed: "
+ << cpp_strerror(-r) << dendl;
}
}
}