// Init
C_SaferCond on_heartbeat_finish;
+ expect_is_leader(mock_managed_lock, false, false);
expect_try_acquire_lock(mock_managed_lock, 0);
expect_init(mock_mirror_status_watcher, 0);
expect_init(mock_instances, 0);
expect_is_leader(mock_managed_lock, false, false);
expect_release_lock(mock_managed_lock, 0);
expect_shut_down(mock_managed_lock, true, 0);
+ expect_is_leader(mock_managed_lock, false, false);
leader_watcher.shut_down();
}
// Init
C_SaferCond on_heartbeat_finish;
+ expect_is_leader(mock_managed_lock, false, false);
expect_try_acquire_lock(mock_managed_lock, 0);
expect_init(mock_mirror_status_watcher, 0);
expect_init(mock_instances, 0);
// Shutdown
expect_shut_down(mock_managed_lock, false, 0);
+ expect_is_leader(mock_managed_lock, false, false);
leader_watcher.shut_down();
}
// Init
C_SaferCond on_get_locker_finish;
+ expect_is_leader(mock_managed_lock, false, false);
expect_try_acquire_lock(mock_managed_lock, -EAGAIN);
expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), 0,
&on_get_locker_finish);
// Shutdown
expect_shut_down(mock_managed_lock, false, 0);
+ expect_is_leader(mock_managed_lock, false, false);
leader_watcher.shut_down();
}
// Init
C_SaferCond on_heartbeat_finish;
+ expect_is_leader(mock_managed_lock, false, false);
expect_try_acquire_lock(mock_managed_lock, 0);
expect_init(mock_mirror_status_watcher, 0);
expect_init(mock_instances, 0);
// Shutdown
expect_shut_down(mock_managed_lock, false, 0);
+ expect_is_leader(mock_managed_lock, false, false);
leader_watcher.shut_down();
}
MockLeaderWatcher leader_watcher(m_threads, m_local_io_ctx, &listener);
// Init
- for (int i = 0; i <= max_acquire_attempts; i++) {
+ expect_is_leader(mock_managed_lock, false, false);
+ for (int i = 0; i < max_acquire_attempts; i++) {
expect_try_acquire_lock(mock_managed_lock, -EAGAIN);
- if (i < max_acquire_attempts) {
- expect_get_locker(mock_managed_lock, locker, 0);
- }
+ expect_get_locker(mock_managed_lock, locker, 0);
}
C_SaferCond on_break;
expect_break_lock(mock_managed_lock, locker, 0, &on_break);
expect_is_leader(mock_managed_lock, false, false);
expect_release_lock(mock_managed_lock, 0);
expect_shut_down(mock_managed_lock, true, 0);
+ expect_is_leader(mock_managed_lock, false, false);
leader_watcher.shut_down();
}
Context *on_finish = nullptr;
{
+ Mutex::Locker timer_locker(m_threads->timer_lock);
Mutex::Locker locker(m_lock);
if (r < 0) {
derr << "error registering leader watcher for " << m_oid << " object: "
<< cpp_strerror(r) << dendl;
} else {
- acquire_leader_lock(true);
+ schedule_acquire_leader_lock(0);
}
std::swap(on_finish, m_on_finish);
void LeaderWatcher<I>::handle_unregister_watch(int r) {
dout(20) << "r=" << r << dendl;
- Context *on_finish = nullptr;
- {
- Mutex::Locker locker(m_lock);
+ if (r < 0) {
+ derr << "error unregistering leader watcher for " << m_oid << " object: "
+ << cpp_strerror(r) << dendl;
+ }
+ wait_for_tasks();
+}
- if (r < 0) {
- derr << "error unregistering leader watcher for " << m_oid << " object: "
- << cpp_strerror(r) << dendl;
- }
// Shutdown step entered from handle_unregister_watch(): instead of completing
// the shut-down context inline (the '-' lines below are that previous inline
// completion), it schedules handle_wait_for_tasks() as a timer task.
//
// Takes m_threads->timer_lock and then m_lock itself, so the caller must hold
// neither.
+template <typename I>
+void LeaderWatcher<I>::wait_for_tasks() {
+ dout(20) << dendl;
- assert(m_on_shut_down_finish != nullptr);
- std::swap(on_finish, m_on_shut_down_finish);
- }
- on_finish->complete(0);
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
// delay factor 0, leader == false; the trailing 'true' is the shutting_down
// flag, which lets schedule_timer_task() accept the task even though
// m_on_shut_down_finish is already set
+ schedule_timer_task("wait for tasks", 0, false,
+ &LeaderWatcher<I>::handle_wait_for_tasks, true);
+}
+
// Timer callback scheduled by wait_for_tasks(): finishes the timer op that
// was started for this callback and queues the shut-down completion on the
// work queue.
//
// Expects m_threads->timer_lock and m_lock to be held and
// m_on_shut_down_finish to be set (all asserted).
+template <typename I>
+void LeaderWatcher<I>::handle_wait_for_tasks() {
+ dout(20) << dendl;
+
+ assert(m_threads->timer_lock.is_locked());
+ assert(m_lock.is_locked());
+ assert(m_on_shut_down_finish != nullptr);
+
// balance the start_op() performed by execute_timer_task() before it
// invoked this callback
+ assert(!m_timer_op_tracker.empty());
+ m_timer_op_tracker.finish_op();
+
// complete the shut-down context from the work queue rather than from this
// call, which runs with both locks held
+ auto ctx = new FunctionContext([this](int r) {
+ Context *on_finish;
+ {
+ // ensure lock isn't held when completing shut down
+ Mutex::Locker locker(m_lock);
+ assert(m_on_shut_down_finish != nullptr);
// the member is copied, not cleared: it stays non-null after this point
+ on_finish = m_on_shut_down_finish;
+ }
+ on_finish->complete(0);
+ });
+ m_work_queue->queue(ctx, 0);
}
template <typename I>
template <typename I>
void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
int delay_factor, bool leader,
- void (LeaderWatcher<I>::*cb)()) {
+ TimerCallback timer_callback,
+ bool shutting_down) {
assert(m_threads->timer_lock.is_locked());
assert(m_lock.is_locked());
- if (m_on_shut_down_finish != nullptr) {
+ if (!shutting_down && m_on_shut_down_finish != nullptr) {
return;
}
cancel_timer_task();
m_timer_task = new FunctionContext(
- [this, cb, leader](int r) {
+ [this, leader, timer_callback](int r) {
assert(m_threads->timer_lock.is_locked());
m_timer_task = nullptr;
- Mutex::Locker locker(m_lock);
- if (is_leader(m_lock) != leader) {
+
+ if (m_timer_op_tracker.empty()) {
+ Mutex::Locker locker(m_lock);
+ execute_timer_task(leader, timer_callback);
return;
}
- (this->*cb)();
+
+ // old timer task is still running -- do not start next
+ // task until the previous task completes
+ if (m_timer_gate == nullptr) {
+ m_timer_gate = new C_TimerGate(this);
+ m_timer_op_tracker.wait_for_ops(m_timer_gate);
+ }
+ m_timer_gate->leader = leader;
+ m_timer_gate->timer_callback = timer_callback;
});
int after = delay_factor *
m_threads->timer->add_event_after(after, m_timer_task);
}
// Invokes a timer callback if the leadership state still matches the state
// the task was scheduled for.
//
// leader:         leadership state the task expects; compared against
//                 is_leader(m_lock).
// timer_callback: LeaderWatcher member function to invoke.
//
// Requires m_threads->timer_lock and m_lock to be held and no timer op to be
// outstanding (all three are asserted).
+template <typename I>
+void LeaderWatcher<I>::execute_timer_task(bool leader,
+ TimerCallback timer_callback) {
+ dout(20) << dendl;
+
+ assert(m_threads->timer_lock.is_locked());
+ assert(m_lock.is_locked());
+ assert(m_timer_op_tracker.empty());
+
// leadership changed since the task was scheduled: drop the task without
// starting an op
+ if (is_leader(m_lock) != leader) {
+ return;
+ }
+
// the op is started here but not finished here; the timer callbacks visible
// in this change call m_timer_op_tracker.finish_op() on their terminal paths
+ m_timer_op_tracker.start_op();
+ (this->*timer_callback)();
+}
+
template <typename I>
void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
Context *on_finish) {
void LeaderWatcher<I>::break_leader_lock() {
dout(20) << dendl;
+ assert(m_threads->timer_lock.is_locked());
assert(m_lock.is_locked());
+ assert(!m_timer_op_tracker.empty());
if (m_locker.cookie.empty()) {
- acquire_leader_lock(true);
+ get_locker();
return;
}
Mutex::Locker timer_locker(m_threads->timer_lock);
Mutex::Locker locker(m_lock);
+ assert(!m_timer_op_tracker.empty());
if (m_leader_lock->is_shutdown()) {
dout(20) << "canceling due to shutdown" << dendl;
+ m_timer_op_tracker.finish_op();
return;
}
if (r < 0 && r != -ENOENT) {
derr << "error beaking leader lock: " << cpp_strerror(r) << dendl;
-
- schedule_timer_task("get locker", 1, false, &LeaderWatcher<I>::get_locker);
+ schedule_acquire_leader_lock(1);
+ m_timer_op_tracker.finish_op();
return;
}
- acquire_leader_lock(true);
+ m_locker = {};
+ m_acquire_attempts = 0;
+ acquire_leader_lock();
+}
+
// Schedules get_locker() as a non-leader timer task.
//
// reset_leader: when true, the cached locker (m_locker) and the acquire
//               attempt counter (m_acquire_attempts) are reset first.
// delay_factor: forwarded unchanged to schedule_timer_task().
//
// Requires m_threads->timer_lock and m_lock to be held (asserted).
+template <typename I>
+void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
+ uint32_t delay_factor) {
+ dout(20) << dendl;
+
+ assert(m_threads->timer_lock.is_locked());
+ assert(m_lock.is_locked());
+
+ if (reset_leader) {
+ m_locker = {};
+ m_acquire_attempts = 0;
+ }
+
// leader == false; the trailing 'false' is the shutting_down flag, so the
// task is not scheduled once m_on_shut_down_finish has been set
+ schedule_timer_task("get locker", delay_factor, false,
+ &LeaderWatcher<I>::get_locker, false);
}
template <typename I>
void LeaderWatcher<I>::get_locker() {
dout(20) << dendl;
+ assert(m_threads->timer_lock.is_locked());
assert(m_lock.is_locked());
+ assert(!m_timer_op_tracker.empty());
C_GetLocker *get_locker_ctx = new C_GetLocker(this);
Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
Mutex::Locker timer_locker(m_threads->timer_lock);
Mutex::Locker mutex_locker(m_lock);
+ assert(!m_timer_op_tracker.empty());
if (m_leader_lock->is_shutdown()) {
dout(20) << "canceling due to shutdown" << dendl;
+ m_timer_op_tracker.finish_op();
return;
}
if (is_leader(m_lock)) {
m_locker = {};
- } else {
- if (r == -ENOENT) {
- acquire_leader_lock(true);
- } else {
- if (r < 0) {
- derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
- } else {
- m_locker = locker;
- }
+ m_timer_op_tracker.finish_op();
+ return;
+ }
- schedule_timer_task("acquire leader lock",
- m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats,
- false, &LeaderWatcher<I>::acquire_leader_lock);
+ if (r == -ENOENT) {
+ m_locker = {};
+ m_acquire_attempts = 0;
+ acquire_leader_lock();
+ return;
+ } else if (r < 0) {
+ derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
+ schedule_get_locker(true, 1);
+ m_timer_op_tracker.finish_op();
+ return;
+ }
+
+ if (m_locker != locker) {
+ m_locker = locker;
+ if (m_acquire_attempts > 1) {
+ dout(10) << "new lock owner detected -- resetting heartbeat counter"
+ << dendl;
+ m_acquire_attempts = 0;
}
}
-}
-template <typename I>
-void LeaderWatcher<I>::acquire_leader_lock() {
- return acquire_leader_lock(false);
+ if (m_acquire_attempts >=
+ m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break) {
+ dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
+ << "failed attempts to acquire" << dendl;
+ break_leader_lock();
+ } else {
+ schedule_acquire_leader_lock(1);
+ m_timer_op_tracker.finish_op();
+ }
}
// Schedules acquire_leader_lock() as a non-leader timer task. This replaces
// the former acquire_leader_lock(bool reset_attempt_counter) overload (the
// '-' lines); this function does not touch m_acquire_attempts.
//
// delay_factor: multiplied by the rbd_mirror_leader_max_missed_heartbeats
//               config value and passed on as the timer task's delay factor.
//
// Requires m_threads->timer_lock and m_lock to be held (asserted).
template <typename I>
-void LeaderWatcher<I>::acquire_leader_lock(bool reset_attempt_counter) {
- dout(20) << "reset_attempt_counter=" << reset_attempt_counter << dendl;
+void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
+ dout(20) << dendl;
+ assert(m_threads->timer_lock.is_locked());
assert(m_lock.is_locked());
- if (reset_attempt_counter) {
- m_acquire_attempts = 0;
- }
// leader == false; the trailing 'false' is the shutting_down flag
+ schedule_timer_task("acquire leader lock",
+ delay_factor *
+ m_cct->_conf->rbd_mirror_leader_max_missed_heartbeats,
+ false, &LeaderWatcher<I>::acquire_leader_lock, false);
+}
+
// Timer callback that counts one acquire attempt and asks the leader lock to
// try to acquire; the result is delivered to handle_acquire_leader_lock()
// through an async context callback on m_work_queue.
//
// Requires m_threads->timer_lock and m_lock to be held and a timer op to be
// in flight (all asserted). The op is not finished here.
+template <typename I>
+void LeaderWatcher<I>::acquire_leader_lock() {
+ assert(m_threads->timer_lock.is_locked());
+ assert(m_lock.is_locked());
+ assert(!m_timer_op_tracker.empty());
// the attempt is counted up front, before the request is issued
+ ++m_acquire_attempts;
dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl;
Context *ctx = create_async_context_callback(
m_work_queue, create_context_callback<
LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
-
m_leader_lock->try_acquire_lock(ctx);
}
void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
dout(20) << "r=" << r << dendl;
+ Mutex::Locker timer_locker(m_threads->timer_lock);
Mutex::Locker locker(m_lock);
+ assert(!m_timer_op_tracker.empty());
if (m_leader_lock->is_shutdown()) {
dout(20) << "canceling due to shutdown" << dendl;
+ m_timer_op_tracker.finish_op();
return;
}
} else {
derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
}
- if (++m_acquire_attempts >
- m_cct->_conf->rbd_mirror_leader_max_acquire_attempts_before_break) {
- dout(0) << "breaking leader lock after failed attemts to acquire"
- << dendl;
- break_leader_lock();
- } else {
- get_locker();
- }
+
+ get_locker();
return;
}
+ m_locker = {};
m_acquire_attempts = 0;
if (m_ret_val) {
dout(5) << "releasing due to error on notify" << dendl;
release_leader_lock();
+ m_timer_op_tracker.finish_op();
return;
}
return;
}
- schedule_timer_task("get locker", 1, false, &LeaderWatcher<I>::get_locker);
+ schedule_acquire_leader_lock(1);
}
template <typename I>
void LeaderWatcher<I>::notify_heartbeat() {
dout(20) << dendl;
+ assert(m_threads->timer_lock.is_locked());
assert(m_lock.is_locked());
+ assert(!m_timer_op_tracker.empty());
if (!is_leader(m_lock)) {
dout(5) << "not leader, canceling" << dendl;
+ m_timer_op_tracker.finish_op();
return;
}
Mutex::Locker timer_locker(m_threads->timer_lock);
Mutex::Locker locker(m_lock);
+ assert(!m_timer_op_tracker.empty());
- if (!is_leader(m_lock)) {
+ m_timer_op_tracker.finish_op();
+ if (m_leader_lock->is_shutdown()) {
+ dout(20) << "canceling due to shutdown" << dendl;
+ return;
+ } else if (!is_leader(m_lock)) {
return;
}
}
schedule_timer_task("heartbeat", 1, true,
- &LeaderWatcher<I>::notify_heartbeat);
+ &LeaderWatcher<I>::notify_heartbeat, false);
}
template <typename I>
if (is_leader(m_lock)) {
dout(5) << "got another leader heartbeat, ignoring" << dendl;
} else {
- m_acquire_attempts = 0;
cancel_timer_task();
- get_locker();
+ m_acquire_attempts = 0;
+ schedule_acquire_leader_lock(1);
}
}
dout(5) << "got another leader lock_acquired, ignoring" << dendl;
} else {
cancel_timer_task();
- m_acquire_attempts = 0;
- get_locker();
+ schedule_get_locker(true, 0);
}
}
dout(5) << "got another leader lock_released, ignoring" << dendl;
} else {
cancel_timer_task();
- acquire_leader_lock(true);
+ schedule_get_locker(true, 0);
}
}
#include <memory>
#include <string>
+#include "common/AsyncOpTracker.h"
#include "librbd/ManagedLock.h"
#include "librbd/managed_lock/Types.h"
#include "librbd/Watcher.h"
/**
* @verbatim
*
- * <uninitialized> <------------------------------ UNREGISTER_WATCH
+ * <uninitialized> <------------------------------ WAIT_FOR_TASKS
* | (init) ^ ^
* v * |
- * CREATE_OBJECT * * (error) SHUT_DOWN_LEADER_LOCK
+ * CREATE_OBJECT * * (error) UNREGISTER_WATCH
* | * ^
* v * |
- * REGISTER_WATCH * * | (shut_down)
- * | |
+ * REGISTER_WATCH * * SHUT_DOWN_LEADER_LOCK
+ * | ^
* | (no leader heartbeat and acquire failed) |
* | BREAK_LOCK <-------------------------------------\ |
- * | | (no leader heartbeat) | |
+ * | | (no leader heartbeat) | | (shut down)
* | | /----------------------------------------\ | |
* | | | (lock_released received) | |
* | | | /-------------------------------------\ | |
}
// Returns true when the parent lock reports either the post-acquiring or the
// locked state. The state is read while holding Parent::m_lock.
bool is_leader() const {
- Mutex::Locker loker(Parent::m_lock);
+ Mutex::Locker locker(Parent::m_lock);
return Parent::is_state_post_acquiring() || Parent::is_state_locked();
}
}
};
+ typedef void (LeaderWatcher<ImageCtxT>::*TimerCallback)();
+
// Context used to postpone a timer task while a previous timer op is still
// outstanding: schedule_timer_task() registers it with
// m_timer_op_tracker.wait_for_ops() and stores the task's leader flag and
// callback in it.
+ struct C_TimerGate : public Context {
+ LeaderWatcher *leader_watcher;
+
// parameters of the postponed task; overwritten by schedule_timer_task()
// each time a task fires while the gate is pending
+ bool leader = false;
+ TimerCallback timer_callback = nullptr;
+
+ C_TimerGate(LeaderWatcher *leader_watcher)
+ : leader_watcher(leader_watcher) {
+ }
+
// Clears the watcher's gate pointer and runs the stored task.
// NOTE(review): execute_timer_task() asserts that timer_lock and m_lock are
// held and no lock is taken here -- presumably this is completed from a
// finish_op() call made with both locks held; confirm.
+ void finish(int r) override {
+ leader_watcher->m_timer_gate = nullptr;
+ leader_watcher->execute_timer_task(leader, timer_callback);
+ }
+ };
+
Threads *m_threads;
Listener *m_listener;
MirrorStatusWatcher<ImageCtxT> *m_status_watcher = nullptr;
Instances<ImageCtxT> *m_instances = nullptr;
librbd::managed_lock::Locker m_locker;
+
+ AsyncOpTracker m_timer_op_tracker;
Context *m_timer_task = nullptr;
+ C_TimerGate *m_timer_gate = nullptr;
+
bufferlist m_heartbeat_ack_bl;
bool is_leader(Mutex &m_lock);
void cancel_timer_task();
void schedule_timer_task(const std::string &name,
int delay_factor, bool leader,
- void (LeaderWatcher<ImageCtxT>::*callback)());
+ TimerCallback callback, bool shutting_down);
+ void execute_timer_task(bool leader, TimerCallback timer_callback);
void create_leader_object();
void handle_create_leader_object(int r);
void unregister_watch();
void handle_unregister_watch(int r);
+ void wait_for_tasks();
+ void handle_wait_for_tasks();
+
void break_leader_lock();
void handle_break_leader_lock(int r);
+ void schedule_get_locker(bool reset_leader, uint32_t delay_factor);
void get_locker();
void handle_get_locker(int r, librbd::managed_lock::Locker& locker);
- void acquire_leader_lock(bool reset_attempt_counter);
+ void schedule_acquire_leader_lock(uint32_t delay_factor);
void acquire_leader_lock();
void handle_acquire_leader_lock(int r);