#include "cls/lock/cls_lock_client.h"
#include "cls/lock/cls_lock_types.h"
#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequestWQ.h"
#include "librbd/internal.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageWatcher.h"
class TestImageWatcher : public TestFixture {
public:
- TestImageWatcher() : m_watch_ctx(NULL), m_aio_completion_restarts(0),
- m_expected_aio_restarts(0),
- m_callback_lock("m_callback_lock")
+ TestImageWatcher() : m_watch_ctx(NULL), m_callback_lock("m_callback_lock")
{
}
DECODE_FINISH(iter);
NotifyOp notify_op = static_cast<NotifyOp>(op);
+ /*
std::cout << "NOTIFY: " << notify_op << ", " << notify_id
<< ", " << cookie << ", " << notifier_id << std::endl;
+ */
Mutex::Locker l(m_parent.m_callback_lock);
m_parent.m_notify_payloads[notify_op] = payload;
return (m_notifies.size() == m_notify_acks.size());
}
-
- librbd::AioCompletion *create_aio_completion(librbd::ImageCtx &ictx) {
- librbd::AioCompletion *aio_completion = new librbd::AioCompletion();
- aio_completion->complete_cb = &handle_aio_completion;
- aio_completion->complete_arg = this;
-
- aio_completion->init_time(&ictx, librbd::AIO_TYPE_NONE);
- m_aio_completions.insert(aio_completion);
- return aio_completion;
- }
-
- static void handle_aio_completion(void *arg1, void *arg2) {
- TestImageWatcher *test_image_watcher =
- reinterpret_cast<TestImageWatcher *>(arg2);
- assert(test_image_watcher->m_callback_lock.is_locked());
- test_image_watcher->m_callback_cond.Signal();
- }
-
- int handle_restart_aio(librbd::ImageCtx *ictx,
- librbd::AioCompletion *aio_completion) {
- assert(ictx->owner_lock.is_locked());
- Mutex::Locker callback_locker(m_callback_lock);
- ++m_aio_completion_restarts;
-
- if (!ictx->image_watcher->is_lock_owner() &&
- (m_expected_aio_restarts == 0 ||
- m_aio_completion_restarts < m_expected_aio_restarts)) {
- ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
- aio_completion);
- } else {
- {
- Mutex::Locker completion_locker(aio_completion->lock);
- aio_completion->complete(ictx->cct);
- }
-
- m_aio_completions.erase(aio_completion);
- aio_completion->release();
- }
-
- m_callback_cond.Signal();
- return 0;
- }
-
- bool wait_for_aio_completions(librbd::ImageCtx &ictx) {
- Mutex::Locker l(m_callback_lock);
- int r = 0;
- while (!m_aio_completions.empty() &&
- (m_expected_aio_restarts == 0 ||
- m_aio_completion_restarts < m_expected_aio_restarts)) {
- r = m_callback_cond.WaitInterval(ictx.cct, m_callback_lock,
- utime_t(10, 0));
- if (r != 0) {
- break;
- }
- }
- return (r == 0);
- }
-
bufferlist create_response_message(int r) {
bufferlist bl;
::encode(ResponseMessage(r), bl);
AsyncRequestId m_async_request_id;
- std::set<librbd::AioCompletion *> m_aio_completions;
- uint32_t m_aio_completion_restarts;
- uint32_t m_expected_aio_restarts;
-
Mutex m_callback_lock;
Cond m_callback_cond;
ictx->snap_id = CEPH_NOSNAP;
}
+TEST_F(TestImageWatcher, WritesSuspended) {
+ REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+ ASSERT_TRUE(ictx->aio_work_queue->writes_suspended());
+}
+
TEST_F(TestImageWatcher, TryLock) {
REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
m_notify_acks = {{NOTIFY_OP_REQUEST_LOCK, create_response_message(0)}};
+ ASSERT_TRUE(ictx->aio_work_queue->writes_suspended());
{
- RWLock::WLocker l(ictx->owner_lock);
- ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
- create_aio_completion(*ictx));
- ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
- create_aio_completion(*ictx));
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ ictx->image_watcher->request_lock();
}
ASSERT_TRUE(wait_for_notifies(*ictx));
ASSERT_EQ(0, unlock_image());
- m_notifies.clear();
- m_notify_acks = {{NOTIFY_OP_RELEASED_LOCK,{}}, {NOTIFY_OP_ACQUIRED_LOCK,{}}};
+ {
+ Mutex::Locker l(m_callback_lock);
+ m_notifies.clear();
+ m_notify_acks = {{NOTIFY_OP_RELEASED_LOCK,{}},
+ {NOTIFY_OP_ACQUIRED_LOCK,{}}};
+ }
bufferlist bl;
{
expected_notify_ops += NOTIFY_OP_RELEASED_LOCK, NOTIFY_OP_ACQUIRED_LOCK;
ASSERT_EQ(expected_notify_ops, m_notifies);
- ASSERT_TRUE(wait_for_aio_completions(*ictx));
+ ASSERT_FALSE(ictx->aio_work_queue->writes_suspended());
}
TEST_F(TestImageWatcher, RequestLockTimedOut) {
m_notify_acks = {{NOTIFY_OP_REQUEST_LOCK, {}}};
- m_expected_aio_restarts = 1;
+ ASSERT_TRUE(ictx->aio_work_queue->writes_suspended());
{
- RWLock::WLocker l(ictx->owner_lock);
- ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
- create_aio_completion(*ictx));
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ ictx->image_watcher->request_lock();
}
ASSERT_TRUE(wait_for_notifies(*ictx));
expected_notify_ops += NOTIFY_OP_REQUEST_LOCK;
ASSERT_EQ(expected_notify_ops, m_notifies);
- ASSERT_TRUE(wait_for_aio_completions(*ictx));
+ // should resend when empty ack returned
+ {
+ Mutex::Locker l(m_callback_lock);
+ m_notifies.clear();
+ }
+ ASSERT_TRUE(wait_for_notifies(*ictx));
+ ASSERT_TRUE(ictx->aio_work_queue->writes_suspended());
+
+ {
+ Mutex::Locker l(m_callback_lock);
+ ASSERT_EQ(0, unlock_image());
+ m_notifies.clear();
+ m_notify_acks = {{NOTIFY_OP_ACQUIRED_LOCK, bufferlist()}};
+ }
+
+ ASSERT_TRUE(wait_for_notifies(*ictx));
+ ASSERT_FALSE(ictx->aio_work_queue->writes_suspended());
}
TEST_F(TestImageWatcher, RequestLockIgnored) {
stringify(orig_notify_timeout));
} BOOST_SCOPE_EXIT_END;
+ ASSERT_TRUE(ictx->aio_work_queue->writes_suspended());
{
- RWLock::WLocker l(ictx->owner_lock);
- ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
- create_aio_completion(*ictx));
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ ictx->image_watcher->request_lock();
}
ASSERT_TRUE(wait_for_notifies(*ictx));
ASSERT_EQ(expected_notify_ops, m_notifies);
// after the request times out -- it will be resent
+ {
+ Mutex::Locker l(m_callback_lock);
+ m_notifies.clear();
+ }
ASSERT_TRUE(wait_for_notifies(*ictx));
ASSERT_EQ(expected_notify_ops, m_notifies);
- ASSERT_EQ(0, unlock_image());
- ASSERT_TRUE(wait_for_aio_completions(*ictx));
+ {
+ Mutex::Locker l(m_callback_lock);
+ ASSERT_EQ(0, unlock_image());
+ m_notifies.clear();
+ m_notify_acks = {{NOTIFY_OP_ACQUIRED_LOCK, bufferlist()}};
+ }
+
+ ASSERT_TRUE(wait_for_notifies(*ictx));
+ ASSERT_FALSE(ictx->aio_work_queue->writes_suspended());
}
TEST_F(TestImageWatcher, RequestLockTryLockRace) {
m_notify_acks = {{NOTIFY_OP_REQUEST_LOCK, create_response_message(0)}};
- m_expected_aio_restarts = 1;
{
- RWLock::WLocker l(ictx->owner_lock);
- ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
- create_aio_completion(*ictx));
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ ictx->image_watcher->flag_aio_ops_pending();
+ ictx->image_watcher->request_lock();
}
ASSERT_TRUE(wait_for_notifies(*ictx));
expected_notify_ops += NOTIFY_OP_REQUEST_LOCK;
ASSERT_EQ(expected_notify_ops, m_notifies);
- m_notifies.clear();
- m_notify_acks = {{NOTIFY_OP_RELEASED_LOCK, {}}};
+ {
+ Mutex::Locker l(m_callback_lock);
+ m_notifies.clear();
+ m_notify_acks = {{NOTIFY_OP_RELEASED_LOCK, {}}};
+ }
bufferlist bl;
{
ENCODE_FINISH(bl);
}
ASSERT_EQ(0, m_ioctx.notify2(ictx->header_oid, bl, 5000, NULL));
- ASSERT_TRUE(wait_for_aio_completions(*ictx));
- RWLock::RLocker l(ictx->owner_lock);
- ASSERT_FALSE(ictx->image_watcher->is_lock_owner());
-}
-TEST_F(TestImageWatcher, RequestLockPreTryLockFailed) {
- REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+ // after losing race -- it will re-request
+ ASSERT_TRUE(wait_for_notifies(*ictx));
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
- ASSERT_EQ(0, lock_image(*ictx, LOCK_SHARED, "manually 1234"));
+ {
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ ASSERT_FALSE(ictx->image_watcher->is_lock_owner());
+ }
- m_expected_aio_restarts = 1;
{
- RWLock::WLocker l(ictx->owner_lock);
- ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
- create_aio_completion(*ictx));
+ Mutex::Locker l(m_callback_lock);
+ ASSERT_EQ(0, unlock_image());
+ m_notifies.clear();
+ m_notify_acks = {
+ {NOTIFY_OP_RELEASED_LOCK, bufferlist()},
+ {NOTIFY_OP_ACQUIRED_LOCK, bufferlist()}};
}
- ASSERT_TRUE(wait_for_aio_completions(*ictx));
+
+ ASSERT_EQ(0, m_ioctx.notify2(ictx->header_oid, bl, 5000, NULL));
+ ASSERT_TRUE(wait_for_notifies(*ictx));
+ ASSERT_FALSE(ictx->aio_work_queue->writes_suspended());
}
-TEST_F(TestImageWatcher, RequestLockPostTryLockFailed) {
+TEST_F(TestImageWatcher, RequestLockTryLockFailed) {
REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, register_image_watch(*ictx));
- ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
- "auto " + stringify(m_watch_ctx->get_handle())));
+ ASSERT_EQ(0, lock_image(*ictx, LOCK_SHARED, "manually 1234"));
- m_notify_acks = {{NOTIFY_OP_REQUEST_LOCK, create_response_message(0)}};
+ m_notify_acks = {{NOTIFY_OP_REQUEST_LOCK, {}}};
- m_expected_aio_restarts = 1;
+ ASSERT_TRUE(ictx->aio_work_queue->writes_suspended());
{
- RWLock::WLocker l(ictx->owner_lock);
- ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
- create_aio_completion(*ictx));
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ ictx->image_watcher->request_lock();
}
ASSERT_TRUE(wait_for_notifies(*ictx));
expected_notify_ops += NOTIFY_OP_REQUEST_LOCK;
ASSERT_EQ(expected_notify_ops, m_notifies);
- ASSERT_EQ(0, unlock_image());
- ASSERT_EQ(0, lock_image(*ictx, LOCK_SHARED, "manually 1234"));
-
- m_notifies.clear();
- m_notify_acks = {{NOTIFY_OP_RELEASED_LOCK, bufferlist()}};
+ // should resend when error encountered
+ {
+ Mutex::Locker l(m_callback_lock);
+ m_notifies.clear();
+ }
+ ASSERT_TRUE(wait_for_notifies(*ictx));
+ ASSERT_TRUE(ictx->aio_work_queue->writes_suspended());
- bufferlist bl;
{
- ENCODE_START(1, 1, bl);
- ::encode(NOTIFY_OP_RELEASED_LOCK, bl);
- ENCODE_FINISH(bl);
+ Mutex::Locker l(m_callback_lock);
+ ASSERT_EQ(0, unlock_image());
+ m_notifies.clear();
+ m_notify_acks = {{NOTIFY_OP_ACQUIRED_LOCK, bufferlist()}};
}
- ASSERT_EQ(0, m_ioctx.notify2(ictx->header_oid, bl, 5000, NULL));
- ASSERT_TRUE(wait_for_aio_completions(*ictx));
+
+ ASSERT_TRUE(wait_for_notifies(*ictx));
+ ASSERT_FALSE(ictx->aio_work_queue->writes_suspended());
}
TEST_F(TestImageWatcher, NotifyHeaderUpdate) {