From 6be6625024de8a8e71d577194f2f20073b58f85f Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Fri, 3 Mar 2017 16:06:25 -0500 Subject: [PATCH] rbd-mirror: correct race conditions within leader watcher It was possible for multiple, concurrent acquire attempts to be running for a single instance due to notification messages. Now only a single acquire state machine can be running. Signed-off-by: Jason Dillaman --- src/librbd/managed_lock/Types.h | 12 +- .../rbd_mirror/test_mock_LeaderWatcher.cc | 16 +- src/tools/rbd_mirror/LeaderWatcher.cc | 231 +++++++++++++----- src/tools/rbd_mirror/LeaderWatcher.h | 44 +++- 4 files changed, 230 insertions(+), 73 deletions(-) diff --git a/src/librbd/managed_lock/Types.h b/src/librbd/managed_lock/Types.h index c03aa7c9a5b67..319789c83659e 100644 --- a/src/librbd/managed_lock/Types.h +++ b/src/librbd/managed_lock/Types.h @@ -14,7 +14,14 @@ struct Locker { entity_name_t entity; std::string cookie; std::string address; - uint64_t handle; + uint64_t handle = 0; + + Locker() { + } + Locker(const entity_name_t& entity, const std::string &cookie, + const std::string &address, uint64_t handle) + : entity(entity), cookie(cookie), address(address), handle(handle) { + } inline bool operator==(const Locker &rhs) const { return (entity == rhs.entity && @@ -22,6 +29,9 @@ struct Locker { address == rhs.address && handle == rhs.handle); } + inline bool operator!=(const Locker &rhs) const { + return !(*this == rhs); + } }; enum Mode { diff --git a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc index 4ac1298fee05d..7adc2737b79af 100644 --- a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc +++ b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc @@ -434,6 +434,7 @@ TEST_F(TestMockLeaderWatcher, InitShutdown) { // Inint C_SaferCond on_heartbeat_finish; + expect_is_leader(mock_managed_lock, false, false); expect_try_acquire_lock(mock_managed_lock, 0); expect_init(mock_mirror_status_watcher, 0); expect_init(mock_instances, 0); @@ -450,6 +451,7 @@ TEST_F(TestMockLeaderWatcher, InitShutdown) { expect_is_leader(mock_managed_lock, false, false); expect_release_lock(mock_managed_lock, 0); expect_shut_down(mock_managed_lock, true, 0); + expect_is_leader(mock_managed_lock, false, false); leader_watcher.shut_down(); } @@ -470,6 +472,7 @@ TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) { // Inint C_SaferCond on_heartbeat_finish; + expect_is_leader(mock_managed_lock, false, false); expect_try_acquire_lock(mock_managed_lock, 0); expect_init(mock_mirror_status_watcher, 0); expect_init(mock_instances, 0); @@ -493,6 +496,7 @@ TEST_F(TestMockLeaderWatcher, InitReleaseShutdown) { // Shutdown expect_shut_down(mock_managed_lock, false, 0); + expect_is_leader(mock_managed_lock, false, false); leader_watcher.shut_down(); } @@ -514,6 +518,7 @@ TEST_F(TestMockLeaderWatcher, AcquireError) { // Inint C_SaferCond on_get_locker_finish; + expect_is_leader(mock_managed_lock, false, false); expect_try_acquire_lock(mock_managed_lock, -EAGAIN); expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), 0, &on_get_locker_finish); @@ -522,6 +527,7 @@ TEST_F(TestMockLeaderWatcher, AcquireError) { // Shutdown expect_shut_down(mock_managed_lock, false, 0); + expect_is_leader(mock_managed_lock, false, false); leader_watcher.shut_down(); } @@ -542,6 +548,7 @@ TEST_F(TestMockLeaderWatcher, ReleaseError) { // Inint C_SaferCond on_heartbeat_finish; + expect_is_leader(mock_managed_lock, false, false); expect_try_acquire_lock(mock_managed_lock, 0); expect_init(mock_mirror_status_watcher, 0); expect_init(mock_instances, 0); @@ -565,6 +572,7 @@ TEST_F(TestMockLeaderWatcher, ReleaseError) { // Shutdown expect_shut_down(mock_managed_lock, false, 0); + expect_is_leader(mock_managed_lock, false, false); leader_watcher.shut_down(); } @@ -594,11 +602,10 @@ TEST_F(TestMockLeaderWatcher, Break) { MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener); // Init - for (int i = 0; i <= max_acquire_attempts; i++) { + expect_is_leader(mock_managed_lock, false, false); + for (int i = 0; i < max_acquire_attempts; i++) { expect_try_acquire_lock(mock_managed_lock, -EAGAIN); - if (i < max_acquire_attempts) { - expect_get_locker(mock_managed_lock, locker, 0); - } + expect_get_locker(mock_managed_lock, locker, 0); } C_SaferCond on_break; expect_break_lock(mock_managed_lock, locker, 0, &on_break); @@ -619,6 +626,7 @@ TEST_F(TestMockLeaderWatcher, Break) { expect_is_leader(mock_managed_lock, false, false); expect_release_lock(mock_managed_lock, 0); expect_shut_down(mock_managed_lock, true, 0); + expect_is_leader(mock_managed_lock, false, false); leader_watcher.shut_down(); } diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc index 9b8404577d6fd..8376bbe6fe0ea 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.cc +++ b/src/tools/rbd_mirror/LeaderWatcher.cc @@ -119,13 +119,14 @@ void LeaderWatcher::handle_register_watch(int r) { Context *on_finish = nullptr; { + Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker locker(m_lock); if (r < 0) { derr << "error registering leader watcher for " << m_oid << " object: " << cpp_strerror(r) << dendl; } else { - acquire_leader_lock(true); + schedule_acquire_leader_lock(0); } std::swap(on_finish, m_on_finish); @@ -197,19 +198,45 @@ template void LeaderWatcher::handle_unregister_watch(int r) { dout(20) << "r=" << r << dendl; - Context *on_finish = nullptr; - { - Mutex::Locker locker(m_lock); + if (r < 0) { + derr << "error unregistering leader watcher for " << m_oid << " object: " + << cpp_strerror(r) << dendl; + } + wait_for_tasks(); +} - if (r < 0) { - derr << "error unregistering leader watcher for " << m_oid << " object: " - << cpp_strerror(r) << dendl; - } +template +void LeaderWatcher::wait_for_tasks() { + dout(20) << dendl; - assert(m_on_shut_down_finish != nullptr); - std::swap(on_finish, m_on_shut_down_finish); - } - on_finish->complete(0); + Mutex::Locker timer_locker(m_threads->timer_lock); + Mutex::Locker locker(m_lock); + schedule_timer_task("wait for tasks", 0, false, + &LeaderWatcher::handle_wait_for_tasks, true); +} + +template +void LeaderWatcher::handle_wait_for_tasks() { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(m_on_shut_down_finish != nullptr); + + assert(!m_timer_op_tracker.empty()); + m_timer_op_tracker.finish_op(); + + auto ctx = new FunctionContext([this](int r) { + Context *on_finish; + { + // ensure lock isn't held when completing shut down + Mutex::Locker locker(m_lock); + assert(m_on_shut_down_finish != nullptr); + on_finish = m_on_shut_down_finish; + } + on_finish->complete(0); + }); + m_work_queue->queue(ctx, 0); } template @@ -271,25 +298,36 @@ void LeaderWatcher::cancel_timer_task() { template void LeaderWatcher::schedule_timer_task(const std::string &name, int delay_factor, bool leader, - void (LeaderWatcher::*cb)()) { + TimerCallback timer_callback, + bool shutting_down) { assert(m_threads->timer_lock.is_locked()); assert(m_lock.is_locked()); - if (m_on_shut_down_finish != nullptr) { + if (!shutting_down && m_on_shut_down_finish != nullptr) { return; } cancel_timer_task(); m_timer_task = new FunctionContext( - [this, cb, leader](int r) { + [this, leader, timer_callback](int r) { assert(m_threads->timer_lock.is_locked()); m_timer_task = nullptr; - Mutex::Locker locker(m_lock); - if (is_leader(m_lock) != leader) { + + if (m_timer_op_tracker.empty()) { + Mutex::Locker locker(m_lock); + execute_timer_task(leader, timer_callback); return; } - (this->*cb)(); + + // old timer task is still running -- do not start next + // task until the previous task completes + if (m_timer_gate == nullptr) { + m_timer_gate = new C_TimerGate(this); + m_timer_op_tracker.wait_for_ops(m_timer_gate); + } + m_timer_gate->leader = leader; + m_timer_gate->timer_callback = timer_callback; }); int after = delay_factor * @@ -300,6 +338,23 @@ void LeaderWatcher::schedule_timer_task(const std::string &name, m_threads->timer->add_event_after(after, m_timer_task); } +template +void LeaderWatcher::execute_timer_task(bool leader, + TimerCallback timer_callback) { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(m_timer_op_tracker.empty()); + + if (is_leader(m_lock) != leader) { + return; + } + + m_timer_op_tracker.start_op(); + (this->*timer_callback)(); +} + template void LeaderWatcher::handle_post_acquire_leader_lock(int r, Context *on_finish) { @@ -356,10 +411,12 @@ template void LeaderWatcher::break_leader_lock() { dout(20) << dendl; + assert(m_threads->timer_lock.is_locked()); assert(m_lock.is_locked()); + assert(!m_timer_op_tracker.empty()); if (m_locker.cookie.empty()) { - acquire_leader_lock(true); + get_locker(); return; } @@ -376,27 +433,50 @@ void LeaderWatcher::handle_break_leader_lock(int r) { Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker locker(m_lock); + assert(!m_timer_op_tracker.empty()); if (m_leader_lock->is_shutdown()) { dout(20) << "canceling due to shutdown" << dendl; + m_timer_op_tracker.finish_op(); return; } if (r < 0 && r != -ENOENT) { derr << "error beaking leader lock: " << cpp_strerror(r) << dendl; - - schedule_timer_task("get locker", 1, false, &LeaderWatcher::get_locker); + schedule_acquire_leader_lock(1); + m_timer_op_tracker.finish_op(); return; } - acquire_leader_lock(true); + m_locker = {}; + m_acquire_attempts = 0; + acquire_leader_lock(); +} + +template +void LeaderWatcher::schedule_get_locker(bool reset_leader, + uint32_t delay_factor) { + dout(20) << dendl; + + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + + if (reset_leader) { + m_locker = {}; + m_acquire_attempts = 0; + } + + schedule_timer_task("get locker", delay_factor, false, + &LeaderWatcher::get_locker, false); } template void LeaderWatcher::get_locker() { dout(20) << dendl; + assert(m_threads->timer_lock.is_locked()); assert(m_lock.is_locked()); + assert(!m_timer_op_tracker.empty()); C_GetLocker *get_locker_ctx = new C_GetLocker(this); Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx); @@ -411,52 +491,77 @@ void LeaderWatcher::handle_get_locker(int r, Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker mutex_locker(m_lock); + assert(!m_timer_op_tracker.empty()); if (m_leader_lock->is_shutdown()) { dout(20) << "canceling due to shutdown" << dendl; + m_timer_op_tracker.finish_op(); return; } if (is_leader(m_lock)) { m_locker = {}; - } else { - if (r == -ENOENT) { - acquire_leader_lock(true); - } else { - if (r < 0) { - derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl; - } else { - m_locker = locker; - } + m_timer_op_tracker.finish_op(); + return; + } - schedule_timer_task("acquire leader lock", - m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats, - false, &LeaderWatcher::acquire_leader_lock); + if (r == -ENOENT) { + m_locker = {}; + m_acquire_attempts = 0; + acquire_leader_lock(); + return; + } else if (r < 0) { + derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl; + schedule_get_locker(true, 1); + m_timer_op_tracker.finish_op(); + return; + } + + if (m_locker != locker) { + m_locker = locker; + if (m_acquire_attempts > 1) { + dout(10) << "new lock owner detected -- resetting heartbeat counter" + << dendl; + m_acquire_attempts = 0; } } -} -template -void LeaderWatcher::acquire_leader_lock() { - return acquire_leader_lock(false); + if (m_acquire_attempts >= + m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break) { + dout(0) << "breaking leader lock after " << m_acquire_attempts << " " + << "failed attempts to acquire" << dendl; + break_leader_lock(); + } else { + schedule_acquire_leader_lock(1); + m_timer_op_tracker.finish_op(); + } } template -void LeaderWatcher::acquire_leader_lock(bool reset_attempt_counter) { - dout(20) << "reset_attempt_counter=" << reset_attempt_counter << dendl; +void LeaderWatcher::schedule_acquire_leader_lock(uint32_t delay_factor) { + dout(20) << dendl; + assert(m_threads->timer_lock.is_locked()); assert(m_lock.is_locked()); - if (reset_attempt_counter) { - m_acquire_attempts = 0; - } + schedule_timer_task("acquire leader lock", + delay_factor * + m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats, + false, &LeaderWatcher::acquire_leader_lock, false); +} + +template +void LeaderWatcher::acquire_leader_lock() { + assert(m_threads->timer_lock.is_locked()); + assert(m_lock.is_locked()); + assert(!m_timer_op_tracker.empty()); + ++m_acquire_attempts; dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl; Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< LeaderWatcher, &LeaderWatcher::handle_acquire_leader_lock>(this)); - m_leader_lock->try_acquire_lock(ctx); } @@ -464,10 +569,13 @@ template void LeaderWatcher::handle_acquire_leader_lock(int r) { dout(20) << "r=" << r << dendl; + Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker locker(m_lock); + assert(!m_timer_op_tracker.empty()); if (m_leader_lock->is_shutdown()) { dout(20) << "canceling due to shutdown" << dendl; + m_timer_op_tracker.finish_op(); return; } @@ -477,22 +585,18 @@ void LeaderWatcher::handle_acquire_leader_lock(int r) { } else { derr << "error acquiring lock: " << cpp_strerror(r) << dendl; } - if (++m_acquire_attempts > - m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break) { - dout(0) << "breaking leader lock after failed attemts to acquire" - << dendl; - break_leader_lock(); - } else { - get_locker(); - } + + get_locker(); return; } + m_locker = {}; m_acquire_attempts = 0; if (m_ret_val) { dout(5) << "releasing due to error on notify" << dendl; release_leader_lock(); + m_timer_op_tracker.finish_op(); return; } @@ -524,7 +628,7 @@ void LeaderWatcher::handle_release_leader_lock(int r) { return; } - schedule_timer_task("get locker", 1, false, &LeaderWatcher::get_locker); + schedule_acquire_leader_lock(1); } template @@ -783,10 +887,13 @@ template void LeaderWatcher::notify_heartbeat() { dout(20) << dendl; + assert(m_threads->timer_lock.is_locked()); assert(m_lock.is_locked()); + assert(!m_timer_op_tracker.empty()); if (!is_leader(m_lock)) { dout(5) << "not leader, canceling" << dendl; + m_timer_op_tracker.finish_op(); return; } @@ -806,8 +913,13 @@ void LeaderWatcher::handle_notify_heartbeat(int r) { Mutex::Locker timer_locker(m_threads->timer_lock); Mutex::Locker locker(m_lock); + assert(!m_timer_op_tracker.empty()); - if (!is_leader(m_lock)) { + m_timer_op_tracker.finish_op(); + if (m_leader_lock->is_shutdown()) { + dout(20) << "canceling due to shutdown" << dendl; + return; + } else if (!is_leader(m_lock)) { return; } @@ -846,7 +958,7 @@ void LeaderWatcher::handle_notify_heartbeat(int r) { } schedule_timer_task("heartbeat", 1, true, - &LeaderWatcher::notify_heartbeat); + &LeaderWatcher::notify_heartbeat, false); } template @@ -859,9 +971,9 @@ void LeaderWatcher::handle_heartbeat(Context *on_notify_ack) { if (is_leader(m_lock)) { dout(5) << "got another leader heartbeat, ignoring" << dendl; } else { - m_acquire_attempts = 0; cancel_timer_task(); - get_locker(); + m_acquire_attempts = 0; + schedule_acquire_leader_lock(1); } } @@ -879,8 +991,7 @@ void LeaderWatcher::handle_lock_acquired(Context *on_notify_ack) { dout(5) << "got another leader lock_acquired, ignoring" << dendl; } else { cancel_timer_task(); - m_acquire_attempts = 0; - get_locker(); + schedule_get_locker(true, 0); } } @@ -898,7 +1009,7 @@ void LeaderWatcher::handle_lock_released(Context *on_notify_ack) { dout(5) << "got another leader lock_released, ignoring" << dendl; } else { cancel_timer_task(); - acquire_leader_lock(true); + schedule_get_locker(true, 0); } } diff --git a/src/tools/rbd_mirror/LeaderWatcher.h b/src/tools/rbd_mirror/LeaderWatcher.h index e2e7cc8a0e9d2..a3a662a8f6405 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.h +++ b/src/tools/rbd_mirror/LeaderWatcher.h @@ -8,6 +8,7 @@ #include #include +#include "common/AsyncOpTracker.h" #include "librbd/ManagedLock.h" #include "librbd/managed_lock/Types.h" #include "librbd/Watcher.h" @@ -50,17 +51,17 @@ private: /** * @verbatim * - * <------------------------------ UNREGISTER_WATCH + * <------------------------------ WAIT_FOR_TASKS * | (init) ^ ^ * v * | - * CREATE_OBJECT * * (error) SHUT_DOWN_LEADER_LOCK + * CREATE_OBJECT * * (error) UNREGISTER_WATCH * | * ^ * v * | - * REGISTER_WATCH * * | (shut_down) - * | | + * REGISTER_WATCH * * SHUT_DOWN_LEADER_LOCK + * | ^ * | (no leader heartbeat and acquire failed) | * | BREAK_LOCK <-------------------------------------\ | - * | | (no leader heartbeat) | | + * | | (no leader heartbeat) | | (shut down) * | | /----------------------------------------\ | | * | | | (lock_released received) | | * | | | /-------------------------------------\ | | @@ -105,7 +106,7 @@ private: } bool is_leader() const { - Mutex::Locker loker(Parent::m_lock); + Mutex::Locker locker(Parent::m_lock); return Parent::is_state_post_acquiring() || Parent::is_state_locked(); } @@ -157,6 +158,24 @@ private: } }; + typedef void (LeaderWatcher::*TimerCallback)(); + + struct C_TimerGate : public Context { + LeaderWatcher *leader_watcher; + + bool leader = false; + TimerCallback timer_callback = nullptr; + + C_TimerGate(LeaderWatcher *leader_watcher) + : leader_watcher(leader_watcher) { + } + + void finish(int r) override { + leader_watcher->m_timer_gate = nullptr; + leader_watcher->execute_timer_task(leader, timer_callback); + } + }; + Threads *m_threads; Listener *m_listener; @@ -170,7 +189,11 @@ private: MirrorStatusWatcher *m_status_watcher = nullptr; Instances *m_instances = nullptr; librbd::managed_lock::Locker m_locker; + + AsyncOpTracker m_timer_op_tracker; Context *m_timer_task = nullptr; + C_TimerGate *m_timer_gate = nullptr; + bufferlist m_heartbeat_ack_bl; bool is_leader(Mutex &m_lock); @@ -178,7 +201,8 @@ private: void cancel_timer_task(); void schedule_timer_task(const std::string &name, int delay_factor, bool leader, - void (LeaderWatcher::*callback)()); + TimerCallback callback, bool shutting_down); + void execute_timer_task(bool leader, TimerCallback timer_callback); void create_leader_object(); void handle_create_leader_object(int r); @@ -192,13 +216,17 @@ private: void unregister_watch(); void handle_unregister_watch(int r); + void wait_for_tasks(); + void handle_wait_for_tasks(); + void break_leader_lock(); void handle_break_leader_lock(int r); + void schedule_get_locker(bool reset_leader, uint32_t delay_factor); void get_locker(); void handle_get_locker(int r, librbd::managed_lock::Locker& locker); - void acquire_leader_lock(bool reset_attempt_counter); + void schedule_acquire_leader_lock(uint32_t delay_factor); void acquire_leader_lock(); void handle_acquire_leader_lock(int r); -- 2.39.5