From 7fcffe3d9f748c8f3addf57fdfa6a6bf2fdeff66 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 7 Oct 2015 00:09:18 -0400 Subject: [PATCH] mon/MonClient: only send new subscriptions Instead of resending all subscriptions, only send the new ones. This avoids races like - ask for 4+ - mon sends maps 4-50 - ask for 4+ and something else - mon has to resend same maps and the other thing Signed-off-by: Sage Weil --- src/mon/MonClient.cc | 15 +++++++++++--- src/mon/MonClient.h | 49 ++++++++++++++++++++++++++++++-------------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index ede6eebae59c9..718355142e6f7 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -659,7 +659,13 @@ void MonClient::_reopen_session(int rank, string name) ::encode(global_id, m->auth_payload); _send_mon_message(m, true); - if (!sub_have.empty()) + for (map::iterator p = sub_sent.begin(); + p != sub_sent.end(); + ++p) { + if (sub_new.count(p->first) == 0) + sub_new[p->first] = p->second; + } + if (!sub_new.empty()) _renew_subs(); } @@ -753,7 +759,7 @@ void MonClient::schedule_tick() void MonClient::_renew_subs() { assert(monc_lock.is_locked()); - if (sub_have.empty()) { + if (sub_new.empty()) { ldout(cct, 10) << "renew_subs - empty" << dendl; return; } @@ -766,8 +772,11 @@ void MonClient::_renew_subs() sub_renew_sent = ceph_clock_now(cct); MMonSubscribe *m = new MMonSubscribe; - m->what = sub_have; + m->what = sub_new; _send_mon_message(m); + + sub_sent.insert(sub_new.begin(), sub_new.end()); + sub_new.clear(); } } diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index a9761d1df4b64..70b84b3537d60 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -209,31 +209,42 @@ public: // mon subscriptions private: - map sub_have; // my subs, and current versions + map sub_sent; // my subs, and current versions + map sub_new; // unsent new subs utime_t sub_renew_sent, sub_renew_after; void _renew_subs(); void handle_subscribe_ack(MMonSubscribeAck* m); bool _sub_want(string what, version_t start, unsigned flags) { - if (sub_have.count(what) && - sub_have[what].start == start && - sub_have[what].flags == flags) + if ((sub_new.count(what) == 0 && + sub_sent.count(what) && + sub_sent[what].start == start && + sub_sent[what].flags == flags) || + (sub_new.count(what) && + sub_new[what].start == start && + sub_new[what].flags == flags)) return false; - sub_have[what].start = start; - sub_have[what].flags = flags; + sub_new[what].start = start; + sub_new[what].flags = flags; return true; } void _sub_got(string what, version_t got) { - if (sub_have.count(what)) { - if (sub_have[what].flags & CEPH_SUBSCRIBE_ONETIME) - sub_have.erase(what); + if (sub_new.count(what)) { + if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME) + sub_new.erase(what); else - sub_have[what].start = got + 1; + sub_new[what].start = got + 1; + } else if (sub_sent.count(what)) { + if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME) + sub_sent.erase(what); + else + sub_sent[what].start = got + 1; } } void _sub_unwant(string what) { - sub_have.erase(what); + sub_sent.erase(what); + sub_new.erase(what); } // auth tickets @@ -262,10 +273,18 @@ public: */ bool sub_want_increment(string what, version_t start, unsigned flags) { Mutex::Locker l(monc_lock); - map::iterator i = - sub_have.find(what); - if (i == sub_have.end() || i->second.start < start) { - ceph_mon_subscribe_item& item = sub_have[what]; + map::iterator i = sub_new.find(what); + if (i != sub_new.end()) { + if (i->second.start >= start) + return false; + i->second.start = start; + i->second.flags = flags; + return true; + } + + i = sub_sent.find(what); + if (i == sub_sent.end() || i->second.start < start) { + ceph_mon_subscribe_item& item = sub_new[what]; item.start = start; item.flags = flags; return true; -- 2.39.5