]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD: check for is_stopping after locking osd_lock or heartbeat_lock
authorSamuel Just <sam.just@inktank.com>
Tue, 19 Mar 2013 16:55:19 +0000 (09:55 -0700)
committerSamuel Just <sam.just@inktank.com>
Fri, 22 Mar 2013 01:37:34 +0000 (18:37 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h

index 3fd4b5451294863fe9a2710e8e3a546e6bf26824..2ce3463b4e16b5bc486b14f2b4c29587669b0018 100644 (file)
@@ -842,6 +842,8 @@ void OSD::handle_signal(int signum)
 int OSD::pre_init()
 {
   Mutex::Locker lock(osd_lock);
+  if (is_stopping())
+    return 0;
   
   assert(!store);
   store = create_object_store(dev_path, journal_path);
@@ -911,6 +913,8 @@ public:
 int OSD::init()
 {
   Mutex::Locker lock(osd_lock);
+  if (is_stopping())
+    return 0;
 
   tick_timer.init();
   service.backfill_request_timer.init();
@@ -1058,6 +1062,8 @@ int OSD::init()
     monc->shutdown();
     store->umount();
     osd_lock.Lock(); // locker is going to unlock this on function exit
+    if (is_stopping())
+      return 0;
     return r;
   }
 
@@ -1066,6 +1072,8 @@ int OSD::init()
   }
 
   osd_lock.Lock();
+  if (is_stopping())
+    return 0;
 
   dout(10) << "ensuring pgs have consumed prior maps" << dendl;
   consume_map();
@@ -2062,10 +2070,11 @@ void OSD::_add_heartbeat_peer(int p)
 
 void OSD::need_heartbeat_peer_update()
 {
-  heartbeat_lock.Lock();
+  Mutex::Locker l(heartbeat_lock);
+  if (is_stopping())
+    return;
   dout(20) << "need_heartbeat_peer_update" << dendl;
   heartbeat_need_update = true;
-  heartbeat_lock.Unlock();
 }
 
 void OSD::maybe_update_heartbeat_peers()
@@ -2117,15 +2126,15 @@ void OSD::maybe_update_heartbeat_peers()
 
 void OSD::reset_heartbeat_peers()
 {
+  assert(osd_lock.is_locked());
   dout(10) << "reset_heartbeat_peers" << dendl;
-  heartbeat_lock.Lock();
+  Mutex::Locker l(heartbeat_lock);
   while (!heartbeat_peers.empty()) {
     hbclient_messenger->mark_down(heartbeat_peers.begin()->second.con);
     heartbeat_peers.begin()->second.con->put();
     heartbeat_peers.erase(heartbeat_peers.begin());
   }
   failure_queue.clear();
-  heartbeat_lock.Unlock();
 }
 
 void OSD::handle_osd_ping(MOSDPing *m)
@@ -2140,6 +2149,10 @@ void OSD::handle_osd_ping(MOSDPing *m)
   int from = m->get_source().num();
 
   heartbeat_lock.Lock();
+  if (is_stopping()) {
+    heartbeat_lock.Unlock();
+    return;
+  }
 
   OSDMapRef curmap = service.get_osdmap();
   
@@ -2239,7 +2252,9 @@ void OSD::handle_osd_ping(MOSDPing *m)
 
 void OSD::heartbeat_entry()
 {
-  heartbeat_lock.Lock();
+  Mutex::Locker l(heartbeat_lock);
+  if (is_stopping())
+    return;
   while (!heartbeat_stop) {
     heartbeat();
 
@@ -2248,9 +2263,10 @@ void OSD::heartbeat_entry()
     w.set_from_double(wait);
     dout(30) << "heartbeat_entry sleeping for " << wait << dendl;
     heartbeat_cond.WaitInterval(g_ceph_context, heartbeat_lock, w);
+    if (is_stopping())
+      return;
     dout(30) << "heartbeat_entry woke up" << dendl;
   }
-  heartbeat_lock.Unlock();
 }
 
 void OSD::heartbeat_check()
@@ -2354,6 +2370,10 @@ bool OSD::heartbeat_reset(Connection *con)
   HeartbeatSession *s = static_cast<HeartbeatSession*>(con->get_priv());
   if (s) {
     heartbeat_lock.Lock();
+    if (is_stopping()) {
+      heartbeat_lock.Unlock();
+      return true;
+    }
     map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
     if (p != heartbeat_peers.end() &&
        p->second.con == con) {
@@ -2633,6 +2653,8 @@ void OSD::ms_handle_connect(Connection *con)
 {
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
     Mutex::Locker l(osd_lock);
+    if (is_stopping())
+      return;
     dout(10) << "ms_handle_connect on mon" << dendl;
     if (is_booting()) {
       start_boot();
@@ -2680,6 +2702,8 @@ void OSD::start_boot()
 void OSD::_maybe_boot(epoch_t oldest, epoch_t newest)
 {
   Mutex::Locker l(osd_lock);
+  if (is_stopping())
+    return;
   dout(10) << "_maybe_boot mon has osdmaps " << oldest << ".." << newest << dendl;
 
   if (is_initializing()) {
@@ -2842,6 +2866,7 @@ void OSDService::send_pg_temp()
 
 void OSD::send_failures()
 {
+  assert(osd_lock.is_locked());
   bool locked = false;
   if (!failure_queue.empty()) {
     heartbeat_lock.Lock();
@@ -3346,7 +3371,6 @@ void OSD::_share_map_outgoing(int peer, Connection *con, OSDMapRef map)
 bool OSD::heartbeat_dispatch(Message *m)
 {
   dout(30) << "heartbeat_dispatch " << m << dendl;
-
   switch (m->get_type()) {
     
   case CEPH_MSG_PING:
@@ -3379,6 +3403,10 @@ bool OSD::ms_dispatch(Message *m)
   // lock!
 
   osd_lock.Lock();
+  if (is_stopping()) {
+    osd_lock.Unlock();
+    return true;
+  }
 
   while (dispatch_running) {
     dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
@@ -3854,6 +3882,7 @@ void OSD::wait_for_new_map(OpRequestRef op)
 
 void OSD::note_down_osd(int peer)
 {
+  assert(osd_lock.is_locked());
   cluster_messenger->mark_down(osdmap->get_cluster_addr(peer));
 
   heartbeat_lock.Lock();
@@ -6285,6 +6314,8 @@ struct C_CompleteSplits : public Context {
     : osd(osd), pgs(in) {}
   void finish(int r) {
     Mutex::Locker l(osd->osd_lock);
+    if (osd->is_stopping())
+      return;
     PG::RecoveryCtx rctx = osd->create_context();
     set<pg_t> to_complete;
     for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin();
index 0a5386acacd012c3619af7918768b83f1fe38998..d41c9fec4d6f9592e4eb368af8790c6db3b9c031 100644 (file)
@@ -1099,6 +1099,10 @@ protected:
     }
     void _process(Command *c) {
       osd->osd_lock.Lock();
+      if (osd->is_stopping()) {
+       delete c;
+       return;
+      }
       osd->do_command(c->con, c->tid, c->cmd, c->indata);
       osd->osd_lock.Unlock();
       delete c;
@@ -1353,6 +1357,10 @@ protected:
     }
     void _process(MOSDRepScrub *msg) {
       osd->osd_lock.Lock();
+      if (osd->is_stopping()) {
+       osd->osd_lock.Unlock();
+       return;
+      }
       if (osd->_have_pg(msg->pgid)) {
        PG *pg = osd->_lookup_lock_pg(msg->pgid);
        osd->osd_lock.Unlock();