From ea9c881ec6d70c4c0a9255b71d3e506163f18429 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 22 Aug 2018 19:14:30 +0800 Subject: [PATCH] mon/MonClient: extract MonSub out MonSub is extracted so crimson/mon/MonClient can reuse it. also, it's more readable this way: - use ceph::coarse_mono_time instead of utime_t for renew_sent, and renew_after timestamps - modernize the code using C++17 * use range-based loop * use structured-bind - fix the anti-pattern of map.count(foo) and then map[foo], use auto i = map.find(foo); instead. - some logging messages are dropped in this refactory, but they are printed only when talking with pre-infernalis monitor. Signed-off-by: Kefu Chai --- src/CMakeLists.txt | 1 + src/mon/MonClient.cc | 50 ++++++------------- src/mon/MonClient.h | 76 +++-------------------------- src/mon/MonSub.cc | 114 +++++++++++++++++++++++++++++++++++++++++++ src/mon/MonSub.h | 46 +++++++++++++++++ 5 files changed, 182 insertions(+), 105 deletions(-) create mode 100644 src/mon/MonSub.cc create mode 100644 src/mon/MonSub.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 70b24c95e33f5..02c11e2b659d0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -321,6 +321,7 @@ set(libcommon_files mon/MonCap.cc mon/MonClient.cc mon/MonMap.cc + mon/MonSub.cc mgr/MgrClient.cc mgr/ServiceMap.cc osd/ECMsgTypes.cc diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 09a86749faef3..6cf4a4ea5d3be 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -81,7 +81,7 @@ int MonClient::get_monmap() ldout(cct, 10) << __func__ << dendl; Mutex::Locker l(monc_lock); - _sub_want("monmap", 0, 0); + sub.want("monmap", 0, 0); if (!_opened()) _reopen_session(); @@ -358,7 +358,7 @@ void MonClient::handle_monmap(MMonMap *m) monmap.print(*_dout); *_dout << dendl; - _sub_got("monmap", monmap.get_epoch()); + sub.got("monmap", monmap.get_epoch()); if (!monmap.get_addr_name(peer, cur_mon)) { ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl; @@ -484,8 +484,8 @@ int MonClient::authenticate(double timeout) ldout(cct, 5) << "already authenticated" << dendl; return 0; } - _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0); - _sub_want("config", 0, 0); + sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0); + sub.want("config", 0, 0); if (!_opened()) _reopen_session(); @@ -652,14 +652,9 @@ void MonClient::_reopen_session(int rank) c.second.start(monmap.get_epoch(), entity_name, *auth_supported); } - for (map::iterator p = sub_sent.begin(); - p != sub_sent.end(); - ++p) { - if (sub_new.count(p->first) == 0) - sub_new[p->first] = p->second; - } - if (!sub_new.empty()) + if (sub.reload()) { _renew_subs(); + } } MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id) @@ -788,12 +783,12 @@ void MonClient::tick() utime_t now = ceph_clock_now(); auto cur_con = active_con->get_con(); if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) { - ldout(cct, 10) << "renew subs? (now: " << now - << "; renew after: " << sub_renew_after << ") -- " - << (now > sub_renew_after ? "yes" : "no") + const bool maybe_renew = sub.need_renew(); + ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no") << dendl; - if (now > sub_renew_after) + if (maybe_renew) { _renew_subs(); + } } cur_con->send_keepalive(); @@ -842,7 +837,7 @@ void MonClient::schedule_tick() void MonClient::_renew_subs() { assert(monc_lock.is_locked()); - if (sub_new.empty()) { + if (!sub.have_new()) { ldout(cct, 10) << __func__ << " - empty" << dendl; return; } @@ -851,33 +846,16 @@ void MonClient::_renew_subs() if (!_opened()) _reopen_session(); else { - if (sub_renew_sent == utime_t()) - sub_renew_sent = ceph_clock_now(); - MMonSubscribe *m = new MMonSubscribe; - m->what = sub_new; + m->what = sub.get_subs(); _send_mon_message(m); - - // update sub_sent with sub_new - sub_new.insert(sub_sent.begin(), sub_sent.end()); - std::swap(sub_new, sub_sent); - sub_new.clear(); + sub.renewed(); } } void MonClient::handle_subscribe_ack(MMonSubscribeAck *m) { - if (sub_renew_sent != utime_t()) { - // NOTE: this is only needed for legacy (infernalis or older) - // mons; see tick(). - sub_renew_after = sub_renew_sent; - sub_renew_after += m->interval / 2.0; - ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl; - sub_renew_sent = utime_t(); - } else { - ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << ", ignoring" << dendl; - } - + sub.acked(m->interval); m->put(); } diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index bffd2d916e67c..cf765f6eb935c 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -19,6 +19,7 @@ #include "msg/Messenger.h" #include "MonMap.h" +#include "MonSub.h" #include "common/Timer.h" #include "common/Finisher.h" @@ -245,55 +246,12 @@ public: */ void flush_log(); - // mon subscriptions private: - map sub_sent; // my subs, and current versions - map sub_new; // unsent new subs - utime_t sub_renew_sent, sub_renew_after; - + // mon subscriptions + MonSub sub; void _renew_subs(); void handle_subscribe_ack(MMonSubscribeAck* m); - bool _sub_want(const string &what, version_t start, unsigned flags) { - auto sub = sub_new.find(what); - if (sub != sub_new.end() && - sub->second.start == start && - sub->second.flags == flags) { - return false; - } else { - sub = sub_sent.find(what); - if (sub != sub_sent.end() && - sub->second.start == start && - sub->second.flags == flags) - return false; - } - - sub_new[what].start = start; - sub_new[what].flags = flags; - return true; - } - void _sub_got(const string &what, version_t got) { - if (sub_new.count(what)) { - if (sub_new[what].start <= got) { - if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME) - sub_new.erase(what); - else - sub_new[what].start = got + 1; - } - } else if (sub_sent.count(what)) { - if (sub_sent[what].start <= got) { - if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME) - sub_sent.erase(what); - else - sub_sent[what].start = got + 1; - } - } - } - void _sub_unwant(const string &what) { - sub_sent.erase(what); - sub_new.erase(what); - } - public: void renew_subs() { Mutex::Locker l(monc_lock); @@ -301,39 +259,19 @@ public: } bool sub_want(string what, version_t start, unsigned flags) { Mutex::Locker l(monc_lock); - return _sub_want(what, start, flags); + return sub.want(what, start, flags); } void sub_got(string what, version_t have) { Mutex::Locker l(monc_lock); - _sub_got(what, have); + sub.got(what, have); } void sub_unwant(string what) { Mutex::Locker l(monc_lock); - _sub_unwant(what); + sub.unwant(what); } - /** - * Increase the requested subscription start point. If you do increase - * the value, apply the passed-in flags as well; otherwise do nothing. - */ bool sub_want_increment(string what, version_t start, unsigned flags) { Mutex::Locker l(monc_lock); - map::iterator i = sub_new.find(what); - if (i != sub_new.end()) { - if (i->second.start >= start) - return false; - i->second.start = start; - i->second.flags = flags; - return true; - } - - i = sub_sent.find(what); - if (i == sub_sent.end() || i->second.start < start) { - ceph_mon_subscribe_item& item = sub_new[what]; - item.start = start; - item.flags = flags; - return true; - } - return false; + return sub.inc_want(what, start, flags); } std::unique_ptr keyring; diff --git a/src/mon/MonSub.cc b/src/mon/MonSub.cc new file mode 100644 index 0000000000000..a2c60ba918dc8 --- /dev/null +++ b/src/mon/MonSub.cc @@ -0,0 +1,114 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "MonSub.h" + +bool MonSub::have_new() const { + return !sub_new.empty(); +} + +bool MonSub::need_renew() const +{ + return ceph::coarse_mono_clock::now() > renew_after; +} + +void MonSub::renewed() +{ + if (clock::is_zero(renew_sent)) { + renew_sent = clock::now(); + } + // update sub_sent with sub_new + sub_new.insert(sub_sent.begin(), sub_sent.end()); + std::swap(sub_new, sub_sent); + sub_new.clear(); +} + +void MonSub::acked(uint32_t interval) +{ + if (!clock::is_zero(renew_sent)) { + // NOTE: this is only needed for legacy (infernalis or older) + // mons; see MonClient::tick(). + renew_after = renew_sent; + renew_after += ceph::make_timespan(interval / 2.0); + renew_sent = clock::zero(); + } +} + +bool MonSub::reload() +{ + for (auto& [what, sub] : sub_sent) { + if (sub_new.count(what) == 0) { + sub_new[what] = sub; + } + } + return have_new(); +} + +void MonSub::got(const std::string& what, version_t have) +{ + if (auto i = sub_new.find(what); i != sub_new.end()) { + auto& sub = i->second; + if (sub.start <= have) { + if (sub.flags & CEPH_SUBSCRIBE_ONETIME) { + sub_new.erase(i); + } else { + sub.start = have + 1; + } + } + } else if (auto i = sub_sent.find(what); i != sub_sent.end()) { + auto& sub = i->second; + if (sub.start <= have) { + if (sub.flags & CEPH_SUBSCRIBE_ONETIME) { + sub_sent.erase(i); + } else { + sub.start = have + 1; + } + } + } +} + +bool MonSub::want(const std::string& what, version_t start, unsigned flags) +{ + if (auto sub = sub_new.find(what); + sub != sub_new.end() && + sub->second.start == start && + sub->second.flags == flags) { + return false; + } else if (auto sub = sub_sent.find(what); + sub != sub_sent.end() && + sub->second.start == start && + sub->second.flags == flags) { + return false; + } else { + sub_new[what].start = start; + sub_new[what].flags = flags; + return true; + } +} + +bool MonSub::inc_want(const std::string& what, version_t start, unsigned flags) +{ + if (auto sub = sub_new.find(what); sub != sub_new.end()) { + if (sub->second.start >= start) { + return false; + } else { + sub->second.start = start; + sub->second.flags = flags; + return true; + } + } else if (auto sub = sub_sent.find(what); + sub == sub_sent.end() || sub->second.start < start) { + auto& item = sub_new[what]; + item.start = start; + item.flags = flags; + return true; + } else { + return false; + } +} + +void MonSub::unwant(const std::string& what) +{ + sub_sent.erase(what); + sub_new.erase(what); +} diff --git a/src/mon/MonSub.h b/src/mon/MonSub.h new file mode 100644 index 0000000000000..8ff5a8f1872fd --- /dev/null +++ b/src/mon/MonSub.h @@ -0,0 +1,46 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "common/ceph_time.h" +#include "include/types.h" + +// mon subscriptions +class MonSub +{ +public: + // @returns true if there is any "new" subscriptions + bool have_new() const; + auto get_subs() const { + return sub_new; + } + bool need_renew() const; + // change the status of "new" subscriptions to "sent" + void renewed(); + // the peer acked the subscription request + void acked(uint32_t interval); + void got(const std::string& what, version_t version); + // revert the status of subscriptions from "sent" to "new" + // @returns true if there is any pending "new" subscriptions + bool reload(); + // add a new subscription + bool want(const std::string& what, version_t start, unsigned flags); + // increment the requested subscription start point. If you do increase + // the value, apply the passed-in flags as well; otherwise do nothing. + bool inc_want(const std::string& what, version_t start, unsigned flags); + // cancel a subscription + void unwant(const std::string& what); +private: + // my subs, and current versions + std::map sub_sent; + // unsent new subs + std::map sub_new; + using time_point = ceph::coarse_mono_time; + using clock = typename time_point::clock; + time_point renew_sent; + time_point renew_after; +}; -- 2.39.5