]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: move the monitor report to OSD::tick_without_osd_lock
authorGuang Yang <yguang@yahoo-inc.com>
Tue, 22 Sep 2015 20:59:28 +0000 (20:59 +0000)
committerSage Weil <sage@redhat.com>
Mon, 23 Nov 2015 13:36:15 +0000 (08:36 -0500)
Fixes: #12722
Reviewed-by: Guang Yang <yguang@yahoo-inc.com>
src/osd/OSD.cc

index 43bd5602adf8c9d93fa6d35d27fb0454b7585fb8..cc1755a81b28123658b5af7db4bae9894e42b2d4 100644 (file)
@@ -4014,6 +4014,49 @@ void OSD::tick()
     heartbeat_check();
     heartbeat_lock.Unlock();
 
+    map_lock.put_read();
+  }
+
+  if (is_waiting_for_healthy()) {
+    if (_is_healthy()) {
+      dout(1) << "healthy again, booting" << dendl;
+      set_state(STATE_BOOTING);
+      start_boot();
+    }
+  }
+
+  if (is_active()) {
+    // periodically kick recovery work queue
+    recovery_tp.wake();
+
+    check_replay_queue();
+  }
+
+  // only do waiters if dispatch() isn't currently running.  (if it is,
+  // it'll do the waiters, and doing them here may screw up ordering
+  // of op_queue vs handle_osd_map.)
+  if (!dispatch_running) {
+    dispatch_running = true;
+    do_waiters();
+    dispatch_running = false;
+    dispatch_cond.Signal();
+  }
+
+  check_ops_in_flight();
+
+  tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
+}
+
+void OSD::tick_without_osd_lock()
+{
+  assert(tick_timer_lock.is_locked());
+  dout(5) << "tick_without_osd_lock" << dendl;
+
+  // osd_lock is not being held, which means the OSD state
+  // might change when doing the monitor report
+  if (is_active() || is_waiting_for_healthy()) {
+    map_lock.get_read();
+
     // mon report?
     bool reset = false;
     bool report = false;
@@ -4061,44 +4104,9 @@ void OSD::tick()
       send_failures();
       send_pg_stats(now);
     }
-
     map_lock.put_read();
   }
 
-  if (is_waiting_for_healthy()) {
-    if (_is_healthy()) {
-      dout(1) << "healthy again, booting" << dendl;
-      start_boot();
-    }
-  }
-
-  if (is_active()) {
-    // periodically kick recovery work queue
-    recovery_tp.wake();
-
-    check_replay_queue();
-  }
-
-  // only do waiters if dispatch() isn't currently running.  (if it is,
-  // it'll do the waiters, and doing them here may screw up ordering
-  // of op_queue vs handle_osd_map.)
-  if (!dispatch_running) {
-    dispatch_running = true;
-    do_waiters();
-    dispatch_running = false;
-    dispatch_cond.Signal();
-  }
-
-  check_ops_in_flight();
-
-  tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
-}
-
-void OSD::tick_without_osd_lock()
-{
-  assert(tick_timer_lock.is_locked());
-  dout(5) << "tick_without_osd_lock" << dendl;
-
   if (!scrub_random_backoff()) {
     sched_scrub();
   }
@@ -4408,12 +4416,14 @@ void OSD::ms_handle_connect(Connection *con)
       last_mon_report = now;
 
       // resend everything, it's a new session
+      map_lock.get_read();
       send_alive();
       service.requeue_pg_temp();
       service.send_pg_temp();
       requeue_failures();
       send_failures();
       send_pg_stats(now);
+      map_lock.put_read();
 
       monc->sub_want("osd_pg_creates", 0, CEPH_SUBSCRIBE_ONETIME);
       monc->sub_want("osdmap", osdmap->get_epoch(), CEPH_SUBSCRIBE_ONETIME);
@@ -4756,7 +4766,6 @@ void OSD::got_full_map(epoch_t e)
 
 void OSD::requeue_failures()
 {
-  assert(osd_lock.is_locked());
   Mutex::Locker l(heartbeat_lock);
   unsigned old_queue = failure_queue.size();
   unsigned old_pending = failure_pending.size();
@@ -4772,7 +4781,7 @@ void OSD::requeue_failures()
 
 void OSD::send_failures()
 {
-  assert(osd_lock.is_locked());
+  assert(map_lock.is_locked());
   Mutex::Locker l(heartbeat_lock);
   utime_t now = ceph_clock_now(cct);
   while (!failure_queue.empty()) {
@@ -4797,8 +4806,7 @@ void OSD::send_still_alive(epoch_t epoch, const entity_inst_t &i)
 
 void OSD::send_pg_stats(const utime_t &now)
 {
-  assert(osd_lock.is_locked());
-
+  assert(map_lock.is_locked());
   dout(20) << "send_pg_stats" << dendl;
 
   osd_stat_t cur_stat = service.get_osd_stat();
@@ -4919,10 +4927,12 @@ void OSD::handle_pg_stats_ack(MPGStatsAck *ack)
 void OSD::flush_pg_stats()
 {
   dout(10) << "flush_pg_stats" << dendl;
+  osd_lock.Unlock();
   utime_t now = ceph_clock_now(cct);
+  map_lock.get_read();
   send_pg_stats(now);
+  map_lock.put_read();
 
-  osd_lock.Unlock();
 
   pg_stat_queue_lock.Lock();
   uint64_t tid = pg_stat_tid;