From: Guang Yang Date: Tue, 22 Sep 2015 20:59:28 +0000 (+0000) Subject: osd: move the monitor report to OSD::tick_without_osd_lock X-Git-Tag: v10.0.1~26^2~40 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8b5b6c85cc6225d961debfb98621927fc50e81b5;p=ceph.git osd: move the monitor report to OSD::tick_without_osd_lock Fixes: #12722 Reviewed-by: Guang Yang --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 43bd5602adf8..cc1755a81b28 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4014,6 +4014,49 @@ void OSD::tick() heartbeat_check(); heartbeat_lock.Unlock(); + map_lock.put_read(); + } + + if (is_waiting_for_healthy()) { + if (_is_healthy()) { + dout(1) << "healthy again, booting" << dendl; + set_state(STATE_BOOTING); + start_boot(); + } + } + + if (is_active()) { + // periodically kick recovery work queue + recovery_tp.wake(); + + check_replay_queue(); + } + + // only do waiters if dispatch() isn't currently running. (if it is, + // it'll do the waiters, and doing them here may screw up ordering + // of op_queue vs handle_osd_map.) + if (!dispatch_running) { + dispatch_running = true; + do_waiters(); + dispatch_running = false; + dispatch_cond.Signal(); + } + + check_ops_in_flight(); + + tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this)); +} + +void OSD::tick_without_osd_lock() +{ + assert(tick_timer_lock.is_locked()); + dout(5) << "tick_without_osd_lock" << dendl; + + // osd_lock is not being held, which means the OSD state + // might change when doing the monitor report + if (is_active() || is_waiting_for_healthy()) { + map_lock.get_read(); + // mon report? bool reset = false; bool report = false; @@ -4061,44 +4104,9 @@ void OSD::tick() send_failures(); send_pg_stats(now); } - map_lock.put_read(); } - if (is_waiting_for_healthy()) { - if (_is_healthy()) { - dout(1) << "healthy again, booting" << dendl; - start_boot(); - } - } - - if (is_active()) { - // periodically kick recovery work queue - recovery_tp.wake(); - - check_replay_queue(); - } - - // only do waiters if dispatch() isn't currently running. (if it is, - // it'll do the waiters, and doing them here may screw up ordering - // of op_queue vs handle_osd_map.) - if (!dispatch_running) { - dispatch_running = true; - do_waiters(); - dispatch_running = false; - dispatch_cond.Signal(); - } - - check_ops_in_flight(); - - tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this)); -} - -void OSD::tick_without_osd_lock() -{ - assert(tick_timer_lock.is_locked()); - dout(5) << "tick_without_osd_lock" << dendl; - if (!scrub_random_backoff()) { sched_scrub(); } @@ -4408,12 +4416,14 @@ void OSD::ms_handle_connect(Connection *con) last_mon_report = now; // resend everything, it's a new session + map_lock.get_read(); send_alive(); service.requeue_pg_temp(); service.send_pg_temp(); requeue_failures(); send_failures(); send_pg_stats(now); + map_lock.put_read(); monc->sub_want("osd_pg_creates", 0, CEPH_SUBSCRIBE_ONETIME); monc->sub_want("osdmap", osdmap->get_epoch(), CEPH_SUBSCRIBE_ONETIME); @@ -4756,7 +4766,6 @@ void OSD::got_full_map(epoch_t e) void OSD::requeue_failures() { - assert(osd_lock.is_locked()); Mutex::Locker l(heartbeat_lock); unsigned old_queue = failure_queue.size(); unsigned old_pending = failure_pending.size(); @@ -4772,7 +4781,7 @@ void OSD::requeue_failures() void OSD::send_failures() { - assert(osd_lock.is_locked()); + assert(map_lock.is_locked()); Mutex::Locker l(heartbeat_lock); utime_t now = ceph_clock_now(cct); while (!failure_queue.empty()) { @@ -4797,8 +4806,7 @@ void OSD::send_still_alive(epoch_t epoch, const entity_inst_t &i) void OSD::send_pg_stats(const utime_t &now) { - assert(osd_lock.is_locked()); - + assert(map_lock.is_locked()); dout(20) << "send_pg_stats" << dendl; osd_stat_t cur_stat = service.get_osd_stat(); @@ -4919,10 +4927,12 @@ void OSD::handle_pg_stats_ack(MPGStatsAck *ack) void OSD::flush_pg_stats() { dout(10) << "flush_pg_stats" << dendl; + osd_lock.Unlock(); utime_t now = ceph_clock_now(cct); + map_lock.get_read(); send_pg_stats(now); + map_lock.put_read(); - osd_lock.Unlock(); pg_stat_queue_lock.Lock(); uint64_t tid = pg_stat_tid;