From: Kefu Chai Date: Wed, 14 Sep 2016 13:39:04 +0000 (+0800) Subject: mon/monclient: hunt for multiple monitor in parallel X-Git-Tag: v12.0.1~412^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a2eb6ae3fb57b09efdd4d7baac6871ca8dd8e79f;p=ceph-ci.git mon/monclient: hunt for multiple monitor in parallel * add an option "mon_client_hunt_parallel" for the maximum number of parallel hunting sessions. Fixes: http://tracker.ceph.com/issues/16091 Signed-off-by: Steven Dieffenbach Signed-off-by: Kefu Chai --- diff --git a/src/client/Client.cc b/src/client/Client.cc index d0ba9f9aa95..c18f393110c 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -12728,7 +12728,7 @@ bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool { if (dest_type == CEPH_ENTITY_TYPE_MON) return true; - *authorizer = monclient->auth->build_authorizer(dest_type); + *authorizer = monclient->build_authorizer(dest_type); return true; } diff --git a/src/common/SimpleRNG.h b/src/common/SimpleRNG.h deleted file mode 100644 index 70b008ebc96..00000000000 --- a/src/common/SimpleRNG.h +++ /dev/null @@ -1,24 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef __CEPH_COMMON_SIMPLERNG_H_ -#define __CEPH_COMMON_SIMPLERNG_H_ - -/* - * rand() is not thread-safe. random_r family segfaults. - * boost::random::* have build issues. 
- */ -class SimpleRNG { - unsigned m_z, m_w; - -public: - SimpleRNG(int seed) : m_z(seed), m_w(123) {} - - unsigned operator()() { - m_z = 36969 * (m_z & 65535) + (m_z >> 16); - m_w = 18000 * (m_w & 65535) + (m_w >> 16); - return (m_z << 16) + m_w; - } -}; - -#endif diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 76c3f0ddd16..85bb3c2702c 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -398,6 +398,7 @@ OPTION(cephx_sign_messages, OPT_BOOL, true) // Default to signing session messa OPTION(auth_mon_ticket_ttl, OPT_DOUBLE, 60*60*12) OPTION(auth_service_ticket_ttl, OPT_DOUBLE, 60*60) OPTION(auth_debug, OPT_BOOL, false) // if true, assert when weird things happen +OPTION(mon_client_hunt_parallel, OPT_U32, 2) // how many mons to try to connect to in parallel during hunt OPTION(mon_client_hunt_interval, OPT_DOUBLE, 3.0) // try new mon every N seconds until we connect OPTION(mon_client_ping_interval, OPT_DOUBLE, 10.0) // ping every N seconds OPTION(mon_client_ping_timeout, OPT_DOUBLE, 30.0) // fail if we don't hear back diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 2045516eafe..fc44ea9a40f 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -60,7 +60,7 @@ bool librados::RadosClient::ms_get_authorizer(int dest_type, /* monitor authorization is being handled on different layer */ if (dest_type == CEPH_ENTITY_TYPE_MON) return true; - *authorizer = monclient.auth->build_authorizer(dest_type); + *authorizer = monclient.build_authorizer(dest_type); return *authorizer != NULL; } diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index 58f59f60a63..cc7e12b8b16 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -1185,7 +1185,7 @@ bool MDSDaemon::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bo return false; } - *authorizer = monc->auth->build_authorizer(dest_type); + *authorizer = monc->build_authorizer(dest_type); return *authorizer != NULL; } 
diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index e3f327c0ecc..798b01ec140 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -117,7 +117,7 @@ bool DaemonServer::ms_get_authorizer(int dest_type, return false; } - *authorizer = monc->auth->build_authorizer(dest_type); + *authorizer = monc->build_authorizer(dest_type); dout(20) << "got authorizer " << *authorizer << dendl; return *authorizer != NULL; } diff --git a/src/mgr/MgrStandby.cc b/src/mgr/MgrStandby.cc index 7a87e60886b..92cb8013a7f 100644 --- a/src/mgr/MgrStandby.cc +++ b/src/mgr/MgrStandby.cc @@ -211,7 +211,7 @@ bool MgrStandby::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, return false; } - *authorizer = monc->auth->build_authorizer(dest_type); + *authorizer = monc->build_authorizer(dest_type); return *authorizer != NULL; } diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index a00fda322f2..1107e33eaed 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -12,6 +12,8 @@ * */ +#include + #include "messages/MMonGetMap.h" #include "messages/MMonGetVersion.h" #include "messages/MMonGetVersionReply.h" @@ -39,24 +41,18 @@ #define dout_subsys ceph_subsys_monc #undef dout_prefix -#define dout_prefix *_dout << "monclient" << (hunting ? "(hunting)":"") << ": " +#define dout_prefix *_dout << "monclient" << (_hunting() ? 
"(hunting)":"") << ": " MonClient::MonClient(CephContext *cct_) : Dispatcher(cct_), - state(MC_STATE_NONE), messenger(NULL), - cur_con(NULL), - rng(getpid()), monc_lock("MonClient::monc_lock"), timer(cct_, monc_lock), finisher(cct_), initialized(false), no_keyring_disabled_cephx(false), log_client(NULL), more_log_pending(false), - hunting(true), want_monmap(true), - want_keys(0), global_id(0), - authenticate_err(0), had_a_connection(false), reopen_interval_multiplier(1.0), last_mon_command_tid(0), @@ -80,7 +76,7 @@ int MonClient::get_monmap() Mutex::Locker l(monc_lock); _sub_want("monmap", 0, 0); - if (cur_mon.empty()) + if (!_opened()) _reopen_session(); while (want_monmap) @@ -111,14 +107,17 @@ int MonClient::get_monmap_privately() ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl; + std::random_device rd; + std::mt19937 rng(rd()); + assert(monmap.size() > 0); + std::uniform_int_distribution ranks(0, monmap.size() - 1); while (monmap.fsid.is_zero()) { - cur_mon = _pick_random_mon(); - cur_con = messenger->get_connection(monmap.get_inst(cur_mon)); - if (cur_con) { - ldout(cct, 10) << "querying mon." << cur_mon << " " - << cur_con->get_peer_addr() << dendl; - cur_con->send_message(new MMonGetMap); - } + auto rank = ranks(rng); + auto& pending_con = _add_conn(rank); + auto con = pending_con.get_con(); + ldout(cct, 10) << "querying mon." 
<< monmap.get_name(rank) << " " + << con->get_peer_addr() << dendl; + con->send_message(new MMonGetMap); if (--attempt == 0) break; @@ -127,17 +126,13 @@ int MonClient::get_monmap_privately() interval.set_from_double(cct->_conf->mon_client_hunt_interval); map_cond.WaitInterval(monc_lock, interval); - if (monmap.fsid.is_zero() && cur_con) { - cur_con->mark_down(); // nope, clean that connection up + if (monmap.fsid.is_zero() && con) { + con->mark_down(); // nope, clean that connection up } } if (temp_msgr) { - if (cur_con) { - cur_con->mark_down(); - cur_con.reset(NULL); - cur_mon.clear(); - } + pending_cons.clear(); monc_lock.Unlock(); messenger->shutdown(); if (smessenger) @@ -147,9 +142,7 @@ int MonClient::get_monmap_privately() monc_lock.Lock(); } - hunting = true; // reset this to true! - cur_mon.clear(); - cur_con.reset(NULL); + pending_cons.clear(); if (!monmap.fsid.is_zero()) return 0; @@ -254,8 +247,15 @@ bool MonClient::ms_dispatch(Message *m) Mutex::Locker lock(monc_lock); - // ignore any messages outside our current session - if (m->get_connection() != cur_con) { + if (_hunting()) { + if (!pending_cons.count(m->get_source_addr())) { + // ignore any messages outside hunting sessions + ldout(cct, 10) << "discarding stray monitor message " << *m << dendl; + m->put(); + return true; + } + } else if (!active_con || active_con->get_con() != m->get_connection()) { + // ignore any messages outside our session(s) ldout(cct, 10) << "discarding stray monitor message " << *m << dendl; m->put(); return true; @@ -311,10 +311,12 @@ void MonClient::flush_log() void MonClient::handle_monmap(MMonMap *m) { ldout(cct, 10) << __func__ << " " << *m << dendl; + auto peer = m->get_source_addr(); + string cur_mon = monmap.get_name(peer); + bufferlist::iterator p = m->monmapbl.begin(); ::decode(monmap, p); - assert(!cur_mon.empty()); ldout(cct, 10) << " got monmap " << monmap.epoch << ", mon." 
<< cur_mon << " is now rank " << monmap.get_rank(cur_mon) << dendl; @@ -324,9 +326,10 @@ void MonClient::handle_monmap(MMonMap *m) _sub_got("monmap", monmap.get_epoch()); - if (!monmap.get_addr_name(cur_con->get_peer_addr(), cur_mon)) { + if (!monmap.get_addr_name(peer, cur_mon)) { ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl; - _reopen_session(); // can't find the mon we were talking to (above) + // can't find the mon we were talking to (above) + _reopen_session(); } map_cond.Signal(); @@ -409,10 +412,9 @@ void MonClient::shutdown() waiting_for_session.pop_front(); } - if (cur_con) - cur_con->mark_down(); - cur_con.reset(NULL); - cur_mon.clear(); + active_con.reset(); + pending_cons.clear(); + auth.reset(); monc_lock.Unlock(); @@ -430,20 +432,20 @@ int MonClient::authenticate(double timeout) { Mutex::Locker lock(monc_lock); - if (state == MC_STATE_HAVE_SESSION) { + if (active_con) { ldout(cct, 5) << "already authenticated" << dendl; return 0; } _sub_want("monmap", monmap.get_epoch() ? 
monmap.get_epoch() + 1 : 0, 0); - if (cur_mon.empty()) + if (!_opened()) _reopen_session(); utime_t until = ceph_clock_now(); until += timeout; if (timeout > 0.0) ldout(cct, 10) << "authenticate will time out at " << until << dendl; - while (state != MC_STATE_HAVE_SESSION && !authenticate_err) { + while (!active_con && !authenticate_err) { if (timeout > 0.0) { int r = auth_cond.WaitUntil(monc_lock, until); if (r == ETIMEDOUT) { @@ -455,8 +457,9 @@ int MonClient::authenticate(double timeout) } } - if (state == MC_STATE_HAVE_SESSION) { - ldout(cct, 5) << __func__ << " success, global_id " << global_id << dendl; + if (active_con) { + ldout(cct, 5) << __func__ << " success, global_id " + << active_con->get_global_id() << dendl; } if (authenticate_err < 0 && no_keyring_disabled_cephx) { @@ -468,104 +471,94 @@ int MonClient::authenticate(double timeout) void MonClient::handle_auth(MAuthReply *m) { - Context *cb = NULL; - bufferlist::iterator p = m->result_bl.begin(); - if (state == MC_STATE_NEGOTIATING) { - if (!auth || (int)m->protocol != auth->get_protocol()) { - auth.reset(get_auth_client_handler(cct, m->protocol, - rotating_secrets.get())); - if (!auth) { - ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl; - if (m->result == -ENOTSUP) { - ldout(cct, 10) << "none of our auth protocols are supported by the server" - << dendl; - authenticate_err = m->result; - auth_cond.SignalAll(); - } - m->put(); - return; - } - // do not request MGR key unless the mon has the SERVER_KRAKEN - // feature. otherwise it will give us an auth error. note that - // we have to use the FEATUREMASK because pre-jewel the kraken - // feature bit was used for something else. 
- if ((want_keys & CEPH_ENTITY_TYPE_MGR) && - !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) { - ldout(cct, 1) << __func__ - << " not requesting MGR keys from pre-kraken monitor" - << dendl; - want_keys &= ~CEPH_ENTITY_TYPE_MGR; - } - auth->set_want_keys(want_keys); - auth->init(entity_name); - auth->set_global_id(global_id); - } else { - auth->reset(); + assert(monc_lock.is_locked()); + if (!_hunting()) { + std::swap(active_con->get_auth(), auth); + int ret = active_con->authenticate(m); + m->put(); + std::swap(auth, active_con->get_auth()); + if (ret != -EAGAIN) { + _finish_auth(ret); } - state = MC_STATE_AUTHENTICATING; - } - assert(auth); - if (m->global_id && m->global_id != global_id) { - global_id = m->global_id; - auth->set_global_id(global_id); - ldout(cct, 10) << "my global_id is " << m->global_id << dendl; + return; } - int ret = auth->handle_response(m->result, p); + // hunting + auto found = pending_cons.find(m->get_source_addr()); + assert(found != pending_cons.end()); + int auth_err = found->second.handle_auth(m, entity_name, want_keys, + rotating_secrets.get()); m->put(); - - if (ret == -EAGAIN) { - MAuth *ma = new MAuth; - ma->protocol = auth->get_protocol(); - auth->prepare_build_request(); - ret = auth->build_request(ma->auth_payload); - _send_mon_message(ma, true); + if (auth_err == -EAGAIN) { return; } + if (auth_err) { + pending_cons.erase(found); + if (!pending_cons.empty()) { + // keep trying with pending connections + return; + } + // the last try just failed, give up. 
+ } else { + auto& mc = found->second; + assert(mc.have_session()); + active_con.reset(new MonConnection(std::move(mc))); + pending_cons.clear(); + } _finish_hunting(); - authenticate_err = ret; - if (ret == 0) { - if (state != MC_STATE_HAVE_SESSION) { - state = MC_STATE_HAVE_SESSION; - last_rotating_renew_sent = utime_t(); - while (!waiting_for_session.empty()) { - _send_mon_message(waiting_for_session.front()); - waiting_for_session.pop_front(); - } + if (!auth_err) { + last_rotating_renew_sent = utime_t(); + while (!waiting_for_session.empty()) { + _send_mon_message(waiting_for_session.front()); + waiting_for_session.pop_front(); + } - _resend_mon_commands(); + _resend_mon_commands(); - if (log_client) { - log_client->reset_session(); - send_log(); - } - if (session_established_context) { - cb = session_established_context.release(); - } + if (log_client) { + log_client->reset_session(); + send_log(); } - - _check_auth_tickets(); + if (active_con) + std::swap(auth, active_con->get_auth()); } - auth_cond.SignalAll(); - if (cb) { - monc_lock.Unlock(); - cb->complete(0); - monc_lock.Lock(); + _finish_auth(auth_err); + if (!auth_err) { + Context *cb = nullptr; + if (session_established_context) { + cb = session_established_context.release(); + } + if (cb) { + monc_lock.Unlock(); + cb->complete(0); + monc_lock.Lock(); + } } } +void MonClient::_finish_auth(int auth_err) +{ + authenticate_err = auth_err; + // _resend_mon_commands() could _reopen_session() if the connected mon is not + // the one the MonCommand is targeting. + if (!auth_err && active_con) { + assert(auth); + _check_auth_tickets(); + } + auth_cond.SignalAll(); +} // --------- -void MonClient::_send_mon_message(Message *m, bool force) +void MonClient::_send_mon_message(Message *m) { assert(monc_lock.is_locked()); - assert(!cur_mon.empty()); - if (force || state == MC_STATE_HAVE_SESSION) { - assert(cur_con); - ldout(cct, 10) << "_send_mon_message to mon." 
<< cur_mon + if (active_con) { + auto cur_con = active_con->get_con(); + ldout(cct, 10) << "_send_mon_message to mon." + << monmap.get_name(cur_con->get_peer_addr()) << " at " << cur_con->get_peer_addr() << dendl; cur_con->send_message(m); } else { @@ -573,49 +566,24 @@ void MonClient::_send_mon_message(Message *m, bool force) } } -string MonClient::_pick_random_mon() -{ - assert(monmap.size() > 0); - if (monmap.size() == 1) { - return monmap.get_name(0); - } else { - int max = monmap.size(); - int o = -1; - if (!cur_mon.empty()) { - o = monmap.get_rank(cur_mon); - if (o >= 0) - max--; - } - - int32_t n = rng() % max; - if (o >= 0 && n >= o) - n++; - return monmap.get_name(n); - } -} - void MonClient::_reopen_session(int rank, string name) { assert(monc_lock.is_locked()); ldout(cct, 10) << __func__ << " rank " << rank << " name " << name << dendl; - if (rank < 0 && name.length() == 0) { - cur_mon = _pick_random_mon(); - } else if (name.length()) { - cur_mon = name; + active_con.reset(); + pending_cons.clear(); + + _start_hunting(); + + if (name.length()) { + _add_conn(monmap.get_rank(name)); + } else if (rank >= 0) { + _add_conn(rank); } else { - cur_mon = monmap.get_name(rank); + _add_conns(); } - if (cur_con) { - cur_con->mark_down(); - } - cur_con = messenger->get_connection(monmap.get_inst(cur_mon)); - - ldout(cct, 10) << "picked mon." 
<< cur_mon << " con " << cur_con - << " addr " << cur_con->get_peer_addr() - << dendl; - // throw out old queued messages while (!waiting_for_session.empty()) { waiting_for_session.front()->put(); @@ -629,34 +597,10 @@ void MonClient::_reopen_session(int rank, string name) version_requests.erase(version_requests.begin()); } - // adjust timeouts if necessary - if (had_a_connection) { - reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff; - if (reopen_interval_multiplier > - cct->_conf->mon_client_hunt_interval_max_multiple) - reopen_interval_multiplier = - cct->_conf->mon_client_hunt_interval_max_multiple; + for (auto& c : pending_cons) { + c.second.start(monmap.get_epoch(), entity_name, *auth_supported); } - // restart authentication handshake - state = MC_STATE_NEGOTIATING; - hunting = true; - - // send an initial keepalive to ensure our timestamp is valid by the - // time we are in an OPENED state (by sequencing this before - // authentication). - cur_con->send_keepalive(); - - MAuth *m = new MAuth; - m->protocol = 0; - m->monmap_epoch = monmap.get_epoch(); - __u8 struct_v = 1; - ::encode(struct_v, m->auth_payload); - ::encode(auth_supported->get_supported_set(), m->auth_payload); - ::encode(entity_name, m->auth_payload); - ::encode(global_id, m->auth_payload); - _send_mon_message(m, true); - for (map::iterator p = sub_sent.begin(); p != sub_sent.end(); ++p) { @@ -667,38 +611,106 @@ void MonClient::_reopen_session(int rank, string name) _renew_subs(); } +MonConnection& MonClient::_add_conn(unsigned rank) +{ + auto peer = monmap.get_addr(rank); + auto conn = messenger->get_connection(monmap.get_inst(rank)); + MonConnection mc(cct, conn); + auto inserted = pending_cons.insert(move(make_pair(peer, move(mc)))); + ldout(cct, 10) << "picked mon." 
<< monmap.get_name(rank) + << " con " << conn + << " addr " << conn->get_peer_addr() + << dendl; + return inserted.first->second; +} + +void MonClient::_add_conns() +{ + unsigned n = cct->_conf->mon_client_hunt_parallel; + if (n == 0 || n > monmap.size()) { + n = monmap.size(); + } + vector ranks(n); + for (unsigned i = 0; i < n; i++) { + ranks[i] = i; + } + std::random_device rd; + std::mt19937 rng(rd()); + std::shuffle(ranks.begin(), ranks.end(), rng); + for (unsigned i = 0; i < n; i++) { + _add_conn(ranks[i]); + } +} bool MonClient::ms_handle_reset(Connection *con) { Mutex::Locker lock(monc_lock); - if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) { - if (cur_mon.empty() || con != cur_con) { - ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl; - return true; + if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) + return false; + + if (_hunting()) { + if (pending_cons.count(con->get_peer_addr())) { + ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addr() << dendl; } else { + ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl; + } + return true; + } else { + if (active_con && con == active_con->get_con()) { ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addr() << dendl; - if (hunting) - return true; - - ldout(cct, 0) << "hunting for new mon" << dendl; _reopen_session(); + return false; + } else { + ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl; + return true; } } - return false; +} + +bool MonClient::_opened() const +{ + assert(monc_lock.is_locked()); + return active_con || _hunting(); +} + +bool MonClient::_hunting() const +{ + return !pending_cons.empty(); +} + +void MonClient::_start_hunting() +{ + assert(!_hunting()); + // adjust timeouts if necessary + if (!had_a_connection) + return; + reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff; + if (reopen_interval_multiplier > + 
cct->_conf->mon_client_hunt_interval_max_multiple) { + reopen_interval_multiplier = + cct->_conf->mon_client_hunt_interval_max_multiple; + } } void MonClient::_finish_hunting() { assert(monc_lock.is_locked()); - if (hunting) { - ldout(cct, 1) << "found mon." << cur_mon << dendl; - hunting = false; - had_a_connection = true; - reopen_interval_multiplier /= 2.0; - if (reopen_interval_multiplier < 1.0) - reopen_interval_multiplier = 1.0; + // the pending conns have been cleaned. + assert(!_hunting()); + if (active_con) { + auto con = active_con->get_con(); + ldout(cct, 1) << "found mon." + << monmap.get_name(con->get_peer_addr()) + << dendl; + } else { + ldout(cct, 1) << "no mon sessions established" << dendl; } + + had_a_connection = true; + reopen_interval_multiplier /= 2.0; + if (reopen_interval_multiplier < 1.0) + reopen_interval_multiplier = 1.0; } void MonClient::tick() @@ -707,12 +719,13 @@ void MonClient::tick() _check_auth_tickets(); - if (hunting) { + if (_hunting()) { ldout(cct, 1) << "continuing hunt" << dendl; _reopen_session(); - } else if (!cur_mon.empty()) { + } else if (active_con) { // just renew as needed utime_t now = ceph_clock_now(); + auto cur_con = active_con->get_con(); if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) { ldout(cct, 10) << "renew subs? 
(now: " << now << "; renew after: " << sub_renew_after << ") -- " @@ -724,16 +737,14 @@ void MonClient::tick() cur_con->send_keepalive(); - if (state == MC_STATE_HAVE_SESSION) { - if (cct->_conf->mon_client_ping_timeout > 0 && - cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { - utime_t lk = cur_con->get_last_keepalive_ack(); - utime_t interval = now - lk; - if (interval > cct->_conf->mon_client_ping_timeout) { - ldout(cct, 1) << "no keepalive since " << lk << " (" << interval - << " seconds), reconnecting" << dendl; - _reopen_session(); - } + if (cct->_conf->mon_client_ping_timeout > 0 && + cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { + utime_t lk = cur_con->get_last_keepalive_ack(); + utime_t interval = now - lk; + if (interval > cct->_conf->mon_client_ping_timeout) { + ldout(cct, 1) << "no keepalive since " << lk << " (" << interval + << " seconds), reconnecting" << dendl; + _reopen_session(); } send_log(); @@ -753,10 +764,11 @@ void MonClient::schedule_tick() } }; - if (hunting) + if (_hunting()) { timer.add_event_after(cct->_conf->mon_client_hunt_interval - * reopen_interval_multiplier, new C_Tick(this)); - else + * reopen_interval_multiplier, + new C_Tick(this)); + } else timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this)); } @@ -771,7 +783,7 @@ void MonClient::_renew_subs() } ldout(cct, 10) << __func__ << dendl; - if (cur_mon.empty()) + if (!_opened()) _reopen_session(); else { if (sub_renew_sent == utime_t()) @@ -807,7 +819,7 @@ void MonClient::handle_subscribe_ack(MMonSubscribeAck *m) int MonClient::_check_auth_tickets() { assert(monc_lock.is_locked()); - if (state == MC_STATE_HAVE_SESSION && auth) { + if (active_con && auth) { if (auth->need_tickets()) { ldout(cct, 10) << __func__ << " getting new tickets!" 
<< dendl; MAuth *m = new MAuth; @@ -831,7 +843,7 @@ int MonClient::_check_auth_rotating() return 0; } - if (!auth || state != MC_STATE_HAVE_SESSION) { + if (!active_con || !auth) { ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl; return 0; } @@ -902,8 +914,13 @@ int MonClient::wait_auth_rotating(double timeout) void MonClient::_send_command(MonCommand *r) { + entity_addr_t peer; + if (active_con) { + peer = active_con->get_con()->get_peer_addr(); + } + if (r->target_rank >= 0 && - r->target_rank != monmap.get_rank(cur_mon)) { + r->target_rank != monmap.get_rank(peer)) { ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << " wants rank " << r->target_rank << ", reopening session" @@ -918,7 +935,7 @@ void MonClient::_send_command(MonCommand *r) } if (r->target_name.length() && - r->target_name != cur_mon) { + r->target_name != monmap.get_name(peer)) { ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << " wants mon " << r->target_name << ", reopening session" @@ -1111,3 +1128,138 @@ void MonClient::handle_get_version_reply(MMonGetVersionReply* m) } m->put(); } + +AuthAuthorizer* MonClient::build_authorizer(int service_id) const { + Mutex::Locker l(monc_lock); + assert(auth || active_con->get_auth()); + if (auth) + return auth->build_authorizer(service_id); + else + return active_con->get_auth()->build_authorizer(service_id); +} + +#define dout_subsys ceph_subsys_monc +#undef dout_prefix +#define dout_prefix *_dout << "monclient" << (have_session() ? 
": " : "(hunting): ") + +MonConnection::MonConnection(CephContext *cct, ConnectionRef con) + : cct(cct), con(con) +{} + +MonConnection::~MonConnection() +{ + if (con) { + con->mark_down(); + con.reset(); + } +} + +bool MonConnection::have_session() const +{ + return state == State::HAVE_SESSION; +} + +void MonConnection::start(epoch_t epoch, + const EntityName& entity_name, + const AuthMethodList& auth_supported) +{ + // restart authentication handshake + state = State::NEGOTIATING; + + // send an initial keepalive to ensure our timestamp is valid by the + // time we are in an OPENED state (by sequencing this before + // authentication). + con->send_keepalive(); + + auto m = new MAuth; + m->protocol = 0; + m->monmap_epoch = epoch; + __u8 struct_v = 1; + ::encode(struct_v, m->auth_payload); + ::encode(auth_supported.get_supported_set(), m->auth_payload); + ::encode(entity_name, m->auth_payload); + ::encode(global_id, m->auth_payload); + con->send_message(m); +} + +int MonConnection::handle_auth(MAuthReply* m, + const EntityName& entity_name, + uint32_t want_keys, + RotatingKeyRing* keyring) +{ + if (state == State::NEGOTIATING) { + int r = _negotiate(m, entity_name, want_keys, keyring); + if (r) { + return r; + } + state = State::AUTHENTICATING; + } + int r = authenticate(m); + if (!r) { + state = State::HAVE_SESSION; + } + return r; +} + +int MonConnection::_negotiate(MAuthReply *m, + const EntityName& entity_name, + uint32_t want_keys, + RotatingKeyRing* keyring) +{ + if (auth && (int)m->protocol == auth->get_protocol()) { + // good, negotiation completed + auth->reset(); + return 0; + } + + auth.reset(get_auth_client_handler(cct, m->protocol, keyring)); + if (!auth) { + ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl; + if (m->result == -ENOTSUP) { + ldout(cct, 10) << "none of our auth protocols are supported by the server" + << dendl; + } + return m->result; + } + + // do not request MGR key unless the mon has the SERVER_KRAKEN + // feature. 
otherwise it will give us an auth error. note that + // we have to use the FEATUREMASK because pre-jewel the kraken + // feature bit was used for something else. + if ((want_keys & CEPH_ENTITY_TYPE_MGR) && + !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) { + ldout(cct, 1) << __func__ + << " not requesting MGR keys from pre-kraken monitor" + << dendl; + want_keys &= ~CEPH_ENTITY_TYPE_MGR; + } + auth->set_want_keys(want_keys); + auth->init(entity_name); + auth->set_global_id(global_id); + return 0; +} + +int MonConnection::authenticate(MAuthReply *m) +{ + assert(auth); + if (!m->global_id) { + ldout(cct, 1) << "peer sent an invalid global_id" << dendl; + } + if (m->global_id != global_id) { + // it's a new session + auth->reset(); + global_id = m->global_id; + auth->set_global_id(global_id); + ldout(cct, 10) << "my global_id is " << m->global_id << dendl; + } + auto p = m->result_bl.begin(); + int ret = auth->handle_response(m->result, p); + if (ret == -EAGAIN) { + auto ma = new MAuth; + ma->protocol = auth->get_protocol(); + auth->prepare_build_request(); + auth->build_request(ma->auth_payload); + con->send_message(ma); + } + return ret; +} diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index 7e1d9da9b82..6081779721b 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -15,6 +15,8 @@ #ifndef CEPH_MONCLIENT_H #define CEPH_MONCLIENT_H +#include + #include "msg/Messenger.h" #include "MonMap.h" @@ -24,28 +26,20 @@ #include "common/config.h" #include "auth/AuthClientHandler.h" #include "auth/RotatingKeyRing.h" -#include "common/SimpleRNG.h" class MMonMap; -class MMonGetVersion; class MMonGetVersionReply; struct MMonSubscribeAck; class MMonCommandAck; -class MCommandReply; struct MAuthReply; -class MPing; +class MAuthRotating; class LogClient; +struct AuthAuthorizer; class AuthMethodList; -class Messenger; +class AuthClientHandler; // class RotatingKeyRing; class KeyRing; -enum MonClientState { - MC_STATE_NONE, - 
MC_STATE_NEGOTIATING, - MC_STATE_AUTHENTICATING, - MC_STATE_HAVE_SESSION, -}; struct MonClientPinger : public Dispatcher { @@ -102,18 +96,63 @@ struct MonClientPinger : public Dispatcher { } }; +class MonConnection { +public: + MonConnection(CephContext *cct, + ConnectionRef conn); + ~MonConnection(); + MonConnection(MonConnection&& rhs) = default; + MonConnection& operator=(MonConnection&&) = default; + MonConnection(const MonConnection& rhs) = delete; + MonConnection& operator=(const MonConnection&) = delete; + int handle_auth(MAuthReply *m, + const EntityName& entity_name, + uint32_t want_keys, + RotatingKeyRing* keyring); + int authenticate(MAuthReply *m); + void start(epoch_t epoch, + const EntityName& entity_name, + const AuthMethodList& auth_supported); + bool have_session() const; + uint64_t get_global_id() const { + return global_id; + } + ConnectionRef get_con() { + return con; + } + std::unique_ptr& get_auth() { + return auth; + } + +private: + int _negotiate(MAuthReply *m, + const EntityName& entity_name, + uint32_t want_keys, + RotatingKeyRing* keyring); + +private: + CephContext *cct; + enum class State { + NONE, + NEGOTIATING, + AUTHENTICATING, + HAVE_SESSION, + }; + State state = State::NONE; + ConnectionRef con; + + std::unique_ptr auth; + uint64_t global_id = 0; +}; + class MonClient : public Dispatcher { public: MonMap monmap; private: - MonClientState state; - Messenger *messenger; - string cur_mon; - ConnectionRef cur_con; - - SimpleRNG rng; + std::unique_ptr active_con; + std::map pending_cons; EntityName entity_name; @@ -143,24 +182,18 @@ private: void handle_auth(MAuthReply *m); // monitor session - bool hunting; - void tick(); void schedule_tick(); - Cond auth_cond; - // monclient bool want_monmap; - - uint32_t want_keys; - - uint64_t global_id; - - // authenticate -private: Cond map_cond; - int authenticate_err; +private: + // authenticate + std::unique_ptr auth; + uint32_t want_keys = 0; + Cond auth_cond; + int authenticate_err = 0; list 
waiting_for_session; utime_t last_rotating_renew_sent; @@ -168,13 +201,18 @@ private: bool had_a_connection; double reopen_interval_multiplier; - string _pick_random_mon(); + bool _opened() const; + bool _hunting() const; + void _start_hunting(); void _finish_hunting(); + void _finish_auth(int auth_err); void _reopen_session(int rank, string name); void _reopen_session() { _reopen_session(-1, string()); } - void _send_mon_message(Message *m, bool force=false); + MonConnection& _add_conn(unsigned rank); + void _add_conns(); + void _send_mon_message(Message *m); public: void set_entity_name(EntityName name) { entity_name = name; } @@ -241,9 +279,6 @@ private: sub_new.erase(what); } - // auth tickets -public: - std::unique_ptr auth; public: void renew_subs() { Mutex::Locker l(monc_lock); @@ -362,20 +397,20 @@ public: } uint64_t get_global_id() const { - return global_id; + Mutex::Locker l(monc_lock); + if (active_con) { + return active_con->get_global_id(); + } else { + return 0; + } } void set_messenger(Messenger *m) { messenger = m; } entity_addr_t get_myaddr() const { return messenger->get_myaddr(); } - - void send_auth_message(Message *m) { - _send_mon_message(m, true); - } + AuthAuthorizer* build_authorizer(int service_id) const; void set_want_keys(uint32_t want) { want_keys = want; - if (auth) - auth->set_want_keys(want | CEPH_ENTITY_TYPE_MON); } // admin commands @@ -404,6 +439,7 @@ private: void _resend_mon_commands(); int _cancel_mon_command(uint64_t tid, int r); void _finish_command(MonCommand *r, int ret, string rs); + void _finish_auth(); void handle_mon_command_ack(MMonCommandAck *ack); public: diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index c101a2b8d1c..cc0329a811a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -6206,7 +6206,7 @@ bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool for } } - *authorizer = monc->auth->build_authorizer(dest_type); + *authorizer = monc->build_authorizer(dest_type); return *authorizer != NULL; 
} diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index cdeff103834..fb8fcc86221 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -4420,7 +4420,7 @@ bool Objecter::ms_get_authorizer(int dest_type, return false; if (dest_type == CEPH_ENTITY_TYPE_MON) return true; - *authorizer = monc->auth->build_authorizer(dest_type); + *authorizer = monc->build_authorizer(dest_type); return *authorizer != NULL; } diff --git a/src/tools/cephfs/MDSUtility.cc b/src/tools/cephfs/MDSUtility.cc index 6ee89e8c86b..1af0b788bfc 100644 --- a/src/tools/cephfs/MDSUtility.cc +++ b/src/tools/cephfs/MDSUtility.cc @@ -165,6 +165,6 @@ bool MDSUtility::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, return false; } - *authorizer = monc->auth->build_authorizer(dest_type); + *authorizer = monc->build_authorizer(dest_type); return *authorizer != NULL; }