#include "librbd/internal.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageWatcher.h"
+#include "librbd/WatchNotifyTypes.h"
#include "test/librados/test.h"
#include "gtest/gtest.h"
#include <boost/assign/list_of.hpp>
#include <boost/assign/std/set.hpp>
#include <boost/assign/std/map.hpp>
#include <boost/bind.hpp>
+#include <boost/thread/thread.hpp>
#include <iostream>
#include <map>
#include <set>
using namespace ceph;
using namespace boost::assign;
+using namespace librbd::WatchNotify;
void register_test_image_watcher() {
}
-enum NotifyOp {
- NOTIFY_OP_ACQUIRED_LOCK = 0,
- NOTIFY_OP_RELEASED_LOCK = 1,
- NOTIFY_OP_REQUEST_LOCK = 2,
- NOTIFY_OP_HEADER_UPDATE = 3
-};
-
-std::ostream& operator<<(std::ostream& os, NotifyOp op) {
- switch (op) {
- case NOTIFY_OP_ACQUIRED_LOCK:
- os << "acquired lock";
- break;
- case NOTIFY_OP_RELEASED_LOCK:
- os << "released lock";
- break;
- case NOTIFY_OP_REQUEST_LOCK:
- os << "request lock";
- break;
- case NOTIFY_OP_HEADER_UPDATE:
- os << "header update";
- break;
- default:
- os << "unknown (" << static_cast<uint32_t>(op) << ")";
- break;
- }
- return os;
-}
-
class TestImageWatcher : public TestFixture {
public:
iter.copy_all(payload);
DECODE_FINISH(iter);
- std::cout << "NOTIFY: " << static_cast<NotifyOp>(op) << ", " << notify_id
+ NotifyOp notify_op = static_cast<NotifyOp>(op);
+ std::cout << "NOTIFY: " << notify_op << ", " << notify_id
<< ", " << cookie << ", " << notifier_id << std::endl;
Mutex::Locker l(m_parent.m_callback_lock);
- NotifyOp notify_op = static_cast<NotifyOp>(op);
+ m_parent.m_notify_payloads[notify_op] = payload;
bufferlist reply;
if (m_parent.m_notify_acks.count(notify_op) > 0) {
return (r == 0);
}
+ bufferlist create_response_message(int r) {
+ bufferlist bl;
+ ::encode(ResponseMessage(r), bl);
+ return bl;
+ }
+
+ bool extract_async_request_id(NotifyOp op, AsyncRequestId *id) {
+ if (m_notify_payloads.count(op) == 0) {
+ return false;
+ }
+
+ bufferlist payload = m_notify_payloads[op];
+ bufferlist::iterator iter = payload.begin();
+
+ switch (op) {
+ case NOTIFY_OP_FLATTEN:
+ {
+ FlattenPayload payload;
+ payload.decode(2, iter);
+ *id = payload.async_request_id;
+ }
+ return true;
+ case NOTIFY_OP_RESIZE:
+ {
+ ResizePayload payload;
+ payload.decode(2, iter);
+ *id = payload.async_request_id;
+ }
+ return true;
+ default:
+ break;
+ }
+ return false;
+ }
+
+ int notify_async_progress(librbd::ImageCtx *ictx, const AsyncRequestId &id,
+ uint64_t offset, uint64_t total) {
+ bufferlist bl;
+ ::encode(NotifyMessage(AsyncProgressPayload(id, offset, total)), bl);
+ return m_ioctx.notify2(ictx->header_oid, bl, 5000, NULL);
+ }
+
+ int notify_async_complete(librbd::ImageCtx *ictx, const AsyncRequestId &id,
+ int r) {
+ bufferlist bl;
+ ::encode(NotifyMessage(AsyncCompletePayload(id, r)), bl);
+ return m_ioctx.notify2(ictx->header_oid, bl, 5000, NULL);
+ }
+
typedef std::map<NotifyOp, bufferlist> NotifyOpPayloads;
typedef std::set<NotifyOp> NotifyOps;
WatchCtx *m_watch_ctx;
NotifyOps m_notifies;
+ NotifyOpPayloads m_notify_payloads;
NotifyOpPayloads m_notify_acks;
+ AsyncRequestId m_async_request_id;
+
std::set<librbd::AioCompletion *> m_aio_completions;
uint32_t m_aio_completion_restarts;
uint32_t m_expected_aio_restarts;
};
+struct ProgressContext : public librbd::ProgressContext {
+ Mutex mutex;
+ Cond cond;
+ bool received;
+ uint64_t offset;
+ uint64_t total;
+
+ ProgressContext() : mutex("ProgressContext::mutex"), received(false),
+ offset(0), total(0) {}
+
+ virtual int update_progress(uint64_t offset_, uint64_t total_) {
+ Mutex::Locker l(mutex);
+ offset = offset_;
+ total = total_;
+ received = true;
+ cond.Signal();
+ return 0;
+ }
+
+ bool wait(librbd::ImageCtx *ictx, uint64_t offset_, uint64_t total_) {
+ Mutex::Locker l(mutex);
+ while (!received) {
+ int r = cond.WaitInterval(ictx->cct, mutex, utime_t(10, 0));
+ if (r != 0) {
+ break;
+ }
+ }
+ return (received && offset == offset_ && total == total_);
+ }
+};
+
+struct FlattenTask {
+ librbd::ImageCtx *ictx;
+ ProgressContext *progress_context;
+ int result;
+
+ FlattenTask(librbd::ImageCtx *ictx_, ProgressContext *ctx)
+ : ictx(ictx_), progress_context(ctx), result(0) {}
+
+ void operator()() {
+ RWLock::RLocker l(ictx->owner_lock);
+ result = ictx->image_watcher->notify_flatten(0, *progress_context);
+ }
+};
+
+struct ResizeTask {
+ librbd::ImageCtx *ictx;
+ ProgressContext *progress_context;
+ int result;
+
+ ResizeTask(librbd::ImageCtx *ictx_, ProgressContext *ctx)
+ : ictx(ictx_), progress_context(ctx), result(0) {}
+
+ void operator()() {
+ RWLock::RLocker l(ictx->owner_lock);
+ result = ictx->image_watcher->notify_resize(0, 0, *progress_context);
+ }
+};
+
TEST_F(TestImageWatcher, IsLockSupported) {
REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
"auto " + stringify(m_watch_ctx->get_handle())));
- bufferlist bl;
- {
- ENCODE_START(1, 1, bl);
- ::encode(0, bl);
- ENCODE_FINISH(bl);
- }
-
m_notify_acks = boost::assign::list_of(
- std::make_pair(NOTIFY_OP_REQUEST_LOCK, bl));
+ std::make_pair(NOTIFY_OP_REQUEST_LOCK, create_response_message(0)));
{
RWLock::WLocker l(ictx->owner_lock);
std::make_pair(NOTIFY_OP_RELEASED_LOCK, bufferlist()))(
std::make_pair(NOTIFY_OP_ACQUIRED_LOCK, bufferlist()));
- bl.clear();
+ bufferlist bl;
{
ENCODE_START(1, 1, bl);
::encode(NOTIFY_OP_RELEASED_LOCK, bl);
ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
"auto " + stringify(m_watch_ctx->get_handle())));
- bufferlist bl;
- {
- ENCODE_START(1, 1, bl);
- ::encode(0, bl);
- ENCODE_FINISH(bl);
- }
-
m_notify_acks = boost::assign::list_of(
- std::make_pair(NOTIFY_OP_REQUEST_LOCK, bl));
+ std::make_pair(NOTIFY_OP_REQUEST_LOCK, create_response_message(0)));
{
RWLock::WLocker l(ictx->owner_lock);
m_notify_acks = boost::assign::list_of(
std::make_pair(NOTIFY_OP_RELEASED_LOCK, bufferlist()));
- bl.clear();
+ bufferlist bl;
{
ENCODE_START(1, 1, bl);
::encode(NOTIFY_OP_RELEASED_LOCK, bl);
ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
"auto " + stringify(m_watch_ctx->get_handle())));
- bufferlist bl;
- {
- ENCODE_START(1, 1, bl);
- ::encode(0, bl);
- ENCODE_FINISH(bl);
- }
-
m_notify_acks = boost::assign::list_of(
- std::make_pair(NOTIFY_OP_REQUEST_LOCK, bl));
+ std::make_pair(NOTIFY_OP_REQUEST_LOCK, create_response_message(0)));
m_expected_aio_restarts = 1;
{
m_notify_acks = boost::assign::list_of(
std::make_pair(NOTIFY_OP_RELEASED_LOCK, bufferlist()));
- bl.clear();
+ bufferlist bl;
{
ENCODE_START(1, 1, bl);
::encode(NOTIFY_OP_RELEASED_LOCK, bl);
expected_notify_ops += NOTIFY_OP_HEADER_UPDATE;
ASSERT_EQ(expected_notify_ops, m_notifies);
}
+
+TEST_F(TestImageWatcher, NotifyFlatten) {
+ REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ ASSERT_EQ(0, register_image_watch(*ictx));
+ ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
+ "auto " + stringify(m_watch_ctx->get_handle())));
+
+ m_notify_acks = boost::assign::list_of(
+ std::make_pair(NOTIFY_OP_FLATTEN, create_response_message(0)));
+
+ ProgressContext progress_context;
+ FlattenTask flatten_task(ictx, &progress_context);
+ boost::thread thread(boost::ref(flatten_task));
+
+ ASSERT_TRUE(wait_for_notifies(*ictx));
+
+ NotifyOps expected_notify_ops;
+ expected_notify_ops += NOTIFY_OP_FLATTEN;
+ ASSERT_EQ(expected_notify_ops, m_notifies);
+
+ AsyncRequestId async_request_id;
+ ASSERT_TRUE(extract_async_request_id(NOTIFY_OP_FLATTEN, &async_request_id));
+
+ ASSERT_EQ(0, notify_async_progress(ictx, async_request_id, 10, 20));
+ ASSERT_TRUE(progress_context.wait(ictx, 10, 20));
+
+ ASSERT_EQ(0, notify_async_complete(ictx, async_request_id, 0));
+
+ ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+ ASSERT_EQ(0, flatten_task.result);
+}
+
+TEST_F(TestImageWatcher, NotifyResize) {
+ REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ ASSERT_EQ(0, register_image_watch(*ictx));
+ ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
+ "auto " + stringify(m_watch_ctx->get_handle())));
+
+ m_notify_acks = boost::assign::list_of(
+ std::make_pair(NOTIFY_OP_RESIZE, create_response_message(0)));
+
+ ProgressContext progress_context;
+ ResizeTask resize_task(ictx, &progress_context);
+ boost::thread thread(boost::ref(resize_task));
+
+ ASSERT_TRUE(wait_for_notifies(*ictx));
+
+ NotifyOps expected_notify_ops;
+ expected_notify_ops += NOTIFY_OP_RESIZE;
+ ASSERT_EQ(expected_notify_ops, m_notifies);
+
+ AsyncRequestId async_request_id;
+ ASSERT_TRUE(extract_async_request_id(NOTIFY_OP_RESIZE, &async_request_id));
+
+ ASSERT_EQ(0, notify_async_progress(ictx, async_request_id, 10, 20));
+ ASSERT_TRUE(progress_context.wait(ictx, 10, 20));
+
+ ASSERT_EQ(0, notify_async_complete(ictx, async_request_id, 0));
+
+ ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+ ASSERT_EQ(0, resize_task.result);
+}
+
+TEST_F(TestImageWatcher, NotifySnapCreate) {
+ REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ ASSERT_EQ(0, register_image_watch(*ictx));
+ ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
+ "auto " + stringify(m_watch_ctx->get_handle())));
+
+ m_notify_acks = boost::assign::list_of(
+ std::make_pair(NOTIFY_OP_SNAP_CREATE, create_response_message(0)));
+
+ RWLock::RLocker l(ictx->owner_lock);
+ ASSERT_EQ(0, ictx->image_watcher->notify_snap_create("snap"));
+
+ NotifyOps expected_notify_ops;
+ expected_notify_ops += NOTIFY_OP_SNAP_CREATE;
+ ASSERT_EQ(expected_notify_ops, m_notifies);
+}
+
+TEST_F(TestImageWatcher, NotifySnapCreateError) {
+ REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ ASSERT_EQ(0, register_image_watch(*ictx));
+ ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
+ "auto " + stringify(m_watch_ctx->get_handle())));
+
+ m_notify_acks = boost::assign::list_of(
+ std::make_pair(NOTIFY_OP_SNAP_CREATE, create_response_message(-EEXIST)));
+
+ RWLock::RLocker l(ictx->owner_lock);
+ ASSERT_EQ(-EEXIST, ictx->image_watcher->notify_snap_create("snap"));
+
+ NotifyOps expected_notify_ops;
+ expected_notify_ops += NOTIFY_OP_SNAP_CREATE;
+ ASSERT_EQ(expected_notify_ops, m_notifies);
+}
+
+TEST_F(TestImageWatcher, NotifyAsyncTimedOut) {
+ REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ ASSERT_EQ(0, register_image_watch(*ictx));
+ ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
+ "auto " + stringify(m_watch_ctx->get_handle())));
+
+ m_notify_acks = boost::assign::list_of(
+ std::make_pair(NOTIFY_OP_FLATTEN, bufferlist()));
+
+ ProgressContext progress_context;
+ FlattenTask flatten_task(ictx, &progress_context);
+ boost::thread thread(boost::ref(flatten_task));
+
+ ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+ ASSERT_EQ(-ETIMEDOUT, flatten_task.result);
+}
+
+TEST_F(TestImageWatcher, NotifyAsyncError) {
+ REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ ASSERT_EQ(0, register_image_watch(*ictx));
+ ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
+ "auto " + stringify(m_watch_ctx->get_handle())));
+
+ m_notify_acks = boost::assign::list_of(
+ std::make_pair(NOTIFY_OP_FLATTEN, create_response_message(-EIO)));
+
+ ProgressContext progress_context;
+ FlattenTask flatten_task(ictx, &progress_context);
+ boost::thread thread(boost::ref(flatten_task));
+
+ ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+ ASSERT_EQ(-EIO, flatten_task.result);
+}
+
+TEST_F(TestImageWatcher, NotifyAsyncCompleteError) {
+ REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ ASSERT_EQ(0, register_image_watch(*ictx));
+ ASSERT_EQ(0, lock_image(*ictx, LOCK_EXCLUSIVE,
+ "auto " + stringify(m_watch_ctx->get_handle())));
+
+ m_notify_acks = boost::assign::list_of(
+ std::make_pair(NOTIFY_OP_FLATTEN, create_response_message(0)));
+
+ ProgressContext progress_context;
+ FlattenTask flatten_task(ictx, &progress_context);
+ boost::thread thread(boost::ref(flatten_task));
+
+ ASSERT_TRUE(wait_for_notifies(*ictx));
+
+ NotifyOps expected_notify_ops;
+ expected_notify_ops += NOTIFY_OP_FLATTEN;
+ ASSERT_EQ(expected_notify_ops, m_notifies);
+
+ AsyncRequestId async_request_id;
+ ASSERT_TRUE(extract_async_request_id(NOTIFY_OP_FLATTEN, &async_request_id));
+
+ ASSERT_EQ(0, notify_async_complete(ictx, async_request_id, -ESHUTDOWN));
+
+ ASSERT_TRUE(thread.timed_join(boost::posix_time::seconds(10)));
+ ASSERT_EQ(-ESHUTDOWN, flatten_task.result);
+}