]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mon/monclient: hunt for multiple monitor in parallel
authorKefu Chai <kchai@redhat.com>
Wed, 14 Sep 2016 13:39:04 +0000 (21:39 +0800)
committerKefu Chai <kchai@redhat.com>
Tue, 14 Feb 2017 04:58:22 +0000 (12:58 +0800)
* add an option "mon_client_hunt_parallel" for the maxmimum number of parallel
  hunting sessions.

Fixes: http://tracker.ceph.com/issues/16091
Signed-off-by: Steven Dieffenbach <sdieffen@redhat.com>
Signed-off-by: Kefu Chai <kchai@redhat.com>
12 files changed:
src/client/Client.cc
src/common/SimpleRNG.h [deleted file]
src/common/config_opts.h
src/librados/RadosClient.cc
src/mds/MDSDaemon.cc
src/mgr/DaemonServer.cc
src/mgr/MgrStandby.cc
src/mon/MonClient.cc
src/mon/MonClient.h
src/osd/OSD.cc
src/osdc/Objecter.cc
src/tools/cephfs/MDSUtility.cc

index d0ba9f9aa9570e11abf9908f9af49ebef7780b4c..c18f393110c36e968b4f694f365a3fa482fc8325 100644 (file)
@@ -12728,7 +12728,7 @@ bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool
 {
   if (dest_type == CEPH_ENTITY_TYPE_MON)
     return true;
-  *authorizer = monclient->auth->build_authorizer(dest_type);
+  *authorizer = monclient->build_authorizer(dest_type);
   return true;
 }
 
diff --git a/src/common/SimpleRNG.h b/src/common/SimpleRNG.h
deleted file mode 100644 (file)
index 70b008e..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef __CEPH_COMMON_SIMPLERNG_H_
-#define __CEPH_COMMON_SIMPLERNG_H_
-
-/*
- * rand() is not thread-safe.  random_r family segfaults.
- * boost::random::* have build issues.
- */
-class SimpleRNG {
-  unsigned m_z, m_w;
-
-public:
-  SimpleRNG(int seed) : m_z(seed), m_w(123) {}
-
-  unsigned operator()() {
-    m_z = 36969 * (m_z & 65535) + (m_z >> 16);
-    m_w = 18000 * (m_w & 65535) + (m_w >> 16);
-    return (m_z << 16) + m_w;
-  }
-};
-
-#endif
index 76c3f0ddd1607f1ba4cea0ebce4f0346311d1764..85bb3c2702cdee8f8a45f497a5c050b0a77d8c44 100644 (file)
@@ -398,6 +398,7 @@ OPTION(cephx_sign_messages, OPT_BOOL, true)  // Default to signing session messa
 OPTION(auth_mon_ticket_ttl, OPT_DOUBLE, 60*60*12)
 OPTION(auth_service_ticket_ttl, OPT_DOUBLE, 60*60)
 OPTION(auth_debug, OPT_BOOL, false)          // if true, assert when weird things happen
+OPTION(mon_client_hunt_parallel, OPT_U32, 2)   // how many mons to try to connect to in parallel during hunt
 OPTION(mon_client_hunt_interval, OPT_DOUBLE, 3.0)   // try new mon every N seconds until we connect
 OPTION(mon_client_ping_interval, OPT_DOUBLE, 10.0)  // ping every N seconds
 OPTION(mon_client_ping_timeout, OPT_DOUBLE, 30.0)   // fail if we don't hear back
index 2045516eafe6dfbad16d49f94a4856860bba0920..fc44ea9a40f13058377aba692e58fe062fccd919 100644 (file)
@@ -60,7 +60,7 @@ bool librados::RadosClient::ms_get_authorizer(int dest_type,
   /* monitor authorization is being handled on different layer */
   if (dest_type == CEPH_ENTITY_TYPE_MON)
     return true;
-  *authorizer = monclient.auth->build_authorizer(dest_type);
+  *authorizer = monclient.build_authorizer(dest_type);
   return *authorizer != NULL;
 }
 
index 58f59f60a6388532499e8786ea4586a61af09039..cc7e12b8b165fba83c1ebb5b92a1dcaacc071a20 100644 (file)
@@ -1185,7 +1185,7 @@ bool MDSDaemon::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bo
       return false;
   }
 
-  *authorizer = monc->auth->build_authorizer(dest_type);
+  *authorizer = monc->build_authorizer(dest_type);
   return *authorizer != NULL;
 }
 
index e3f327c0ecc19f9e965f14ba1e929ced72978c50..798b01ec140ddd5678a2cc8980f3754595d0c9a0 100644 (file)
@@ -117,7 +117,7 @@ bool DaemonServer::ms_get_authorizer(int dest_type,
       return false;
   }
 
-  *authorizer = monc->auth->build_authorizer(dest_type);
+  *authorizer = monc->build_authorizer(dest_type);
   dout(20) << "got authorizer " << *authorizer << dendl;
   return *authorizer != NULL;
 }
index 7a87e60886b7441ac4c2612a67b9ca4f8ea8a525..92cb8013a7f7507779b89b8b3dcd7eee2e67f37a 100644 (file)
@@ -211,7 +211,7 @@ bool MgrStandby::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
       return false;
   }
 
-  *authorizer = monc->auth->build_authorizer(dest_type);
+  *authorizer = monc->build_authorizer(dest_type);
   return *authorizer != NULL;
 }
 
index a00fda322f22f32bccc3b5df25156e1236c93ac7..1107e33eaedd8f2594beeac4e1d07686cc354a4b 100644 (file)
@@ -12,6 +12,8 @@
  * 
  */
 
+#include <random>
+
 #include "messages/MMonGetMap.h"
 #include "messages/MMonGetVersion.h"
 #include "messages/MMonGetVersionReply.h"
 
 #define dout_subsys ceph_subsys_monc
 #undef dout_prefix
-#define dout_prefix *_dout << "monclient" << (hunting ? "(hunting)":"") << ": "
+#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
 
 MonClient::MonClient(CephContext *cct_) :
   Dispatcher(cct_),
-  state(MC_STATE_NONE),
   messenger(NULL),
-  cur_con(NULL),
-  rng(getpid()),
   monc_lock("MonClient::monc_lock"),
   timer(cct_, monc_lock), finisher(cct_),
   initialized(false),
   no_keyring_disabled_cephx(false),
   log_client(NULL),
   more_log_pending(false),
-  hunting(true),
   want_monmap(true),
-  want_keys(0), global_id(0),
-  authenticate_err(0),
   had_a_connection(false),
   reopen_interval_multiplier(1.0),
   last_mon_command_tid(0),
@@ -80,7 +76,7 @@ int MonClient::get_monmap()
   Mutex::Locker l(monc_lock);
   
   _sub_want("monmap", 0, 0);
-  if (cur_mon.empty())
+  if (!_opened())
     _reopen_session();
 
   while (want_monmap)
@@ -111,14 +107,17 @@ int MonClient::get_monmap_privately()
 
   ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl;
 
+  std::random_device rd;
+  std::mt19937 rng(rd());
+  assert(monmap.size() > 0);
+  std::uniform_int_distribution<unsigned> ranks(0, monmap.size() - 1);
   while (monmap.fsid.is_zero()) {
-    cur_mon = _pick_random_mon();
-    cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
-    if (cur_con) {
-      ldout(cct, 10) << "querying mon." << cur_mon << " "
-                    << cur_con->get_peer_addr() << dendl;
-      cur_con->send_message(new MMonGetMap);
-    }
+    auto rank = ranks(rng);
+    auto& pending_con = _add_conn(rank);
+    auto con = pending_con.get_con();
+    ldout(cct, 10) << "querying mon." << monmap.get_name(rank) << " "
+                  << con->get_peer_addr() << dendl;
+    con->send_message(new MMonGetMap);
 
     if (--attempt == 0)
       break;
@@ -127,17 +126,13 @@ int MonClient::get_monmap_privately()
     interval.set_from_double(cct->_conf->mon_client_hunt_interval);
     map_cond.WaitInterval(monc_lock, interval);
 
-    if (monmap.fsid.is_zero() && cur_con) {
-      cur_con->mark_down();  // nope, clean that connection up
+    if (monmap.fsid.is_zero() && con) {
+      con->mark_down();  // nope, clean that connection up
     }
   }
 
   if (temp_msgr) {
-    if (cur_con) {
-      cur_con->mark_down();
-      cur_con.reset(NULL);
-      cur_mon.clear();
-    }
+    pending_cons.clear();
     monc_lock.Unlock();
     messenger->shutdown();
     if (smessenger)
@@ -147,9 +142,7 @@ int MonClient::get_monmap_privately()
     monc_lock.Lock();
   }
 
-  hunting = true;  // reset this to true!
-  cur_mon.clear();
-  cur_con.reset(NULL);
+  pending_cons.clear();
 
   if (!monmap.fsid.is_zero())
     return 0;
@@ -254,8 +247,15 @@ bool MonClient::ms_dispatch(Message *m)
 
   Mutex::Locker lock(monc_lock);
 
-  // ignore any messages outside our current session
-  if (m->get_connection() != cur_con) {
+  if (_hunting()) {
+    if (!pending_cons.count(m->get_source_addr())) {
+      // ignore any messages outside hunting sessions
+      ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
+      m->put();
+      return true;
+    }
+  } else if (!active_con || active_con->get_con() != m->get_connection()) {
+    // ignore any messages outside our session(s)
     ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
     m->put();
     return true;
@@ -311,10 +311,12 @@ void MonClient::flush_log()
 void MonClient::handle_monmap(MMonMap *m)
 {
   ldout(cct, 10) << __func__ << " " << *m << dendl;
+  auto peer = m->get_source_addr();
+  string cur_mon = monmap.get_name(peer);
+
   bufferlist::iterator p = m->monmapbl.begin();
   ::decode(monmap, p);
 
-  assert(!cur_mon.empty());
   ldout(cct, 10) << " got monmap " << monmap.epoch
                 << ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon)
                 << dendl;
@@ -324,9 +326,10 @@ void MonClient::handle_monmap(MMonMap *m)
 
   _sub_got("monmap", monmap.get_epoch());
 
-  if (!monmap.get_addr_name(cur_con->get_peer_addr(), cur_mon)) {
+  if (!monmap.get_addr_name(peer, cur_mon)) {
     ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl;
-    _reopen_session();  // can't find the mon we were talking to (above)
+    // can't find the mon we were talking to (above)
+    _reopen_session();
   }
 
   map_cond.Signal();
@@ -409,10 +412,9 @@ void MonClient::shutdown()
     waiting_for_session.pop_front();
   }
 
-  if (cur_con)
-    cur_con->mark_down();
-  cur_con.reset(NULL);
-  cur_mon.clear();
+  active_con.reset();
+  pending_cons.clear();
+  auth.reset();
 
   monc_lock.Unlock();
 
@@ -430,20 +432,20 @@ int MonClient::authenticate(double timeout)
 {
   Mutex::Locker lock(monc_lock);
 
-  if (state == MC_STATE_HAVE_SESSION) {
+  if (active_con) {
     ldout(cct, 5) << "already authenticated" << dendl;
     return 0;
   }
 
   _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
-  if (cur_mon.empty())
+  if (!_opened())
     _reopen_session();
 
   utime_t until = ceph_clock_now();
   until += timeout;
   if (timeout > 0.0)
     ldout(cct, 10) << "authenticate will time out at " << until << dendl;
-  while (state != MC_STATE_HAVE_SESSION && !authenticate_err) {
+  while (!active_con && !authenticate_err) {
     if (timeout > 0.0) {
       int r = auth_cond.WaitUntil(monc_lock, until);
       if (r == ETIMEDOUT) {
@@ -455,8 +457,9 @@ int MonClient::authenticate(double timeout)
     }
   }
 
-  if (state == MC_STATE_HAVE_SESSION) {
-    ldout(cct, 5) << __func__ << " success, global_id " << global_id << dendl;
+  if (active_con) {
+    ldout(cct, 5) << __func__ << " success, global_id "
+                 << active_con->get_global_id() << dendl;
   }
 
   if (authenticate_err < 0 && no_keyring_disabled_cephx) {
@@ -468,104 +471,94 @@ int MonClient::authenticate(double timeout)
 
 void MonClient::handle_auth(MAuthReply *m)
 {
-  Context *cb = NULL;
-  bufferlist::iterator p = m->result_bl.begin();
-  if (state == MC_STATE_NEGOTIATING) {
-    if (!auth || (int)m->protocol != auth->get_protocol()) {
-      auth.reset(get_auth_client_handler(cct, m->protocol,
-                                        rotating_secrets.get()));
-      if (!auth) {
-       ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl;
-       if (m->result == -ENOTSUP) {
-         ldout(cct, 10) << "none of our auth protocols are supported by the server"
-                        << dendl;
-         authenticate_err = m->result;
-         auth_cond.SignalAll();
-       }
-       m->put();
-       return;
-      }
-      // do not request MGR key unless the mon has the SERVER_KRAKEN
-      // feature.  otherwise it will give us an auth error.  note that
-      // we have to use the FEATUREMASK because pre-jewel the kraken
-      // feature bit was used for something else.
-      if ((want_keys & CEPH_ENTITY_TYPE_MGR) &&
-         !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
-       ldout(cct, 1) << __func__
-                     << " not requesting MGR keys from pre-kraken monitor"
-                     << dendl;
-       want_keys &= ~CEPH_ENTITY_TYPE_MGR;
-      }
-      auth->set_want_keys(want_keys);
-      auth->init(entity_name);
-      auth->set_global_id(global_id);
-    } else {
-      auth->reset();
+  assert(monc_lock.is_locked());
+  if (!_hunting()) {
+    std::swap(active_con->get_auth(), auth);
+    int ret = active_con->authenticate(m);
+    m->put();
+    std::swap(auth, active_con->get_auth());
+    if (ret != -EAGAIN) {
+      _finish_auth(ret);
     }
-    state = MC_STATE_AUTHENTICATING;
-  }
-  assert(auth);
-  if (m->global_id && m->global_id != global_id) {
-    global_id = m->global_id;
-    auth->set_global_id(global_id);
-    ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
+    return;
   }
 
-  int ret = auth->handle_response(m->result, p);
+  // hunting
+  auto found = pending_cons.find(m->get_source_addr());
+  assert(found != pending_cons.end());
+  int auth_err = found->second.handle_auth(m, entity_name, want_keys,
+                                          rotating_secrets.get());
   m->put();
-
-  if (ret == -EAGAIN) {
-    MAuth *ma = new MAuth;
-    ma->protocol = auth->get_protocol();
-    auth->prepare_build_request();
-    ret = auth->build_request(ma->auth_payload);
-    _send_mon_message(ma, true);
+  if (auth_err == -EAGAIN) {
     return;
   }
+  if (auth_err) {
+    pending_cons.erase(found);
+    if (!pending_cons.empty()) {
+      // keep trying with pending connections
+      return;
+    }
+    // the last try just failed, give up.
+  } else {
+    auto& mc = found->second;
+    assert(mc.have_session());
+    active_con.reset(new MonConnection(std::move(mc)));
+    pending_cons.clear();
+  }
 
   _finish_hunting();
 
-  authenticate_err = ret;
-  if (ret == 0) {
-    if (state != MC_STATE_HAVE_SESSION) {
-      state = MC_STATE_HAVE_SESSION;
-      last_rotating_renew_sent = utime_t();
-      while (!waiting_for_session.empty()) {
-       _send_mon_message(waiting_for_session.front());
-       waiting_for_session.pop_front();
-      }
+  if (!auth_err) {
+    last_rotating_renew_sent = utime_t();
+    while (!waiting_for_session.empty()) {
+      _send_mon_message(waiting_for_session.front());
+      waiting_for_session.pop_front();
+    }
 
-      _resend_mon_commands();
+    _resend_mon_commands();
 
-      if (log_client) {
-       log_client->reset_session();
-       send_log();
-      }
-      if (session_established_context) {
-        cb = session_established_context.release();
-      }
+    if (log_client) {
+      log_client->reset_session();
+      send_log();
     }
-  
-    _check_auth_tickets();
+    if (active_con)
+      std::swap(auth, active_con->get_auth());
   }
-  auth_cond.SignalAll();
-  if (cb) {
-    monc_lock.Unlock();
-    cb->complete(0);
-    monc_lock.Lock();
+  _finish_auth(auth_err);
+  if (!auth_err) {
+    Context *cb = nullptr;
+    if (session_established_context) {
+      cb = session_established_context.release();
+    }
+    if (cb) {
+      monc_lock.Unlock();
+      cb->complete(0);
+      monc_lock.Lock();
+    }
   }
 }
 
+void MonClient::_finish_auth(int auth_err)
+{
+  authenticate_err = auth_err;
+  // _resend_mon_commands() could _reopen_session() if the connected mon is not
+  // the one the MonCommand is targeting.
+  if (!auth_err && active_con) {
+    assert(auth);
+    _check_auth_tickets();
+  }
+  auth_cond.SignalAll();
+}
 
 // ---------
 
-void MonClient::_send_mon_message(Message *m, bool force)
+void MonClient::_send_mon_message(Message *m)
 {
   assert(monc_lock.is_locked());
-  assert(!cur_mon.empty());
-  if (force || state == MC_STATE_HAVE_SESSION) {
-    assert(cur_con);
-    ldout(cct, 10) << "_send_mon_message to mon." << cur_mon
+  if (active_con) {
+    auto cur_con = active_con->get_con();
+    ldout(cct, 10) << "_send_mon_message to mon."
+                  << monmap.get_name(cur_con->get_peer_addr())
                   << " at " << cur_con->get_peer_addr() << dendl;
     cur_con->send_message(m);
   } else {
@@ -573,49 +566,24 @@ void MonClient::_send_mon_message(Message *m, bool force)
   }
 }
 
-string MonClient::_pick_random_mon()
-{
-  assert(monmap.size() > 0);
-  if (monmap.size() == 1) {
-    return monmap.get_name(0);
-  } else {
-    int max = monmap.size();
-    int o = -1;
-    if (!cur_mon.empty()) {
-      o = monmap.get_rank(cur_mon);
-      if (o >= 0)
-       max--;
-    }
-
-    int32_t n = rng() % max;
-    if (o >= 0 && n >= o)
-      n++;
-    return monmap.get_name(n);
-  }
-}
-
 void MonClient::_reopen_session(int rank, string name)
 {
   assert(monc_lock.is_locked());
   ldout(cct, 10) << __func__ << " rank " << rank << " name " << name << dendl;
 
-  if (rank < 0 && name.length() == 0) {
-    cur_mon = _pick_random_mon();
-  } else if (name.length()) {
-    cur_mon = name;
+  active_con.reset();
+  pending_cons.clear();
+
+  _start_hunting();
+  
+  if (name.length()) {
+    _add_conn(monmap.get_rank(name));
+  } else if (rank >= 0) {
+    _add_conn(rank);
   } else {
-    cur_mon = monmap.get_name(rank);
+    _add_conns();
   }
 
-  if (cur_con) {
-    cur_con->mark_down();
-  }
-  cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
-       
-  ldout(cct, 10) << "picked mon." << cur_mon << " con " << cur_con
-                << " addr " << cur_con->get_peer_addr()
-                << dendl;
-
   // throw out old queued messages
   while (!waiting_for_session.empty()) {
     waiting_for_session.front()->put();
@@ -629,34 +597,10 @@ void MonClient::_reopen_session(int rank, string name)
     version_requests.erase(version_requests.begin());
   }
 
-  // adjust timeouts if necessary
-  if (had_a_connection) {
-    reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
-    if (reopen_interval_multiplier >
-          cct->_conf->mon_client_hunt_interval_max_multiple)
-      reopen_interval_multiplier =
-          cct->_conf->mon_client_hunt_interval_max_multiple;
+  for (auto& c : pending_cons) {
+    c.second.start(monmap.get_epoch(), entity_name, *auth_supported);
   }
 
-  // restart authentication handshake
-  state = MC_STATE_NEGOTIATING;
-  hunting = true;
-
-  // send an initial keepalive to ensure our timestamp is valid by the
-  // time we are in an OPENED state (by sequencing this before
-  // authentication).
-  cur_con->send_keepalive();
-
-  MAuth *m = new MAuth;
-  m->protocol = 0;
-  m->monmap_epoch = monmap.get_epoch();
-  __u8 struct_v = 1;
-  ::encode(struct_v, m->auth_payload);
-  ::encode(auth_supported->get_supported_set(), m->auth_payload);
-  ::encode(entity_name, m->auth_payload);
-  ::encode(global_id, m->auth_payload);
-  _send_mon_message(m, true);
-
   for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
        p != sub_sent.end();
        ++p) {
@@ -667,38 +611,106 @@ void MonClient::_reopen_session(int rank, string name)
     _renew_subs();
 }
 
+MonConnection& MonClient::_add_conn(unsigned rank)
+{
+  auto peer = monmap.get_addr(rank);
+  auto conn = messenger->get_connection(monmap.get_inst(rank));
+  MonConnection mc(cct, conn);
+  auto inserted = pending_cons.insert(move(make_pair(peer, move(mc))));
+  ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
+                 << " con " << conn
+                 << " addr " << conn->get_peer_addr()
+                 << dendl;
+  return inserted.first->second;
+}
+
+void MonClient::_add_conns()
+{
+  unsigned n = cct->_conf->mon_client_hunt_parallel;
+  if (n == 0 || n > monmap.size()) {
+    n = monmap.size();
+  }
+  vector<unsigned> ranks(n);
+  for (unsigned i = 0; i < n; i++) {
+    ranks[i] = i;
+  }
+  std::random_device rd;
+  std::mt19937 rng(rd());
+  std::shuffle(ranks.begin(), ranks.end(), rng);
+  for (unsigned i = 0; i < n; i++) {
+    _add_conn(ranks[i]);
+  }
+}
 
 bool MonClient::ms_handle_reset(Connection *con)
 {
   Mutex::Locker lock(monc_lock);
 
-  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
-    if (cur_mon.empty() || con != cur_con) {
-      ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl;
-      return true;
+  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
+    return false;
+
+  if (_hunting()) {
+    if (pending_cons.count(con->get_peer_addr())) {
+      ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addr() << dendl;
     } else {
+      ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl;
+    }
+    return true;
+  } else {
+    if (active_con && con == active_con->get_con()) {
       ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addr() << dendl;
-      if (hunting)
-       return true;
-      
-      ldout(cct, 0) << "hunting for new mon" << dendl;
       _reopen_session();
+      return false;
+    } else {
+      ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl;
+      return true;
     }
   }
-  return false;
+}
+
+bool MonClient::_opened() const
+{
+  assert(monc_lock.is_locked());
+  return active_con || _hunting();
+}
+
+bool MonClient::_hunting() const
+{
+  return !pending_cons.empty();
+}
+
+void MonClient::_start_hunting()
+{
+  assert(!_hunting());
+  // adjust timeouts if necessary
+  if (!had_a_connection)
+    return;
+  reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
+  if (reopen_interval_multiplier >
+      cct->_conf->mon_client_hunt_interval_max_multiple) {
+    reopen_interval_multiplier =
+      cct->_conf->mon_client_hunt_interval_max_multiple;
+  }
 }
 
 void MonClient::_finish_hunting()
 {
   assert(monc_lock.is_locked());
-  if (hunting) {
-    ldout(cct, 1) << "found mon." << cur_mon << dendl; 
-    hunting = false;
-    had_a_connection = true;
-    reopen_interval_multiplier /= 2.0;
-    if (reopen_interval_multiplier < 1.0)
-      reopen_interval_multiplier = 1.0;
+  // the pending conns have been cleaned.
+  assert(!_hunting());
+  if (active_con) {
+    auto con = active_con->get_con();
+    ldout(cct, 1) << "found mon."
+                 << monmap.get_name(con->get_peer_addr())
+                 << dendl;
+  } else {
+    ldout(cct, 1) << "no mon sessions established" << dendl;
   }
+
+  had_a_connection = true;
+  reopen_interval_multiplier /= 2.0;
+  if (reopen_interval_multiplier < 1.0)
+    reopen_interval_multiplier = 1.0;
 }
 
 void MonClient::tick()
@@ -707,12 +719,13 @@ void MonClient::tick()
 
   _check_auth_tickets();
   
-  if (hunting) {
+  if (_hunting()) {
     ldout(cct, 1) << "continuing hunt" << dendl;
     _reopen_session();
-  } else if (!cur_mon.empty()) {
+  } else if (active_con) {
     // just renew as needed
     utime_t now = ceph_clock_now();
+    auto cur_con = active_con->get_con();
     if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
       ldout(cct, 10) << "renew subs? (now: " << now
                     << "; renew after: " << sub_renew_after << ") -- "
@@ -724,16 +737,14 @@ void MonClient::tick()
 
     cur_con->send_keepalive();
 
-    if (state == MC_STATE_HAVE_SESSION) {
-      if (cct->_conf->mon_client_ping_timeout > 0 &&
-         cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
-       utime_t lk = cur_con->get_last_keepalive_ack();
-       utime_t interval = now - lk;
-       if (interval > cct->_conf->mon_client_ping_timeout) {
-         ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
-                       << " seconds), reconnecting" << dendl;
-         _reopen_session();
-       }
+    if (cct->_conf->mon_client_ping_timeout > 0 &&
+       cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+      utime_t lk = cur_con->get_last_keepalive_ack();
+      utime_t interval = now - lk;
+      if (interval > cct->_conf->mon_client_ping_timeout) {
+       ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
+                     << " seconds), reconnecting" << dendl;
+       _reopen_session();
       }
 
       send_log();
@@ -753,10 +764,11 @@ void MonClient::schedule_tick()
     }
   };
 
-  if (hunting)
+  if (_hunting()) {
     timer.add_event_after(cct->_conf->mon_client_hunt_interval
-                          * reopen_interval_multiplier, new C_Tick(this));
-  else
+                         * reopen_interval_multiplier,
+                         new C_Tick(this));
+  } else
     timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this));
 }
 
@@ -771,7 +783,7 @@ void MonClient::_renew_subs()
   }
 
   ldout(cct, 10) << __func__ << dendl;
-  if (cur_mon.empty())
+  if (!_opened())
     _reopen_session();
   else {
     if (sub_renew_sent == utime_t())
@@ -807,7 +819,7 @@ void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
 int MonClient::_check_auth_tickets()
 {
   assert(monc_lock.is_locked());
-  if (state == MC_STATE_HAVE_SESSION && auth) {
+  if (active_con && auth) {
     if (auth->need_tickets()) {
       ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
       MAuth *m = new MAuth;
@@ -831,7 +843,7 @@ int MonClient::_check_auth_rotating()
     return 0;
   }
 
-  if (!auth || state != MC_STATE_HAVE_SESSION) {
+  if (!active_con || !auth) {
     ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
     return 0;
   }
@@ -902,8 +914,13 @@ int MonClient::wait_auth_rotating(double timeout)
 
 void MonClient::_send_command(MonCommand *r)
 {
+  entity_addr_t peer;
+  if (active_con) {
+    peer = active_con->get_con()->get_peer_addr();
+  }
+
   if (r->target_rank >= 0 &&
-      r->target_rank != monmap.get_rank(cur_mon)) {
+      r->target_rank != monmap.get_rank(peer)) {
     ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
                   << " wants rank " << r->target_rank
                   << ", reopening session"
@@ -918,7 +935,7 @@ void MonClient::_send_command(MonCommand *r)
   }
 
   if (r->target_name.length() &&
-      r->target_name != cur_mon) {
+      r->target_name != monmap.get_name(peer)) {
     ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
                   << " wants mon " << r->target_name
                   << ", reopening session"
@@ -1111,3 +1128,138 @@ void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
   }
   m->put();
 }
+
+AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
+  Mutex::Locker l(monc_lock);
+  assert(auth || active_con->get_auth());
+  if (auth)
+    return auth->build_authorizer(service_id);
+  else
+    return active_con->get_auth()->build_authorizer(service_id);
+}
+
+#define dout_subsys ceph_subsys_monc
+#undef dout_prefix
+#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
+
+MonConnection::MonConnection(CephContext *cct, ConnectionRef con)
+  : cct(cct), con(con)
+{}
+
+MonConnection::~MonConnection()
+{
+  if (con) {
+    con->mark_down();
+    con.reset();
+  }
+}
+
+bool MonConnection::have_session() const
+{
+  return state == State::HAVE_SESSION;
+}
+
+void MonConnection::start(epoch_t epoch,
+                         const EntityName& entity_name,
+                         const AuthMethodList& auth_supported)
+{
+  // restart authentication handshake
+  state = State::NEGOTIATING;
+
+  // send an initial keepalive to ensure our timestamp is valid by the
+  // time we are in an OPENED state (by sequencing this before
+  // authentication).
+  con->send_keepalive();
+
+  auto m = new MAuth;
+  m->protocol = 0;
+  m->monmap_epoch = epoch;
+  __u8 struct_v = 1;
+  ::encode(struct_v, m->auth_payload);
+  ::encode(auth_supported.get_supported_set(), m->auth_payload);
+  ::encode(entity_name, m->auth_payload);
+  ::encode(global_id, m->auth_payload);
+  con->send_message(m);
+}
+
+int MonConnection::handle_auth(MAuthReply* m,
+                              const EntityName& entity_name,
+                              uint32_t want_keys,
+                              RotatingKeyRing* keyring)
+{
+  if (state == State::NEGOTIATING) {
+    int r = _negotiate(m, entity_name, want_keys, keyring);
+    if (r) {
+      return r;
+    }
+    state = State::AUTHENTICATING;
+  }
+  int r = authenticate(m);
+  if (!r) {
+    state = State::HAVE_SESSION;
+  }
+  return r;
+}
+
+int MonConnection::_negotiate(MAuthReply *m,
+                             const EntityName& entity_name,
+                             uint32_t want_keys,
+                             RotatingKeyRing* keyring)
+{
+  if (auth && (int)m->protocol == auth->get_protocol()) {
+    // good, negotiation completed
+    auth->reset();
+    return 0;
+  }
+
+  auth.reset(get_auth_client_handler(cct, m->protocol, keyring));
+  if (!auth) {
+    ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl;
+    if (m->result == -ENOTSUP) {
+      ldout(cct, 10) << "none of our auth protocols are supported by the server"
+                    << dendl;
+    }
+    return m->result;
+  }
+
+  // do not request MGR key unless the mon has the SERVER_KRAKEN
+  // feature.  otherwise it will give us an auth error.  note that
+  // we have to use the FEATUREMASK because pre-jewel the kraken
+  // feature bit was used for something else.
+  if ((want_keys & CEPH_ENTITY_TYPE_MGR) &&
+      !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
+    ldout(cct, 1) << __func__
+                 << " not requesting MGR keys from pre-kraken monitor"
+                 << dendl;
+    want_keys &= ~CEPH_ENTITY_TYPE_MGR;
+  }
+  auth->set_want_keys(want_keys);
+  auth->init(entity_name);
+  auth->set_global_id(global_id);
+  return 0;
+}
+
+int MonConnection::authenticate(MAuthReply *m)
+{
+  assert(auth);
+  if (!m->global_id) {
+    ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
+  }
+  if (m->global_id != global_id) {
+    // it's a new session
+    auth->reset();
+    global_id = m->global_id;
+    auth->set_global_id(global_id);
+    ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
+  }
+  auto p = m->result_bl.begin();
+  int ret = auth->handle_response(m->result, p);
+  if (ret == -EAGAIN) {
+    auto ma = new MAuth;
+    ma->protocol = auth->get_protocol();
+    auth->prepare_build_request();
+    auth->build_request(ma->auth_payload);
+    con->send_message(ma);
+  }
+  return ret;
+}
index 7e1d9da9b82dbe6d0bbbfe3bd62fdadf5cb9b20c..6081779721be02dba6178ccd27dad4d76ea0a550 100644 (file)
@@ -15,6 +15,8 @@
 #ifndef CEPH_MONCLIENT_H
 #define CEPH_MONCLIENT_H
 
+#include <memory>
+
 #include "msg/Messenger.h"
 
 #include "MonMap.h"
 #include "common/config.h"
 #include "auth/AuthClientHandler.h"
 #include "auth/RotatingKeyRing.h"
-#include "common/SimpleRNG.h"
 
 
 class MMonMap;
-class MMonGetVersion;
 class MMonGetVersionReply;
 struct MMonSubscribeAck;
 class MMonCommandAck;
-class MCommandReply;
 struct MAuthReply;
-class MPing;
+class MAuthRotating;
 class LogClient;
+struct AuthAuthorizer;
 class AuthMethodList;
-class Messenger;
+class AuthClientHandler;
 // class RotatingKeyRing;
 class KeyRing;
-enum MonClientState {
-  MC_STATE_NONE,
-  MC_STATE_NEGOTIATING,
-  MC_STATE_AUTHENTICATING,
-  MC_STATE_HAVE_SESSION,
-};
 
 struct MonClientPinger : public Dispatcher {
 
@@ -102,18 +96,63 @@ struct MonClientPinger : public Dispatcher {
   }
 };
 
+class MonConnection {
+public:
+  MonConnection(CephContext *cct,
+               ConnectionRef conn);
+  ~MonConnection();
+  MonConnection(MonConnection&& rhs) = default;
+  MonConnection& operator=(MonConnection&&) = default;
+  MonConnection(const MonConnection& rhs) = delete;
+  MonConnection& operator=(const MonConnection&) = delete;
+  int handle_auth(MAuthReply *m,
+                 const EntityName& entity_name,
+                 uint32_t want_keys,
+                 RotatingKeyRing* keyring);
+  int authenticate(MAuthReply *m);
+  void start(epoch_t epoch,
+             const EntityName& entity_name,
+             const AuthMethodList& auth_supported);
+  bool have_session() const;
+  uint64_t get_global_id() const {
+    return global_id;
+  }
+  ConnectionRef get_con() {
+    return con;
+  }
+  std::unique_ptr<AuthClientHandler>& get_auth() {
+    return auth;
+  }
+
+private:
+  int _negotiate(MAuthReply *m,
+                const EntityName& entity_name,
+                uint32_t want_keys,
+                RotatingKeyRing* keyring);
+
+private:
+  CephContext *cct;
+  enum class State {
+    NONE,
+    NEGOTIATING,
+    AUTHENTICATING,
+    HAVE_SESSION,
+  };
+  State state = State::NONE;
+  ConnectionRef con;
+
+  std::unique_ptr<AuthClientHandler> auth;
+  uint64_t global_id = 0;
+};
+
 class MonClient : public Dispatcher {
 public:
   MonMap monmap;
 private:
-  MonClientState state;
-
   Messenger *messenger;
 
-  string cur_mon;
-  ConnectionRef cur_con;
-
-  SimpleRNG rng;
+  std::unique_ptr<MonConnection> active_con;
+  std::map<entity_addr_t, MonConnection> pending_cons;
 
   EntityName entity_name;
 
@@ -143,24 +182,18 @@ private:
   void handle_auth(MAuthReply *m);
 
   // monitor session
-  bool hunting;
-
   void tick();
   void schedule_tick();
 
-  Cond auth_cond;
-
   // monclient
   bool want_monmap;
-
-  uint32_t want_keys;
-
-  uint64_t global_id;
-
-  // authenticate
-private:
   Cond map_cond;
-  int authenticate_err;
+private:
+  // authenticate
+  std::unique_ptr<AuthClientHandler> auth;
+  uint32_t want_keys = 0;
+  Cond auth_cond;
+  int authenticate_err = 0;
 
   list<Message*> waiting_for_session;
   utime_t last_rotating_renew_sent;
@@ -168,13 +201,18 @@ private:
   bool had_a_connection;
   double reopen_interval_multiplier;
 
-  string _pick_random_mon();
+  bool _opened() const;
+  bool _hunting() const;
+  void _start_hunting();
   void _finish_hunting();
+  void _finish_auth(int auth_err);
   void _reopen_session(int rank, string name);
   void _reopen_session() {
     _reopen_session(-1, string());
   }
-  void _send_mon_message(Message *m, bool force=false);
+  MonConnection& _add_conn(unsigned rank);
+  void _add_conns();
+  void _send_mon_message(Message *m);
 
 public:
   void set_entity_name(EntityName name) { entity_name = name; }
@@ -241,9 +279,6 @@ private:
     sub_new.erase(what);
   }
 
-  // auth tickets
-public:
-  std::unique_ptr<AuthClientHandler> auth;
 public:
   void renew_subs() {
     Mutex::Locker l(monc_lock);
@@ -362,20 +397,20 @@ public:
   }
 
   uint64_t get_global_id() const {
-    return global_id;
+    Mutex::Locker l(monc_lock);
+    if (active_con) {
+      return active_con->get_global_id();
+    } else {
+      return 0;
+    }
   }
 
   void set_messenger(Messenger *m) { messenger = m; }
   entity_addr_t get_myaddr() const { return messenger->get_myaddr(); }
-
-  void send_auth_message(Message *m) {
-    _send_mon_message(m, true);
-  }
+  AuthAuthorizer* build_authorizer(int service_id) const;
 
   void set_want_keys(uint32_t want) {
     want_keys = want;
-    if (auth)
-      auth->set_want_keys(want | CEPH_ENTITY_TYPE_MON);
   }
 
   // admin commands
@@ -404,6 +439,7 @@ private:
   void _resend_mon_commands();
   int _cancel_mon_command(uint64_t tid, int r);
   void _finish_command(MonCommand *r, int ret, string rs);
+  void _finish_auth();
   void handle_mon_command_ack(MMonCommandAck *ack);
 
 public:
index c101a2b8d1cac3f1a8cd2021394c5fe677d2032d..cc0329a811af9a1f4c465eb001ac3297ec3c1848 100644 (file)
@@ -6206,7 +6206,7 @@ bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool for
     }
   }
 
-  *authorizer = monc->auth->build_authorizer(dest_type);
+  *authorizer = monc->build_authorizer(dest_type);
   return *authorizer != NULL;
 }
 
index cdeff1038346456164987346cebcf30bb9b165ef..fb8fcc862219ede42c7ad893137103bc4f375731 100644 (file)
@@ -4420,7 +4420,7 @@ bool Objecter::ms_get_authorizer(int dest_type,
     return false;
   if (dest_type == CEPH_ENTITY_TYPE_MON)
     return true;
-  *authorizer = monc->auth->build_authorizer(dest_type);
+  *authorizer = monc->build_authorizer(dest_type);
   return *authorizer != NULL;
 }
 
index 6ee89e8c86b2d44ea4fd44680aeb91bce8fdf96d..1af0b788bfc86acfdc1ecd47b35e3cd7da8e7728 100644 (file)
@@ -165,6 +165,6 @@ bool MDSUtility::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
       return false;
   }
 
-  *authorizer = monc->auth->build_authorizer(dest_type);
+  *authorizer = monc->build_authorizer(dest_type);
   return *authorizer != NULL;
 }