*
*/
+#include <random>
+
#include "messages/MMonGetMap.h"
#include "messages/MMonGetVersion.h"
#include "messages/MMonGetVersionReply.h"
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
-#define dout_prefix *_dout << "monclient" << (hunting ? "(hunting)":"") << ": "
+#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
MonClient::MonClient(CephContext *cct_) :
Dispatcher(cct_),
- state(MC_STATE_NONE),
messenger(NULL),
- cur_con(NULL),
- rng(getpid()),
monc_lock("MonClient::monc_lock"),
timer(cct_, monc_lock), finisher(cct_),
initialized(false),
no_keyring_disabled_cephx(false),
log_client(NULL),
more_log_pending(false),
- hunting(true),
want_monmap(true),
- want_keys(0), global_id(0),
- authenticate_err(0),
had_a_connection(false),
reopen_interval_multiplier(1.0),
last_mon_command_tid(0),
Mutex::Locker l(monc_lock);
_sub_want("monmap", 0, 0);
- if (cur_mon.empty())
+ if (!_opened())
_reopen_session();
while (want_monmap)
ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl;
+ std::random_device rd;
+ std::mt19937 rng(rd());
+ assert(monmap.size() > 0);
+ std::uniform_int_distribution<unsigned> ranks(0, monmap.size() - 1);
while (monmap.fsid.is_zero()) {
- cur_mon = _pick_random_mon();
- cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
- if (cur_con) {
- ldout(cct, 10) << "querying mon." << cur_mon << " "
- << cur_con->get_peer_addr() << dendl;
- cur_con->send_message(new MMonGetMap);
- }
+ auto rank = ranks(rng);
+ auto& pending_con = _add_conn(rank);
+ auto con = pending_con.get_con();
+ ldout(cct, 10) << "querying mon." << monmap.get_name(rank) << " "
+ << con->get_peer_addr() << dendl;
+ con->send_message(new MMonGetMap);
if (--attempt == 0)
break;
interval.set_from_double(cct->_conf->mon_client_hunt_interval);
map_cond.WaitInterval(monc_lock, interval);
- if (monmap.fsid.is_zero() && cur_con) {
- cur_con->mark_down(); // nope, clean that connection up
+ if (monmap.fsid.is_zero() && con) {
+ con->mark_down(); // nope, clean that connection up
}
}
if (temp_msgr) {
- if (cur_con) {
- cur_con->mark_down();
- cur_con.reset(NULL);
- cur_mon.clear();
- }
+ pending_cons.clear();
monc_lock.Unlock();
messenger->shutdown();
if (smessenger)
monc_lock.Lock();
}
- hunting = true; // reset this to true!
- cur_mon.clear();
- cur_con.reset(NULL);
+ pending_cons.clear();
if (!monmap.fsid.is_zero())
return 0;
Mutex::Locker lock(monc_lock);
- // ignore any messages outside our current session
- if (m->get_connection() != cur_con) {
+ if (_hunting()) {
+ if (!pending_cons.count(m->get_source_addr())) {
+ // ignore any messages outside hunting sessions
+ ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
+ m->put();
+ return true;
+ }
+ } else if (!active_con || active_con->get_con() != m->get_connection()) {
+ // ignore any messages outside our session(s)
ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
m->put();
return true;
void MonClient::handle_monmap(MMonMap *m)
{
ldout(cct, 10) << __func__ << " " << *m << dendl;
+ auto peer = m->get_source_addr();
+ string cur_mon = monmap.get_name(peer);
+
bufferlist::iterator p = m->monmapbl.begin();
::decode(monmap, p);
- assert(!cur_mon.empty());
ldout(cct, 10) << " got monmap " << monmap.epoch
<< ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon)
<< dendl;
_sub_got("monmap", monmap.get_epoch());
- if (!monmap.get_addr_name(cur_con->get_peer_addr(), cur_mon)) {
+ if (!monmap.get_addr_name(peer, cur_mon)) {
ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl;
- _reopen_session(); // can't find the mon we were talking to (above)
+ // can't find the mon we were talking to (above)
+ _reopen_session();
}
map_cond.Signal();
waiting_for_session.pop_front();
}
- if (cur_con)
- cur_con->mark_down();
- cur_con.reset(NULL);
- cur_mon.clear();
+ active_con.reset();
+ pending_cons.clear();
+ auth.reset();
monc_lock.Unlock();
{
Mutex::Locker lock(monc_lock);
- if (state == MC_STATE_HAVE_SESSION) {
+ if (active_con) {
ldout(cct, 5) << "already authenticated" << dendl;
return 0;
}
_sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
- if (cur_mon.empty())
+ if (!_opened())
_reopen_session();
utime_t until = ceph_clock_now();
until += timeout;
if (timeout > 0.0)
ldout(cct, 10) << "authenticate will time out at " << until << dendl;
- while (state != MC_STATE_HAVE_SESSION && !authenticate_err) {
+ while (!active_con && !authenticate_err) {
if (timeout > 0.0) {
int r = auth_cond.WaitUntil(monc_lock, until);
if (r == ETIMEDOUT) {
}
}
- if (state == MC_STATE_HAVE_SESSION) {
- ldout(cct, 5) << __func__ << " success, global_id " << global_id << dendl;
+ if (active_con) {
+ ldout(cct, 5) << __func__ << " success, global_id "
+ << active_con->get_global_id() << dendl;
}
if (authenticate_err < 0 && no_keyring_disabled_cephx) {
void MonClient::handle_auth(MAuthReply *m)
{
- Context *cb = NULL;
- bufferlist::iterator p = m->result_bl.begin();
- if (state == MC_STATE_NEGOTIATING) {
- if (!auth || (int)m->protocol != auth->get_protocol()) {
- auth.reset(get_auth_client_handler(cct, m->protocol,
- rotating_secrets.get()));
- if (!auth) {
- ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl;
- if (m->result == -ENOTSUP) {
- ldout(cct, 10) << "none of our auth protocols are supported by the server"
- << dendl;
- authenticate_err = m->result;
- auth_cond.SignalAll();
- }
- m->put();
- return;
- }
- // do not request MGR key unless the mon has the SERVER_KRAKEN
- // feature. otherwise it will give us an auth error. note that
- // we have to use the FEATUREMASK because pre-jewel the kraken
- // feature bit was used for something else.
- if ((want_keys & CEPH_ENTITY_TYPE_MGR) &&
- !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
- ldout(cct, 1) << __func__
- << " not requesting MGR keys from pre-kraken monitor"
- << dendl;
- want_keys &= ~CEPH_ENTITY_TYPE_MGR;
- }
- auth->set_want_keys(want_keys);
- auth->init(entity_name);
- auth->set_global_id(global_id);
- } else {
- auth->reset();
+ assert(monc_lock.is_locked());
+ if (!_hunting()) {
+ std::swap(active_con->get_auth(), auth);
+ int ret = active_con->authenticate(m);
+ m->put();
+ std::swap(auth, active_con->get_auth());
+ if (ret != -EAGAIN) {
+ _finish_auth(ret);
}
- state = MC_STATE_AUTHENTICATING;
- }
- assert(auth);
- if (m->global_id && m->global_id != global_id) {
- global_id = m->global_id;
- auth->set_global_id(global_id);
- ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
+ return;
}
- int ret = auth->handle_response(m->result, p);
+ // hunting
+ auto found = pending_cons.find(m->get_source_addr());
+ assert(found != pending_cons.end());
+ int auth_err = found->second.handle_auth(m, entity_name, want_keys,
+ rotating_secrets.get());
m->put();
-
- if (ret == -EAGAIN) {
- MAuth *ma = new MAuth;
- ma->protocol = auth->get_protocol();
- auth->prepare_build_request();
- ret = auth->build_request(ma->auth_payload);
- _send_mon_message(ma, true);
+ if (auth_err == -EAGAIN) {
return;
}
+ if (auth_err) {
+ pending_cons.erase(found);
+ if (!pending_cons.empty()) {
+ // keep trying with pending connections
+ return;
+ }
+ // the last try just failed, give up.
+ } else {
+ auto& mc = found->second;
+ assert(mc.have_session());
+ active_con.reset(new MonConnection(std::move(mc)));
+ pending_cons.clear();
+ }
_finish_hunting();
- authenticate_err = ret;
- if (ret == 0) {
- if (state != MC_STATE_HAVE_SESSION) {
- state = MC_STATE_HAVE_SESSION;
- last_rotating_renew_sent = utime_t();
- while (!waiting_for_session.empty()) {
- _send_mon_message(waiting_for_session.front());
- waiting_for_session.pop_front();
- }
+ if (!auth_err) {
+ last_rotating_renew_sent = utime_t();
+ while (!waiting_for_session.empty()) {
+ _send_mon_message(waiting_for_session.front());
+ waiting_for_session.pop_front();
+ }
- _resend_mon_commands();
+ _resend_mon_commands();
- if (log_client) {
- log_client->reset_session();
- send_log();
- }
- if (session_established_context) {
- cb = session_established_context.release();
- }
+ if (log_client) {
+ log_client->reset_session();
+ send_log();
}
-
- _check_auth_tickets();
+ if (active_con)
+ std::swap(auth, active_con->get_auth());
}
- auth_cond.SignalAll();
- if (cb) {
- monc_lock.Unlock();
- cb->complete(0);
- monc_lock.Lock();
+ _finish_auth(auth_err);
+ if (!auth_err) {
+ Context *cb = nullptr;
+ if (session_established_context) {
+ cb = session_established_context.release();
+ }
+ if (cb) {
+ monc_lock.Unlock();
+ cb->complete(0);
+ monc_lock.Lock();
+ }
}
}
+void MonClient::_finish_auth(int auth_err)
+{
+ authenticate_err = auth_err;
+ // _resend_mon_commands() could _reopen_session() if the connected mon is not
+ // the one the MonCommand is targeting.
+ if (!auth_err && active_con) {
+ assert(auth);
+ _check_auth_tickets();
+ }
+ auth_cond.SignalAll();
+}
// ---------
-void MonClient::_send_mon_message(Message *m, bool force)
+void MonClient::_send_mon_message(Message *m)
{
assert(monc_lock.is_locked());
- assert(!cur_mon.empty());
- if (force || state == MC_STATE_HAVE_SESSION) {
- assert(cur_con);
- ldout(cct, 10) << "_send_mon_message to mon." << cur_mon
+ if (active_con) {
+ auto cur_con = active_con->get_con();
+ ldout(cct, 10) << "_send_mon_message to mon."
+ << monmap.get_name(cur_con->get_peer_addr())
<< " at " << cur_con->get_peer_addr() << dendl;
cur_con->send_message(m);
} else {
}
}
-string MonClient::_pick_random_mon()
-{
- assert(monmap.size() > 0);
- if (monmap.size() == 1) {
- return monmap.get_name(0);
- } else {
- int max = monmap.size();
- int o = -1;
- if (!cur_mon.empty()) {
- o = monmap.get_rank(cur_mon);
- if (o >= 0)
- max--;
- }
-
- int32_t n = rng() % max;
- if (o >= 0 && n >= o)
- n++;
- return monmap.get_name(n);
- }
-}
-
void MonClient::_reopen_session(int rank, string name)
{
assert(monc_lock.is_locked());
ldout(cct, 10) << __func__ << " rank " << rank << " name " << name << dendl;
- if (rank < 0 && name.length() == 0) {
- cur_mon = _pick_random_mon();
- } else if (name.length()) {
- cur_mon = name;
+ active_con.reset();
+ pending_cons.clear();
+
+ _start_hunting();
+
+ if (name.length()) {
+ _add_conn(monmap.get_rank(name));
+ } else if (rank >= 0) {
+ _add_conn(rank);
} else {
- cur_mon = monmap.get_name(rank);
+ _add_conns();
}
- if (cur_con) {
- cur_con->mark_down();
- }
- cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
-
- ldout(cct, 10) << "picked mon." << cur_mon << " con " << cur_con
- << " addr " << cur_con->get_peer_addr()
- << dendl;
-
// throw out old queued messages
while (!waiting_for_session.empty()) {
waiting_for_session.front()->put();
version_requests.erase(version_requests.begin());
}
- // adjust timeouts if necessary
- if (had_a_connection) {
- reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
- if (reopen_interval_multiplier >
- cct->_conf->mon_client_hunt_interval_max_multiple)
- reopen_interval_multiplier =
- cct->_conf->mon_client_hunt_interval_max_multiple;
+ for (auto& c : pending_cons) {
+ c.second.start(monmap.get_epoch(), entity_name, *auth_supported);
}
- // restart authentication handshake
- state = MC_STATE_NEGOTIATING;
- hunting = true;
-
- // send an initial keepalive to ensure our timestamp is valid by the
- // time we are in an OPENED state (by sequencing this before
- // authentication).
- cur_con->send_keepalive();
-
- MAuth *m = new MAuth;
- m->protocol = 0;
- m->monmap_epoch = monmap.get_epoch();
- __u8 struct_v = 1;
- ::encode(struct_v, m->auth_payload);
- ::encode(auth_supported->get_supported_set(), m->auth_payload);
- ::encode(entity_name, m->auth_payload);
- ::encode(global_id, m->auth_payload);
- _send_mon_message(m, true);
-
for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
p != sub_sent.end();
++p) {
_renew_subs();
}
+MonConnection& MonClient::_add_conn(unsigned rank)
+{
+ auto peer = monmap.get_addr(rank);
+ auto conn = messenger->get_connection(monmap.get_inst(rank));
+ MonConnection mc(cct, conn);
+ auto inserted = pending_cons.insert(move(make_pair(peer, move(mc))));
+ ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
+ << " con " << conn
+ << " addr " << conn->get_peer_addr()
+ << dendl;
+ return inserted.first->second;
+}
+
+void MonClient::_add_conns()
+{
+ unsigned n = cct->_conf->mon_client_hunt_parallel;
+ if (n == 0 || n > monmap.size()) {
+ n = monmap.size();
+ }
+ vector<unsigned> ranks(n);
+ for (unsigned i = 0; i < n; i++) {
+ ranks[i] = i;
+ }
+ std::random_device rd;
+ std::mt19937 rng(rd());
+ std::shuffle(ranks.begin(), ranks.end(), rng);
+ for (unsigned i = 0; i < n; i++) {
+ _add_conn(ranks[i]);
+ }
+}
bool MonClient::ms_handle_reset(Connection *con)
{
Mutex::Locker lock(monc_lock);
- if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
- if (cur_mon.empty() || con != cur_con) {
- ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl;
- return true;
+ if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
+ return false;
+
+ if (_hunting()) {
+ if (pending_cons.count(con->get_peer_addr())) {
+ ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addr() << dendl;
} else {
+ ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl;
+ }
+ return true;
+ } else {
+ if (active_con && con == active_con->get_con()) {
ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addr() << dendl;
- if (hunting)
- return true;
-
- ldout(cct, 0) << "hunting for new mon" << dendl;
_reopen_session();
+ return false;
+ } else {
+ ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl;
+ return true;
}
}
- return false;
+}
+
+bool MonClient::_opened() const
+{
+ assert(monc_lock.is_locked());
+ return active_con || _hunting();
+}
+
+bool MonClient::_hunting() const
+{
+ return !pending_cons.empty();
+}
+
+void MonClient::_start_hunting()
+{
+ assert(!_hunting());
+ // adjust timeouts if necessary
+ if (!had_a_connection)
+ return;
+ reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
+ if (reopen_interval_multiplier >
+ cct->_conf->mon_client_hunt_interval_max_multiple) {
+ reopen_interval_multiplier =
+ cct->_conf->mon_client_hunt_interval_max_multiple;
+ }
}
void MonClient::_finish_hunting()
{
assert(monc_lock.is_locked());
- if (hunting) {
- ldout(cct, 1) << "found mon." << cur_mon << dendl;
- hunting = false;
- had_a_connection = true;
- reopen_interval_multiplier /= 2.0;
- if (reopen_interval_multiplier < 1.0)
- reopen_interval_multiplier = 1.0;
+ // the pending conns have been cleaned.
+ assert(!_hunting());
+ if (active_con) {
+ auto con = active_con->get_con();
+ ldout(cct, 1) << "found mon."
+ << monmap.get_name(con->get_peer_addr())
+ << dendl;
+ } else {
+ ldout(cct, 1) << "no mon sessions established" << dendl;
}
+
+ had_a_connection = true;
+ reopen_interval_multiplier /= 2.0;
+ if (reopen_interval_multiplier < 1.0)
+ reopen_interval_multiplier = 1.0;
}
void MonClient::tick()
_check_auth_tickets();
- if (hunting) {
+ if (_hunting()) {
ldout(cct, 1) << "continuing hunt" << dendl;
_reopen_session();
- } else if (!cur_mon.empty()) {
+ } else if (active_con) {
// just renew as needed
utime_t now = ceph_clock_now();
+ auto cur_con = active_con->get_con();
if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
ldout(cct, 10) << "renew subs? (now: " << now
<< "; renew after: " << sub_renew_after << ") -- "
cur_con->send_keepalive();
- if (state == MC_STATE_HAVE_SESSION) {
- if (cct->_conf->mon_client_ping_timeout > 0 &&
- cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
- utime_t lk = cur_con->get_last_keepalive_ack();
- utime_t interval = now - lk;
- if (interval > cct->_conf->mon_client_ping_timeout) {
- ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
- << " seconds), reconnecting" << dendl;
- _reopen_session();
- }
+ if (cct->_conf->mon_client_ping_timeout > 0 &&
+ cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+ utime_t lk = cur_con->get_last_keepalive_ack();
+ utime_t interval = now - lk;
+ if (interval > cct->_conf->mon_client_ping_timeout) {
+ ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
+ << " seconds), reconnecting" << dendl;
+ _reopen_session();
}
send_log();
}
};
- if (hunting)
+ if (_hunting()) {
timer.add_event_after(cct->_conf->mon_client_hunt_interval
- * reopen_interval_multiplier, new C_Tick(this));
- else
+ * reopen_interval_multiplier,
+ new C_Tick(this));
+ } else
timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this));
}
}
ldout(cct, 10) << __func__ << dendl;
- if (cur_mon.empty())
+ if (!_opened())
_reopen_session();
else {
if (sub_renew_sent == utime_t())
int MonClient::_check_auth_tickets()
{
assert(monc_lock.is_locked());
- if (state == MC_STATE_HAVE_SESSION && auth) {
+ if (active_con && auth) {
if (auth->need_tickets()) {
ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
MAuth *m = new MAuth;
return 0;
}
- if (!auth || state != MC_STATE_HAVE_SESSION) {
+ if (!active_con || !auth) {
ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
return 0;
}
void MonClient::_send_command(MonCommand *r)
{
+ entity_addr_t peer;
+ if (active_con) {
+ peer = active_con->get_con()->get_peer_addr();
+ }
+
if (r->target_rank >= 0 &&
- r->target_rank != monmap.get_rank(cur_mon)) {
+ r->target_rank != monmap.get_rank(peer)) {
ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
<< " wants rank " << r->target_rank
<< ", reopening session"
}
if (r->target_name.length() &&
- r->target_name != cur_mon) {
+ r->target_name != monmap.get_name(peer)) {
ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
<< " wants mon " << r->target_name
<< ", reopening session"
}
m->put();
}
+
+AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
+ Mutex::Locker l(monc_lock);
+ assert(auth || active_con->get_auth());
+ if (auth)
+ return auth->build_authorizer(service_id);
+ else
+ return active_con->get_auth()->build_authorizer(service_id);
+}
+
+#define dout_subsys ceph_subsys_monc
+#undef dout_prefix
+#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
+
+MonConnection::MonConnection(CephContext *cct, ConnectionRef con)
+ : cct(cct), con(con)
+{}
+
+MonConnection::~MonConnection()
+{
+ if (con) {
+ con->mark_down();
+ con.reset();
+ }
+}
+
+bool MonConnection::have_session() const
+{
+ return state == State::HAVE_SESSION;
+}
+
+void MonConnection::start(epoch_t epoch,
+ const EntityName& entity_name,
+ const AuthMethodList& auth_supported)
+{
+ // restart authentication handshake
+ state = State::NEGOTIATING;
+
+ // send an initial keepalive to ensure our timestamp is valid by the
+ // time we are in an OPENED state (by sequencing this before
+ // authentication).
+ con->send_keepalive();
+
+ auto m = new MAuth;
+ m->protocol = 0;
+ m->monmap_epoch = epoch;
+ __u8 struct_v = 1;
+ ::encode(struct_v, m->auth_payload);
+ ::encode(auth_supported.get_supported_set(), m->auth_payload);
+ ::encode(entity_name, m->auth_payload);
+ ::encode(global_id, m->auth_payload);
+ con->send_message(m);
+}
+
+int MonConnection::handle_auth(MAuthReply* m,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring)
+{
+ if (state == State::NEGOTIATING) {
+ int r = _negotiate(m, entity_name, want_keys, keyring);
+ if (r) {
+ return r;
+ }
+ state = State::AUTHENTICATING;
+ }
+ int r = authenticate(m);
+ if (!r) {
+ state = State::HAVE_SESSION;
+ }
+ return r;
+}
+
+int MonConnection::_negotiate(MAuthReply *m,
+ const EntityName& entity_name,
+ uint32_t want_keys,
+ RotatingKeyRing* keyring)
+{
+ if (auth && (int)m->protocol == auth->get_protocol()) {
+ // good, negotiation completed
+ auth->reset();
+ return 0;
+ }
+
+ auth.reset(get_auth_client_handler(cct, m->protocol, keyring));
+ if (!auth) {
+ ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl;
+ if (m->result == -ENOTSUP) {
+ ldout(cct, 10) << "none of our auth protocols are supported by the server"
+ << dendl;
+ }
+ return m->result;
+ }
+
+ // do not request MGR key unless the mon has the SERVER_KRAKEN
+ // feature. otherwise it will give us an auth error. note that
+ // we have to use the FEATUREMASK because pre-jewel the kraken
+ // feature bit was used for something else.
+ if ((want_keys & CEPH_ENTITY_TYPE_MGR) &&
+ !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
+ ldout(cct, 1) << __func__
+ << " not requesting MGR keys from pre-kraken monitor"
+ << dendl;
+ want_keys &= ~CEPH_ENTITY_TYPE_MGR;
+ }
+ auth->set_want_keys(want_keys);
+ auth->init(entity_name);
+ auth->set_global_id(global_id);
+ return 0;
+}
+
+int MonConnection::authenticate(MAuthReply *m)
+{
+ assert(auth);
+ if (!m->global_id) {
+ ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
+ }
+ if (m->global_id != global_id) {
+ // it's a new session
+ auth->reset();
+ global_id = m->global_id;
+ auth->set_global_id(global_id);
+ ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
+ }
+ auto p = m->result_bl.begin();
+ int ret = auth->handle_response(m->result, p);
+ if (ret == -EAGAIN) {
+ auto ma = new MAuth;
+ ma->protocol = auth->get_protocol();
+ auth->prepare_build_request();
+ auth->build_request(ma->auth_payload);
+ con->send_message(ma);
+ }
+ return ret;
+}