MonSub is extracted so crimson/mon/MonClient can reuse it.
also, it's more readable this way:
- use ceph::coarse_mono_time instead of utime_t for renew_sent, and
renew_after timestamps
- modernize the code using C++17
* use range-based loop
* use structured-bind
- fix the anti-pattern of map.count(foo) and then map[foo], use
auto i = map.find(foo); instead.
- some logging messages are dropped in this refactory, but they are
printed only when talking with pre-infernalis monitor.
Signed-off-by: Kefu Chai <kchai@redhat.com>
mon/MonCap.cc
mon/MonClient.cc
mon/MonMap.cc
+ mon/MonSub.cc
mgr/MgrClient.cc
mgr/ServiceMap.cc
osd/ECMsgTypes.cc
ldout(cct, 10) << __func__ << dendl;
Mutex::Locker l(monc_lock);
- _sub_want("monmap", 0, 0);
+ sub.want("monmap", 0, 0);
if (!_opened())
_reopen_session();
monmap.print(*_dout);
*_dout << dendl;
- _sub_got("monmap", monmap.get_epoch());
+ sub.got("monmap", monmap.get_epoch());
if (!monmap.get_addr_name(peer, cur_mon)) {
ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl;
ldout(cct, 5) << "already authenticated" << dendl;
return 0;
}
- _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
- _sub_want("config", 0, 0);
+ sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
+ sub.want("config", 0, 0);
if (!_opened())
_reopen_session();
c.second.start(monmap.get_epoch(), entity_name, *auth_supported);
}
- for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
- p != sub_sent.end();
- ++p) {
- if (sub_new.count(p->first) == 0)
- sub_new[p->first] = p->second;
- }
- if (!sub_new.empty())
+ if (sub.reload()) {
_renew_subs();
+ }
}
MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
utime_t now = ceph_clock_now();
auto cur_con = active_con->get_con();
if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
- ldout(cct, 10) << "renew subs? (now: " << now
- << "; renew after: " << sub_renew_after << ") -- "
- << (now > sub_renew_after ? "yes" : "no")
+ const bool maybe_renew = sub.need_renew();
+ ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
<< dendl;
- if (now > sub_renew_after)
+ if (maybe_renew) {
_renew_subs();
+ }
}
cur_con->send_keepalive();
void MonClient::_renew_subs()
{
assert(monc_lock.is_locked());
- if (sub_new.empty()) {
+ if (!sub.have_new()) {
ldout(cct, 10) << __func__ << " - empty" << dendl;
return;
}
if (!_opened())
_reopen_session();
else {
- if (sub_renew_sent == utime_t())
- sub_renew_sent = ceph_clock_now();
-
MMonSubscribe *m = new MMonSubscribe;
- m->what = sub_new;
+ m->what = sub.get_subs();
_send_mon_message(m);
-
- // update sub_sent with sub_new
- sub_new.insert(sub_sent.begin(), sub_sent.end());
- std::swap(sub_new, sub_sent);
- sub_new.clear();
+ sub.renewed();
}
}
void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
{
- if (sub_renew_sent != utime_t()) {
- // NOTE: this is only needed for legacy (infernalis or older)
- // mons; see tick().
- sub_renew_after = sub_renew_sent;
- sub_renew_after += m->interval / 2.0;
- ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl;
- sub_renew_sent = utime_t();
- } else {
- ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << ", ignoring" << dendl;
- }
-
+ sub.acked(m->interval);
m->put();
}
#include "msg/Messenger.h"
#include "MonMap.h"
+#include "MonSub.h"
#include "common/Timer.h"
#include "common/Finisher.h"
*/
void flush_log();
- // mon subscriptions
private:
- map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
- map<string,ceph_mon_subscribe_item> sub_new; // unsent new subs
- utime_t sub_renew_sent, sub_renew_after;
-
+ // mon subscriptions
+ MonSub sub;
void _renew_subs();
void handle_subscribe_ack(MMonSubscribeAck* m);
- bool _sub_want(const string &what, version_t start, unsigned flags) {
- auto sub = sub_new.find(what);
- if (sub != sub_new.end() &&
- sub->second.start == start &&
- sub->second.flags == flags) {
- return false;
- } else {
- sub = sub_sent.find(what);
- if (sub != sub_sent.end() &&
- sub->second.start == start &&
- sub->second.flags == flags)
- return false;
- }
-
- sub_new[what].start = start;
- sub_new[what].flags = flags;
- return true;
- }
- void _sub_got(const string &what, version_t got) {
- if (sub_new.count(what)) {
- if (sub_new[what].start <= got) {
- if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
- sub_new.erase(what);
- else
- sub_new[what].start = got + 1;
- }
- } else if (sub_sent.count(what)) {
- if (sub_sent[what].start <= got) {
- if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
- sub_sent.erase(what);
- else
- sub_sent[what].start = got + 1;
- }
- }
- }
- void _sub_unwant(const string &what) {
- sub_sent.erase(what);
- sub_new.erase(what);
- }
-
public:
void renew_subs() {
Mutex::Locker l(monc_lock);
}
bool sub_want(string what, version_t start, unsigned flags) {
Mutex::Locker l(monc_lock);
- return _sub_want(what, start, flags);
+ return sub.want(what, start, flags);
}
void sub_got(string what, version_t have) {
Mutex::Locker l(monc_lock);
- _sub_got(what, have);
+ sub.got(what, have);
}
void sub_unwant(string what) {
Mutex::Locker l(monc_lock);
- _sub_unwant(what);
+ sub.unwant(what);
}
- /**
- * Increase the requested subscription start point. If you do increase
- * the value, apply the passed-in flags as well; otherwise do nothing.
- */
bool sub_want_increment(string what, version_t start, unsigned flags) {
Mutex::Locker l(monc_lock);
- map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
- if (i != sub_new.end()) {
- if (i->second.start >= start)
- return false;
- i->second.start = start;
- i->second.flags = flags;
- return true;
- }
-
- i = sub_sent.find(what);
- if (i == sub_sent.end() || i->second.start < start) {
- ceph_mon_subscribe_item& item = sub_new[what];
- item.start = start;
- item.flags = flags;
- return true;
- }
- return false;
+ return sub.inc_want(what, start, flags);
}
std::unique_ptr<KeyRing> keyring;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "MonSub.h"
+
+bool MonSub::have_new() const {
+ return !sub_new.empty();
+}
+
+bool MonSub::need_renew() const
+{
+ return ceph::coarse_mono_clock::now() > renew_after;
+}
+
+void MonSub::renewed()
+{
+ if (clock::is_zero(renew_sent)) {
+ renew_sent = clock::now();
+ }
+ // update sub_sent with sub_new
+ sub_new.insert(sub_sent.begin(), sub_sent.end());
+ std::swap(sub_new, sub_sent);
+ sub_new.clear();
+}
+
+void MonSub::acked(uint32_t interval)
+{
+ if (!clock::is_zero(renew_sent)) {
+ // NOTE: this is only needed for legacy (infernalis or older)
+ // mons; see MonClient::tick().
+ renew_after = renew_sent;
+ renew_after += ceph::make_timespan(interval / 2.0);
+ renew_sent = clock::zero();
+ }
+}
+
+bool MonSub::reload()
+{
+ for (auto& [what, sub] : sub_sent) {
+ if (sub_new.count(what) == 0) {
+ sub_new[what] = sub;
+ }
+ }
+ return have_new();
+}
+
+void MonSub::got(const std::string& what, version_t have)
+{
+ if (auto i = sub_new.find(what); i != sub_new.end()) {
+ auto& sub = i->second;
+ if (sub.start <= have) {
+ if (sub.flags & CEPH_SUBSCRIBE_ONETIME) {
+ sub_new.erase(i);
+ } else {
+ sub.start = have + 1;
+ }
+ }
+ } else if (auto i = sub_sent.find(what); i != sub_sent.end()) {
+ auto& sub = i->second;
+ if (sub.start <= have) {
+ if (sub.flags & CEPH_SUBSCRIBE_ONETIME) {
+ sub_sent.erase(i);
+ } else {
+ sub.start = have + 1;
+ }
+ }
+ }
+}
+
+bool MonSub::want(const std::string& what, version_t start, unsigned flags)
+{
+ if (auto sub = sub_new.find(what);
+ sub != sub_new.end() &&
+ sub->second.start == start &&
+ sub->second.flags == flags) {
+ return false;
+ } else if (auto sub = sub_sent.find(what);
+ sub != sub_sent.end() &&
+ sub->second.start == start &&
+ sub->second.flags == flags) {
+ return false;
+ } else {
+ sub_new[what].start = start;
+ sub_new[what].flags = flags;
+ return true;
+ }
+}
+
+bool MonSub::inc_want(const std::string& what, version_t start, unsigned flags)
+{
+ if (auto sub = sub_new.find(what); sub != sub_new.end()) {
+ if (sub->second.start >= start) {
+ return false;
+ } else {
+ sub->second.start = start;
+ sub->second.flags = flags;
+ return true;
+ }
+ } else if (auto sub = sub_sent.find(what);
+ sub == sub_sent.end() || sub->second.start < start) {
+ auto& item = sub_new[what];
+ item.start = start;
+ item.flags = flags;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void MonSub::unwant(const std::string& what)
+{
+ sub_sent.erase(what);
+ sub_new.erase(what);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <string>
+
+#include "common/ceph_time.h"
+#include "include/types.h"
+
+// mon subscriptions
+class MonSub
+{
+public:
+ // @returns true if there is any "new" subscriptions
+ bool have_new() const;
+ auto get_subs() const {
+ return sub_new;
+ }
+ bool need_renew() const;
+ // change the status of "new" subscriptions to "sent"
+ void renewed();
+ // the peer acked the subscription request
+ void acked(uint32_t interval);
+ void got(const std::string& what, version_t version);
+ // revert the status of subscriptions from "sent" to "new"
+ // @returns true if there is any pending "new" subscriptions
+ bool reload();
+ // add a new subscription
+ bool want(const std::string& what, version_t start, unsigned flags);
+ // increment the requested subscription start point. If you do increase
+ // the value, apply the passed-in flags as well; otherwise do nothing.
+ bool inc_want(const std::string& what, version_t start, unsigned flags);
+ // cancel a subscription
+ void unwant(const std::string& what);
+private:
+ // my subs, and current versions
+ std::map<std::string,ceph_mon_subscribe_item> sub_sent;
+ // unsent new subs
+ std::map<std::string,ceph_mon_subscribe_item> sub_new;
+ using time_point = ceph::coarse_mono_time;
+ using clock = typename time_point::clock;
+ time_point renew_sent;
+ time_point renew_after;
+};