while (monmap.epoch == 0) {
cur_mon = monmap.pick_random_mon();
+ cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
dout(10) << "querying mon." << cur_mon << " " << monmap.get_inst(cur_mon) << dendl;
- messenger->send_message(new MMonGetMap, monmap.get_inst(cur_mon));
+ messenger->send_message(new MMonGetMap, cur_con);
if (--attempt == 0)
break;
utime_t interval(1, 0);
map_cond.WaitInterval(monc_lock, interval);
- if (monmap.epoch == 0)
- messenger->mark_down(monmap.get_addr(cur_mon)); // nope, clean that connection up
+ if (monmap.epoch == 0) {
+ messenger->mark_down(cur_con); // nope, clean that connection up
+ cur_con->put();
+ }
}
if (temp_msgr) {
hunting = true; // reset this to true!
cur_mon.clear();
+ cur_con->put();
+ cur_con = NULL;
+
if (monmap.epoch)
return 0;
return -1;
if (my_addr == entity_addr_t())
my_addr = messenger->get_myaddr();
+ // we only care about these message types
switch (m->get_type()) {
case CEPH_MSG_MON_MAP:
- handle_monmap((MMonMap*)m);
+ case CEPH_MSG_AUTH_REPLY:
+ case CEPH_MSG_MON_SUBSCRIBE_ACK:
+ break;
+ default:
+ return false;
+ }
+
+ // ignore any messages outside our current session
+ if (m->get_connection() != cur_con) {
+ dout(0) << "discarding stray montior message " << *m << dendl;
+ m->put();
return true;
+ }
+ switch (m->get_type()) {
+ case CEPH_MSG_MON_MAP:
+ handle_monmap((MMonMap*)m);
+ break;
case CEPH_MSG_AUTH_REPLY:
handle_auth((MAuthReply*)m);
- return true;
-
+ break;
case CEPH_MSG_MON_SUBSCRIBE_ACK:
handle_subscribe_ack((MMonSubscribeAck*)m);
- return true;
+ break;
}
-
-
- return false;
+ return true;
}
void MonClient::handle_monmap(MMonMap *m)
{
monc_lock.Lock();
timer.shutdown();
+
+ cur_con->put();
+ cur_con = NULL;
+
monc_lock.Unlock();
}
assert(!cur_mon.empty());
if (force || state == MC_STATE_HAVE_SESSION) {
dout(10) << "_send_mon_message to mon." << cur_mon << " at " << monmap.get_inst(cur_mon) << dendl;
- messenger->send_message(m, monmap.get_inst(cur_mon));
+ messenger->send_message(m, cur_con);
} else {
waiting_for_session.push_back(m);
}
void MonClient::_pick_new_mon()
{
assert(monc_lock.is_locked());
- if (!cur_mon.empty())
- messenger->mark_down(monmap.get_addr(cur_mon));
if (!cur_mon.empty() && monmap.size() > 1) {
// pick a _different_ mon
} else {
cur_mon = monmap.pick_random_mon();
}
- dout(10) << "_pick_new_mon picked mon." << cur_mon << dendl;
+
+ if (cur_con) {
+ messenger->mark_down(cur_con);
+ cur_con->put();
+ }
+ cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
+
+ dout(10) << "_pick_new_mon picked mon." << cur_mon << " con " << cur_con << dendl;
}