]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon/MonClient: extract MonSub out 23688/head
authorKefu Chai <kchai@redhat.com>
Wed, 22 Aug 2018 11:14:30 +0000 (19:14 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 22 Aug 2018 13:12:52 +0000 (21:12 +0800)
MonSub is extracted so crimson/mon/MonClient can reuse it.

also, it's more readable this way:

- use ceph::coarse_mono_time instead of utime_t for renew_sent, and
  renew_after timestamps
- modernize the code using C++17
  * use range-based loop
  * use structured-bind
- fix the anti-pattern of map.count(foo) and then map[foo], use
  auto i = map.find(foo); instead.
- some logging messages are dropped in this refactory, but they are
  printed only when talking with pre-infernalis monitor.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/CMakeLists.txt
src/mon/MonClient.cc
src/mon/MonClient.h
src/mon/MonSub.cc [new file with mode: 0644]
src/mon/MonSub.h [new file with mode: 0644]

index 70b24c95e33f5cd58a50a6cbbb51fffdd47ef388..02c11e2b659d0e14b93bcc0eb0068545d581f952 100644 (file)
@@ -321,6 +321,7 @@ set(libcommon_files
   mon/MonCap.cc
   mon/MonClient.cc
   mon/MonMap.cc
+  mon/MonSub.cc
   mgr/MgrClient.cc
   mgr/ServiceMap.cc
   osd/ECMsgTypes.cc
index 09a86749faef39d8b9fdfe9e4b2d669b44aeb25c..6cf4a4ea5d3be773d3908f6ed103717fb482401d 100644 (file)
@@ -81,7 +81,7 @@ int MonClient::get_monmap()
   ldout(cct, 10) << __func__ << dendl;
   Mutex::Locker l(monc_lock);
   
-  _sub_want("monmap", 0, 0);
+  sub.want("monmap", 0, 0);
   if (!_opened())
     _reopen_session();
 
@@ -358,7 +358,7 @@ void MonClient::handle_monmap(MMonMap *m)
   monmap.print(*_dout);
   *_dout << dendl;
 
-  _sub_got("monmap", monmap.get_epoch());
+  sub.got("monmap", monmap.get_epoch());
 
   if (!monmap.get_addr_name(peer, cur_mon)) {
     ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl;
@@ -484,8 +484,8 @@ int MonClient::authenticate(double timeout)
     ldout(cct, 5) << "already authenticated" << dendl;
     return 0;
   }
-  _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
-  _sub_want("config", 0, 0);
+  sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
+  sub.want("config", 0, 0);
   if (!_opened())
     _reopen_session();
 
@@ -652,14 +652,9 @@ void MonClient::_reopen_session(int rank)
     c.second.start(monmap.get_epoch(), entity_name, *auth_supported);
   }
 
-  for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
-       p != sub_sent.end();
-       ++p) {
-    if (sub_new.count(p->first) == 0)
-      sub_new[p->first] = p->second;
-  }
-  if (!sub_new.empty())
+  if (sub.reload()) {
     _renew_subs();
+  }
 }
 
 MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
@@ -788,12 +783,12 @@ void MonClient::tick()
     utime_t now = ceph_clock_now();
     auto cur_con = active_con->get_con();
     if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
-      ldout(cct, 10) << "renew subs? (now: " << now
-                    << "; renew after: " << sub_renew_after << ") -- "
-                    << (now > sub_renew_after ? "yes" : "no")
+      const bool maybe_renew = sub.need_renew();
+      ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
                     << dendl;
-      if (now > sub_renew_after)
+      if (maybe_renew) {
        _renew_subs();
+      }
     }
 
     cur_con->send_keepalive();
@@ -842,7 +837,7 @@ void MonClient::schedule_tick()
 void MonClient::_renew_subs()
 {
   assert(monc_lock.is_locked());
-  if (sub_new.empty()) {
+  if (!sub.have_new()) {
     ldout(cct, 10) << __func__ << " - empty" << dendl;
     return;
   }
@@ -851,33 +846,16 @@ void MonClient::_renew_subs()
   if (!_opened())
     _reopen_session();
   else {
-    if (sub_renew_sent == utime_t())
-      sub_renew_sent = ceph_clock_now();
-
     MMonSubscribe *m = new MMonSubscribe;
-    m->what = sub_new;
+    m->what = sub.get_subs();
     _send_mon_message(m);
-
-    // update sub_sent with sub_new
-    sub_new.insert(sub_sent.begin(), sub_sent.end());
-    std::swap(sub_new, sub_sent);
-    sub_new.clear();
+    sub.renewed();
   }
 }
 
 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
 {
-  if (sub_renew_sent != utime_t()) {
-    // NOTE: this is only needed for legacy (infernalis or older)
-    // mons; see tick().
-    sub_renew_after = sub_renew_sent;
-    sub_renew_after += m->interval / 2.0;
-    ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl;
-    sub_renew_sent = utime_t();
-  } else {
-    ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << ", ignoring" << dendl;
-  }
-
+  sub.acked(m->interval);
   m->put();
 }
 
index bffd2d916e67c83a150d52e8bfa161cb81253b34..cf765f6eb935c2a2ac725590b8f5508ae78d829b 100644 (file)
@@ -19,6 +19,7 @@
 #include "msg/Messenger.h"
 
 #include "MonMap.h"
+#include "MonSub.h"
 
 #include "common/Timer.h"
 #include "common/Finisher.h"
@@ -245,55 +246,12 @@ public:
    */
   void flush_log();
 
-  // mon subscriptions
 private:
-  map<string,ceph_mon_subscribe_item> sub_sent; // my subs, and current versions
-  map<string,ceph_mon_subscribe_item> sub_new;  // unsent new subs
-  utime_t sub_renew_sent, sub_renew_after;
-
+  // mon subscriptions
+  MonSub sub;
   void _renew_subs();
   void handle_subscribe_ack(MMonSubscribeAck* m);
 
-  bool _sub_want(const string &what, version_t start, unsigned flags) {
-    auto sub = sub_new.find(what);
-    if (sub != sub_new.end() &&
-        sub->second.start == start &&
-        sub->second.flags == flags) {
-      return false;
-    } else {
-      sub = sub_sent.find(what);
-      if (sub != sub_sent.end() &&
-         sub->second.start == start &&
-         sub->second.flags == flags)
-       return false;
-    }
-
-    sub_new[what].start = start;
-    sub_new[what].flags = flags;
-    return true;
-  }
-  void _sub_got(const string &what, version_t got) {
-    if (sub_new.count(what)) {
-      if (sub_new[what].start <= got) {
-       if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME)
-         sub_new.erase(what);
-       else
-         sub_new[what].start = got + 1;
-      }
-    } else if (sub_sent.count(what)) {
-      if (sub_sent[what].start <= got) {
-       if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME)
-         sub_sent.erase(what);
-       else
-         sub_sent[what].start = got + 1;
-      }
-    }
-  }
-  void _sub_unwant(const string &what) {
-    sub_sent.erase(what);
-    sub_new.erase(what);
-  }
-
 public:
   void renew_subs() {
     Mutex::Locker l(monc_lock);
@@ -301,39 +259,19 @@ public:
   }
   bool sub_want(string what, version_t start, unsigned flags) {
     Mutex::Locker l(monc_lock);
-    return _sub_want(what, start, flags);
+    return sub.want(what, start, flags);
   }
   void sub_got(string what, version_t have) {
     Mutex::Locker l(monc_lock);
-    _sub_got(what, have);
+    sub.got(what, have);
   }
   void sub_unwant(string what) {
     Mutex::Locker l(monc_lock);
-    _sub_unwant(what);
+    sub.unwant(what);
   }
-  /**
-   * Increase the requested subscription start point. If you do increase
-   * the value, apply the passed-in flags as well; otherwise do nothing.
-   */
   bool sub_want_increment(string what, version_t start, unsigned flags) {
     Mutex::Locker l(monc_lock);
-    map<string,ceph_mon_subscribe_item>::iterator i = sub_new.find(what);
-    if (i != sub_new.end()) {
-      if (i->second.start >= start)
-       return false;
-      i->second.start = start;
-      i->second.flags = flags;
-      return true;
-    }
-
-    i = sub_sent.find(what);
-    if (i == sub_sent.end() || i->second.start < start) {
-      ceph_mon_subscribe_item& item = sub_new[what];
-      item.start = start;
-      item.flags = flags;
-      return true;
-    }
-    return false;
+    return sub.inc_want(what, start, flags);
   }
   
   std::unique_ptr<KeyRing> keyring;
diff --git a/src/mon/MonSub.cc b/src/mon/MonSub.cc
new file mode 100644 (file)
index 0000000..a2c60ba
--- /dev/null
@@ -0,0 +1,114 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "MonSub.h"
+
+bool MonSub::have_new() const {
+  return !sub_new.empty();
+}
+
+bool MonSub::need_renew() const
+{
+  return ceph::coarse_mono_clock::now() > renew_after;
+}
+
+void MonSub::renewed()
+{
+  if (clock::is_zero(renew_sent)) {
+    renew_sent = clock::now();
+  }
+  // update sub_sent with sub_new
+  sub_new.insert(sub_sent.begin(), sub_sent.end());
+  std::swap(sub_new, sub_sent);
+  sub_new.clear();
+}
+
+void MonSub::acked(uint32_t interval)
+{
+  if (!clock::is_zero(renew_sent)) {
+    // NOTE: this is only needed for legacy (infernalis or older)
+    // mons; see MonClient::tick().
+    renew_after = renew_sent;
+    renew_after += ceph::make_timespan(interval / 2.0);
+    renew_sent = clock::zero();
+  }
+}
+
+bool MonSub::reload()
+{
+  for (auto& [what, sub] : sub_sent) {
+    if (sub_new.count(what) == 0) {
+      sub_new[what] = sub;
+    }
+  }
+  return have_new();
+}
+
+void MonSub::got(const std::string& what, version_t have)
+{
+  if (auto i = sub_new.find(what); i != sub_new.end()) {
+    auto& sub = i->second;
+    if (sub.start <= have) {
+      if (sub.flags & CEPH_SUBSCRIBE_ONETIME) {
+        sub_new.erase(i);
+      } else {
+        sub.start = have + 1;
+      }
+    }
+  } else if (auto i = sub_sent.find(what); i != sub_sent.end()) {
+    auto& sub = i->second;
+    if (sub.start <= have) {
+      if (sub.flags & CEPH_SUBSCRIBE_ONETIME) {
+        sub_sent.erase(i);
+      } else {
+        sub.start = have + 1;
+      }
+    }
+  }
+}
+
+bool MonSub::want(const std::string& what, version_t start, unsigned flags)
+{
+  if (auto sub = sub_new.find(what);
+      sub != sub_new.end() &&
+      sub->second.start == start &&
+      sub->second.flags == flags) {
+    return false;
+  } else if (auto sub = sub_sent.find(what);
+      sub != sub_sent.end() &&
+      sub->second.start == start &&
+      sub->second.flags == flags) {
+       return false;
+  } else {
+    sub_new[what].start = start;
+    sub_new[what].flags = flags;
+    return true;
+  }
+}
+
+bool MonSub::inc_want(const std::string& what, version_t start, unsigned flags)
+{
+  if (auto sub = sub_new.find(what); sub != sub_new.end()) {
+    if (sub->second.start >= start) {
+      return false;
+    } else {
+      sub->second.start = start;
+      sub->second.flags = flags;
+      return true;
+    }
+  } else if (auto sub = sub_sent.find(what);
+             sub == sub_sent.end() || sub->second.start < start) {
+    auto& item = sub_new[what];
+    item.start = start;
+    item.flags = flags;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+void MonSub::unwant(const std::string& what)
+{
+  sub_sent.erase(what);
+  sub_new.erase(what);
+}
diff --git a/src/mon/MonSub.h b/src/mon/MonSub.h
new file mode 100644 (file)
index 0000000..8ff5a8f
--- /dev/null
@@ -0,0 +1,46 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <string>
+
+#include "common/ceph_time.h"
+#include "include/types.h"
+
+// mon subscriptions
+class MonSub
+{
+public:
+  // @returns true if there is any "new" subscriptions
+  bool have_new() const;
+  auto get_subs() const {
+    return sub_new;
+  }
+  bool need_renew() const;
+  // change the status of "new" subscriptions to "sent"
+  void renewed();
+  // the peer acked the subscription request
+  void acked(uint32_t interval);
+  void got(const std::string& what, version_t version);
+  // revert the status of subscriptions from "sent" to "new"
+  // @returns true if there is any pending "new" subscriptions
+  bool reload();
+  // add a new subscription
+  bool want(const std::string& what, version_t start, unsigned flags);
+  // increment the requested subscription start point. If you do increase
+  // the value, apply the passed-in flags as well; otherwise do nothing.
+  bool inc_want(const std::string& what, version_t start, unsigned flags);
+  // cancel a subscription
+  void unwant(const std::string& what);
+private:
+  // my subs, and current versions
+  std::map<std::string,ceph_mon_subscribe_item> sub_sent;
+  // unsent new subs
+  std::map<std::string,ceph_mon_subscribe_item> sub_new;
+  using time_point = ceph::coarse_mono_time;
+  using clock = typename time_point::clock;
+  time_point renew_sent;
+  time_point renew_after;
+};