]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
monclient: maintain explicit session connection; ignore stray messages
authorSage Weil <sage@newdream.net>
Tue, 10 May 2011 23:54:23 +0000 (16:54 -0700)
committerSage Weil <sage@newdream.net>
Tue, 10 May 2011 23:54:23 +0000 (16:54 -0700)
Maintain an explicit Connection handle to send messages and mark_down old
monitor connections.  Ignore any incoming message that is not part of that
session.  This fixes problems with incoming messages that race with
session restarts.

Fixes: #1033
Reported-by: Wido den Hollander <wido@widodh.nl>
Signed-off-by: Sage Weil <sage@newdream.net>
src/mon/MonClient.cc
src/mon/MonClient.h

index c932cd560559fec74631ddd01d3d60cdb01dcfbb..d96e619075feb241920cf4eeb815372ed474bf9a 100644 (file)
@@ -199,8 +199,9 @@ int MonClient::get_monmap_privately()
   
   while (monmap.epoch == 0) {
     cur_mon = monmap.pick_random_mon();
+    cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
     dout(10) << "querying mon." << cur_mon << " " << monmap.get_inst(cur_mon) << dendl;
-    messenger->send_message(new MMonGetMap, monmap.get_inst(cur_mon));
+    messenger->send_message(new MMonGetMap, cur_con);
     
     if (--attempt == 0)
       break;
@@ -208,8 +209,10 @@ int MonClient::get_monmap_privately()
     utime_t interval(1, 0);
     map_cond.WaitInterval(monc_lock, interval);
 
-    if (monmap.epoch == 0)
-      messenger->mark_down(monmap.get_addr(cur_mon));  // nope, clean that connection up
+    if (monmap.epoch == 0) {
+      messenger->mark_down(cur_con);  // nope, clean that connection up
+      cur_con->put();
+    }
   }
 
   if (temp_msgr) {
@@ -224,6 +227,9 @@ int MonClient::get_monmap_privately()
   hunting = true;  // reset this to true!
   cur_mon.clear();
 
+  cur_con->put();
+  cur_con = NULL;
+
   if (monmap.epoch)
     return 0;
   return -1;
@@ -235,22 +241,35 @@ bool MonClient::ms_dispatch(Message *m)
   if (my_addr == entity_addr_t())
     my_addr = messenger->get_myaddr();
 
+  // we only care about these message types
   switch (m->get_type()) {
   case CEPH_MSG_MON_MAP:
-    handle_monmap((MMonMap*)m);
+  case CEPH_MSG_AUTH_REPLY:
+  case CEPH_MSG_MON_SUBSCRIBE_ACK:
+    break;
+  default:
+    return false;
+  }
+
+  // ignore any messages outside our current session
+  if (m->get_connection() != cur_con) {
+    dout(0) << "discarding stray montior message " << *m << dendl;
+    m->put();
     return true;
+  }
 
+  switch (m->get_type()) {
+  case CEPH_MSG_MON_MAP:
+    handle_monmap((MMonMap*)m);
+    break;
   case CEPH_MSG_AUTH_REPLY:
     handle_auth((MAuthReply*)m);
-    return true;
-
+    break;
   case CEPH_MSG_MON_SUBSCRIBE_ACK:
     handle_subscribe_ack((MMonSubscribeAck*)m);
-    return true;
+    break;
   }
-
-
-  return false;
+  return true;
 }
 
 void MonClient::handle_monmap(MMonMap *m)
@@ -330,6 +349,10 @@ void MonClient::shutdown()
 {
   monc_lock.Lock();
   timer.shutdown();
+
+  cur_con->put();
+  cur_con = NULL;
+
   monc_lock.Unlock();
 }
 
@@ -433,7 +456,7 @@ void MonClient::_send_mon_message(Message *m, bool force)
   assert(!cur_mon.empty());
   if (force || state == MC_STATE_HAVE_SESSION) {
     dout(10) << "_send_mon_message to mon." << cur_mon << " at " << monmap.get_inst(cur_mon) << dendl;
-    messenger->send_message(m, monmap.get_inst(cur_mon));
+    messenger->send_message(m, cur_con);
   } else {
     waiting_for_session.push_back(m);
   }
@@ -442,8 +465,6 @@ void MonClient::_send_mon_message(Message *m, bool force)
 void MonClient::_pick_new_mon()
 {
   assert(monc_lock.is_locked());
-  if (!cur_mon.empty())
-    messenger->mark_down(monmap.get_addr(cur_mon));
 
   if (!cur_mon.empty() && monmap.size() > 1) {
     // pick a _different_ mon
@@ -451,7 +472,14 @@ void MonClient::_pick_new_mon()
   } else {
     cur_mon = monmap.pick_random_mon();
   }
-  dout(10) << "_pick_new_mon picked mon." << cur_mon << dendl;
+
+  if (cur_con) {
+    messenger->mark_down(cur_con);
+    cur_con->put();
+  }
+  cur_con = messenger->get_connection(monmap.get_inst(cur_mon));
+
+  dout(10) << "_pick_new_mon picked mon." << cur_mon << " con " << cur_con << dendl;
 }
 
 
index cde1e78bc6e0512fe06d54b96b877e8b97c88757..c778d86f009516452de1906d8333617914053b76 100644 (file)
@@ -52,6 +52,7 @@ private:
   Messenger *messenger;
 
   string cur_mon;
+  Connection *cur_con;
 
   EntityName entity_name;
 
@@ -164,6 +165,7 @@ public:
   MonClient(RotatingKeyRing *rkeys=0) :
     state(MC_STATE_NONE),
     messenger(NULL),
+    cur_con(NULL),
     monc_lock("MonClient::monc_lock"),
     timer(monc_lock),
     hunting(true),