#include "test/rbd_mirror/test_mock_fixture.h"
#include "librbd/journal/TypeTraits.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
class ProgressContext;
template<>
-struct ImageSyncThrottler<librbd::MockTestImageCtx> {
- MOCK_METHOD10(start_sync, void(librbd::MockTestImageCtx *local_image_ctx,
- librbd::MockTestImageCtx *remote_image_ctx,
- SafeTimer *timer, Mutex *timer_lock,
- const std::string &mirror_uuid,
- ::journal::MockJournaler *journaler,
- librbd::journal::MirrorPeerClientMeta *client_meta,
- ContextWQ *work_queue, Context *on_finish,
- ProgressContext *progress_ctx));
- MOCK_METHOD1(cancel_sync, void(const std::string& mirror_uuid));
+struct ImageSync<librbd::MockTestImageCtx> {
+ static ImageSync* s_instance;
+ Context *on_finish = nullptr;
+
+ static ImageSync* create(
+ librbd::MockTestImageCtx *local_image_ctx,
+ librbd::MockTestImageCtx *remote_image_ctx,
+ SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid,
+ ::journal::MockJournaler *journaler,
+ librbd::journal::MirrorPeerClientMeta *client_meta, ContextWQ *work_queue,
+ InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
+ Context *on_finish, ProgressContext *progress_ctx) {
+ assert(s_instance != nullptr);
+ s_instance->on_finish = on_finish;
+ return s_instance;
+ }
+
+ ImageSync() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+ ~ImageSync() {
+ s_instance = nullptr;
+ }
+
+ MOCK_METHOD0(get, void());
+ MOCK_METHOD0(put, void());
+ MOCK_METHOD0(send, void());
+ MOCK_METHOD0(cancel, void());
+};
+
+ImageSync<librbd::MockTestImageCtx>*
+ ImageSync<librbd::MockTestImageCtx>::s_instance = nullptr;
+
+template<>
+struct InstanceWatcher<librbd::MockTestImageCtx> {
};
namespace image_replayer {
class TestMockImageReplayerBootstrapRequest : public TestMockFixture {
public:
- typedef ImageSyncThrottlerRef<librbd::MockTestImageCtx> MockImageSyncThrottler;
typedef BootstrapRequest<librbd::MockTestImageCtx> MockBootstrapRequest;
typedef CloseImageRequest<librbd::MockTestImageCtx> MockCloseImageRequest;
typedef CreateImageRequest<librbd::MockTestImageCtx> MockCreateImageRequest;
+ typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;
+ typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
typedef IsPrimaryRequest<librbd::MockTestImageCtx> MockIsPrimaryRequest;
typedef OpenImageRequest<librbd::MockTestImageCtx> MockOpenImageRequest;
typedef OpenLocalImageRequest<librbd::MockTestImageCtx> MockOpenLocalImageRequest;
}));
}
- void expect_image_sync(MockImageSyncThrottler image_sync_throttler,
- int r) {
- EXPECT_CALL(*image_sync_throttler, start_sync(_, _, _, _,
- StrEq("local mirror uuid"),
- _, _, _, _, _))
- .WillOnce(WithArg<8>(Invoke([this, r](Context *on_finish) {
- m_threads->work_queue->queue(on_finish, r);
- })));
+ void expect_image_sync(MockImageSync &mock_image_sync, int r) {
+ EXPECT_CALL(mock_image_sync, get());
+ EXPECT_CALL(mock_image_sync, send())
+ .WillOnce(Invoke([this, &mock_image_sync, r]() {
+ m_threads->work_queue->queue(mock_image_sync.on_finish, r);
+ }));
+ EXPECT_CALL(mock_image_sync, put());
}
bufferlist encode_tag_data(const librbd::journal::TagData &tag_data) {
return bl;
}
- MockBootstrapRequest *create_request(MockImageSyncThrottler mock_image_sync_throttler,
+ MockBootstrapRequest *create_request(MockInstanceWatcher *mock_instance_watcher,
::journal::MockJournaler &mock_journaler,
const std::string &local_image_id,
const std::string &remote_image_id,
Context *on_finish) {
return new MockBootstrapRequest(m_local_io_ctx,
m_remote_io_ctx,
- mock_image_sync_throttler,
+ mock_instance_watcher,
&m_local_test_image_ctx,
local_image_id,
remote_image_id,
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
- MockImageSyncThrottler mock_image_sync_throttler(
- new ImageSyncThrottler<librbd::MockTestImageCtx>());
+ MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
- mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+ &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
- MockImageSyncThrottler mock_image_sync_throttler(
- new ImageSyncThrottler<librbd::MockTestImageCtx>());
+ MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
- mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+ &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
- MockImageSyncThrottler mock_image_sync_throttler(
- new ImageSyncThrottler<librbd::MockTestImageCtx>());
+ MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
- mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+ &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
- MockImageSyncThrottler mock_image_sync_throttler(
- new ImageSyncThrottler<librbd::MockTestImageCtx>());
+ MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
- mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+ &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
- MockImageSyncThrottler mock_image_sync_throttler(
- new ImageSyncThrottler<librbd::MockTestImageCtx>());
+ MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
- mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+ &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
- MockImageSyncThrottler mock_image_sync_throttler(
- new ImageSyncThrottler<librbd::MockTestImageCtx>());
+ MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
- mock_image_sync_throttler, mock_journaler, mock_local_image_ctx.id,
+ &mock_instance_watcher, mock_journaler, mock_local_image_ctx.id,
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
m_do_resync = false;
expect_journaler_update_client(mock_journaler, client_data, 0);
// sync the remote image to the local image
- MockImageSyncThrottler mock_image_sync_throttler(
- new ImageSyncThrottler<librbd::MockTestImageCtx>());
- expect_image_sync(mock_image_sync_throttler, 0);
+ MockImageSync mock_image_sync;
+ expect_image_sync(mock_image_sync, 0);
MockCloseImageRequest mock_close_image_request;
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
+ MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
- mock_image_sync_throttler, mock_journaler, "",
+ &mock_instance_watcher, mock_journaler, "",
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
expect_journaler_update_client(mock_journaler, client_data, 0);
// sync the remote image to the local image
- MockImageSyncThrottler mock_image_sync_throttler(
- new ImageSyncThrottler<librbd::MockTestImageCtx>());
- expect_image_sync(mock_image_sync_throttler, 0);
+ MockImageSync mock_image_sync;
+ expect_image_sync(mock_image_sync, 0);
MockCloseImageRequest mock_close_image_request;
expect_close_image(mock_close_image_request, mock_remote_image_ctx, 0);
C_SaferCond ctx;
+ MockInstanceWatcher mock_instance_watcher;
MockBootstrapRequest *request = create_request(
- mock_image_sync_throttler, mock_journaler, "",
+ &mock_instance_watcher, mock_journaler, "",
mock_remote_image_ctx.id, "global image id", "local mirror uuid",
"remote mirror uuid", &ctx);
request->send();
#include "librbd/io/ReadResult.h"
#include "tools/rbd_mirror/types.h"
#include "tools/rbd_mirror/ImageReplayer.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/ImageDeleter.h"
m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue,
m_threads->timer,
&m_threads->timer_lock));
- m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>());
+ m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
+ m_local_ioctx, m_threads->work_queue, nullptr);
+ m_instance_watcher->handle_acquire_leader();
}
- ~TestImageReplayer() override
+ ~TestImageReplayer() override
{
unwatch();
+ m_instance_watcher->handle_release_leader();
+
delete m_replayer;
+ delete m_instance_watcher;
delete m_threads;
EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
void create_replayer() {
- m_replayer = new ImageReplayerT(m_threads, m_image_deleter, m_image_sync_throttler,
- rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
- m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id");
+ m_replayer = new ImageReplayerT(
+ m_threads, m_image_deleter, m_instance_watcher,
+ rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
+ m_local_mirror_uuid, m_local_ioctx.get_id(), "global image id");
m_replayer->add_remote_image(m_remote_mirror_uuid, m_remote_image_id,
m_remote_ioctx);
}
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
std::shared_ptr<librados::Rados> m_local_cluster;
librados::Rados m_remote_cluster;
- std::shared_ptr<rbd::mirror::ImageSyncThrottler<>> m_image_sync_throttler;
+ rbd::mirror::InstanceWatcher<> *m_instance_watcher;
std::string m_local_mirror_uuid = "local mirror uuid";
std::string m_remote_mirror_uuid = "remote mirror uuid";
std::string m_local_pool_name, m_remote_pool_name;
#include "librbd/io/ReadResult.h"
#include "librbd/journal/Types.h"
#include "tools/rbd_mirror/ImageSync.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
void register_test_image_sync() {
create_and_open(m_local_io_ctx, &m_local_image_ctx);
create_and_open(m_remote_io_ctx, &m_remote_image_ctx);
+ m_instance_watcher = rbd::mirror::InstanceWatcher<>::create(
+ m_local_io_ctx, m_threads->work_queue, nullptr);
+ m_instance_watcher->handle_acquire_leader();
+
m_remote_journaler = new ::journal::Journaler(
m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
m_remote_io_ctx, m_remote_image_ctx->id, "mirror-uuid", {});
void TearDown() override {
TestFixture::TearDown();
+
+ m_instance_watcher->handle_release_leader();
+
delete m_remote_journaler;
+ delete m_instance_watcher;
}
void create_and_open(librados::IoCtx &io_ctx, librbd::ImageCtx **image_ctx) {
return new ImageSync<>(m_local_image_ctx, m_remote_image_ctx,
m_threads->timer, &m_threads->timer_lock,
"mirror-uuid", m_remote_journaler, &m_client_meta,
- m_threads->work_queue, ctx);
+ m_threads->work_queue, m_instance_watcher, ctx);
}
librbd::ImageCtx *m_remote_image_ctx;
librbd::ImageCtx *m_local_image_ctx;
+ rbd::mirror::InstanceWatcher<> *m_instance_watcher;
::journal::Journaler *m_remote_journaler;
librbd::journal::MirrorPeerClientMeta m_client_meta;
};
}
}
+ void update_leader_handler(const std::string &leader_instance_id) override {
+ }
+
private:
mutable Mutex m_test_lock;
int m_acquire_count = 0;
#include "librbd/journal/Replay.h"
#include "librbd/journal/Types.h"
#include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
namespace mirror {
template<>
-class ImageSyncThrottler<librbd::MockTestImageCtx> {
+class InstanceWatcher<librbd::MockTestImageCtx> {
};
namespace image_replayer {
Context *on_finish = nullptr;
bool *do_resync = nullptr;
- static BootstrapRequest* create(librados::IoCtx &local_io_ctx,
- librados::IoCtx &remote_io_ctx,
- rbd::mirror::ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
- librbd::MockTestImageCtx **local_image_ctx,
- const std::string &local_image_name,
- const std::string &remote_image_id,
- const std::string &global_image_id,
- ContextWQ *work_queue, SafeTimer *timer,
- Mutex *timer_lock,
- const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid,
- ::journal::MockJournalerProxy *journaler,
- librbd::journal::MirrorPeerClientMeta *client_meta,
- Context *on_finish,
- bool *do_resync,
- rbd::mirror::ProgressContext *progress_ctx = nullptr) {
+ static BootstrapRequest* create(
+ librados::IoCtx &local_io_ctx, librados::IoCtx &remote_io_ctx,
+ rbd::mirror::InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
+ librbd::MockTestImageCtx **local_image_ctx,
+ const std::string &local_image_name, const std::string &remote_image_id,
+ const std::string &global_image_id, ContextWQ *work_queue,
+ SafeTimer *timer, Mutex *timer_lock, const std::string &local_mirror_uuid,
+ const std::string &remote_mirror_uuid,
+ ::journal::MockJournalerProxy *journaler,
+ librbd::journal::MirrorPeerClientMeta *client_meta,
+ Context *on_finish, bool *do_resync,
+ rbd::mirror::ProgressContext *progress_ctx = nullptr) {
assert(s_instance != nullptr);
s_instance->image_ctx = local_image_ctx;
s_instance->on_finish = on_finish;
typedef ReplayStatusFormatter<librbd::MockTestImageCtx> MockReplayStatusFormatter;
typedef librbd::journal::Replay<librbd::MockTestImageCtx> MockReplay;
typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
+ typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
void SetUp() override {
TestMockFixture::SetUp();
m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_threads->work_queue,
m_threads->timer,
&m_threads->timer_lock));
- m_image_sync_throttler.reset(
- new rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>());
-
m_image_replayer = new MockImageReplayer(
- m_threads, m_image_deleter, m_image_sync_throttler,
+ m_threads, m_image_deleter, &m_instance_watcher,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id(), "global image id");
m_image_replayer->add_remote_image(
librbd::ImageCtx *m_remote_image_ctx;
librbd::ImageCtx *m_local_image_ctx = nullptr;
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
- std::shared_ptr<rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>> m_image_sync_throttler;
+ MockInstanceWatcher m_instance_watcher;
MockImageReplayer *m_image_replayer;
};
namespace rbd {
namespace mirror {
+template<>
+struct InstanceWatcher<librbd::MockTestImageCtx> {
+ MOCK_METHOD2(notify_sync_request, void(const std::string, Context *));
+ MOCK_METHOD1(cancel_sync_request, bool(const std::string &));
+ MOCK_METHOD1(notify_sync_complete, void(const std::string &));
+};
+
namespace image_sync {
template <>
class TestMockImageSync : public TestMockFixture {
public:
typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;
+ typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
typedef image_sync::ImageCopyRequest<librbd::MockTestImageCtx> MockImageCopyRequest;
typedef image_sync::SnapshotCopyRequest<librbd::MockTestImageCtx> MockSnapshotCopyRequest;
typedef image_sync::SyncPointCreateRequest<librbd::MockTestImageCtx> MockSyncPointCreateRequest;
ReturnNew<FunctionContext>([](int) {}));
}
+ void expect_notify_sync_request(MockInstanceWatcher &mock_instance_watcher,
+ const std::string &sync_id, int r) {
+ EXPECT_CALL(mock_instance_watcher, notify_sync_request(sync_id, _))
+ .WillOnce(Invoke([this, r](const std::string &, Context *on_sync_start) {
+ m_threads->work_queue->queue(on_sync_start, r);
+ }));
+ }
+
+ void expect_cancel_sync_request(MockInstanceWatcher &mock_instance_watcher,
+ const std::string &sync_id, bool canceled) {
+ EXPECT_CALL(mock_instance_watcher, cancel_sync_request(sync_id))
+ .WillOnce(Return(canceled));
+ }
+
+ void expect_notify_sync_complete(MockInstanceWatcher &mock_instance_watcher,
+ const std::string &sync_id) {
+ EXPECT_CALL(mock_instance_watcher, notify_sync_complete(sync_id));
+ }
+
void expect_create_sync_point(librbd::MockTestImageCtx &mock_local_image_ctx,
MockSyncPointCreateRequest &mock_sync_point_create_request,
int r) {
MockImageSync *create_request(librbd::MockTestImageCtx &mock_remote_image_ctx,
librbd::MockTestImageCtx &mock_local_image_ctx,
journal::MockJournaler &mock_journaler,
+ MockInstanceWatcher &mock_instance_watcher,
Context *ctx) {
return new MockImageSync(&mock_local_image_ctx, &mock_remote_image_ctx,
m_threads->timer, &m_threads->timer_lock,
"mirror-uuid", &mock_journaler, &m_client_meta,
- m_threads->work_queue, ctx);
+ m_threads->work_queue, &mock_instance_watcher,
+ ctx);
}
librbd::ImageCtx *m_remote_image_ctx;
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
+ MockInstanceWatcher mock_instance_watcher;
MockImageCopyRequest mock_image_copy_request;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
expect_test_features(mock_local_image_ctx);
InSequence seq;
+ expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
expect_copy_snapshots(mock_snapshot_copy_request, 0);
expect_copy_image(mock_image_copy_request, 0);
expect_create_object_map(mock_local_image_ctx, mock_object_map);
expect_open_object_map(mock_local_image_ctx, *mock_object_map);
expect_prune_sync_point(mock_sync_point_prune_request, true, 0);
+ expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
- mock_local_image_ctx,
- mock_journaler, &ctx);
+ mock_local_image_ctx, mock_journaler,
+ mock_instance_watcher, &ctx);
request->send();
ASSERT_EQ(0, ctx.wait());
}
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
+ MockInstanceWatcher mock_instance_watcher;
MockImageCopyRequest mock_image_copy_request;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
expect_test_features(mock_local_image_ctx);
InSequence seq;
+ expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_prune_sync_point(mock_sync_point_prune_request, false, 0);
expect_copy_snapshots(mock_snapshot_copy_request, 0);
expect_copy_image(mock_image_copy_request, 0);
expect_create_object_map(mock_local_image_ctx, mock_object_map);
expect_open_object_map(mock_local_image_ctx, *mock_object_map);
expect_prune_sync_point(mock_sync_point_prune_request, true, 0);
+ expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
- mock_local_image_ctx,
- mock_journaler, &ctx);
+ mock_local_image_ctx, mock_journaler,
+ mock_instance_watcher, &ctx);
request->send();
ASSERT_EQ(0, ctx.wait());
}
+TEST_F(TestMockImageSync, CancelNotifySyncRequest) {
+ librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
+ librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
+ journal::MockJournaler mock_journaler;
+ MockInstanceWatcher mock_instance_watcher;
+
+ InSequence seq;
+ Context *on_sync_start = nullptr;
+ C_SaferCond notify_sync_ctx;
+ EXPECT_CALL(mock_instance_watcher,
+ notify_sync_request(mock_local_image_ctx.id, _))
+ .WillOnce(Invoke([this, &on_sync_start, ¬ify_sync_ctx](
+ const std::string &, Context *ctx) {
+ on_sync_start = ctx;
+ notify_sync_ctx.complete(0);
+ }));
+ EXPECT_CALL(mock_instance_watcher,
+ cancel_sync_request(mock_local_image_ctx.id))
+ .WillOnce(Invoke([this, &on_sync_start](const std::string &) {
+ EXPECT_NE(nullptr, on_sync_start);
+ on_sync_start->complete(-ECANCELED);
+ return true;
+ }));
+
+ C_SaferCond ctx;
+ MockImageSync *request = create_request(mock_remote_image_ctx,
+ mock_local_image_ctx, mock_journaler,
+ mock_instance_watcher, &ctx);
+ request->get();
+ request->send();
+
+ // cancel the notify sync request once it starts
+ ASSERT_EQ(0, notify_sync_ctx.wait());
+ request->cancel();
+ request->put();
+
+ ASSERT_EQ(-ECANCELED, ctx.wait());
+}
+
TEST_F(TestMockImageSync, CancelImageCopy) {
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
+ MockInstanceWatcher mock_instance_watcher;
MockImageCopyRequest mock_image_copy_request;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
m_client_meta.sync_points = {{cls::rbd::UserSnapshotNamespace(), "snap1", boost::none}};
InSequence seq;
+ expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_prune_sync_point(mock_sync_point_prune_request, false, 0);
expect_copy_snapshots(mock_snapshot_copy_request, 0);
.WillOnce(Invoke([&image_copy_ctx]() {
image_copy_ctx.complete(0);
}));
+ expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id,
+ false);
EXPECT_CALL(mock_image_copy_request, cancel());
+ expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
- mock_local_image_ctx,
- mock_journaler, &ctx);
+ mock_local_image_ctx, mock_journaler,
+ mock_instance_watcher, &ctx);
request->get();
request->send();
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
+ MockInstanceWatcher mock_instance_watcher;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
- mock_local_image_ctx,
- mock_journaler, &ctx);
+ mock_local_image_ctx, mock_journaler,
+ mock_instance_watcher, &ctx);
InSequence seq;
+ expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
EXPECT_CALL(mock_snapshot_copy_request, send())
.WillOnce((DoAll(InvokeWithoutArgs([request]() {
Invoke([this, &mock_snapshot_copy_request]() {
m_threads->work_queue->queue(mock_snapshot_copy_request.on_finish, 0);
}))));
+ expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id,
+ false);
EXPECT_CALL(mock_snapshot_copy_request, cancel());
+ expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
request->send();
ASSERT_EQ(-ECANCELED, ctx.wait());
librbd::MockTestImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
librbd::MockTestImageCtx mock_local_image_ctx(*m_local_image_ctx);
journal::MockJournaler mock_journaler;
+ MockInstanceWatcher mock_instance_watcher;
MockImageCopyRequest mock_image_copy_request;
MockSnapshotCopyRequest mock_snapshot_copy_request;
MockSyncPointCreateRequest mock_sync_point_create_request;
C_SaferCond ctx;
MockImageSync *request = create_request(mock_remote_image_ctx,
- mock_local_image_ctx,
- mock_journaler, &ctx);
+ mock_local_image_ctx, mock_journaler,
+ mock_instance_watcher, &ctx);
InSequence seq;
+ expect_notify_sync_request(mock_instance_watcher, mock_local_image_ctx.id, 0);
expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
expect_copy_snapshots(mock_snapshot_copy_request, 0);
EXPECT_CALL(mock_image_copy_request, send())
Invoke([this, &mock_image_copy_request]() {
m_threads->work_queue->queue(mock_image_copy_request.on_finish, 0);
}))));
+ expect_cancel_sync_request(mock_instance_watcher, mock_local_image_ctx.id,
+ false);
EXPECT_CALL(mock_image_copy_request, cancel());
+ expect_notify_sync_complete(mock_instance_watcher, mock_local_image_ctx.id);
request->send();
ASSERT_EQ(-ECANCELED, ctx.wait());
*/
#include "test/rbd_mirror/test_mock_fixture.h"
-#include "librbd/journal/TypeTraits.h"
-#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
-#include "librbd/ImageState.h"
-#include "tools/rbd_mirror/Threads.h"
-#include "tools/rbd_mirror/ImageSync.h"
namespace librbd {
} // anonymous namespace
-namespace journal {
-
-template <>
-struct TypeTraits<librbd::MockTestImageCtx> {
- typedef ::journal::MockJournaler Journaler;
-};
-
-} // namespace journal
} // namespace librbd
-namespace rbd {
-namespace mirror {
-
-using ::testing::Invoke;
-
-typedef ImageSync<librbd::MockTestImageCtx> MockImageSync;
-
-template<>
-class ImageSync<librbd::MockTestImageCtx> {
-public:
- static std::vector<MockImageSync *> instances;
-
- Context *on_finish;
- bool syncing = false;
-
- static ImageSync* create(librbd::MockTestImageCtx *local_image_ctx,
- librbd::MockTestImageCtx *remote_image_ctx,
- SafeTimer *timer, Mutex *timer_lock,
- const std::string &mirror_uuid,
- journal::MockJournaler *journaler,
- librbd::journal::MirrorPeerClientMeta *client_meta,
- ContextWQ *work_queue, Context *on_finish,
- ProgressContext *progress_ctx = nullptr) {
- ImageSync *sync = new ImageSync();
- sync->on_finish = on_finish;
-
- EXPECT_CALL(*sync, send())
- .WillRepeatedly(Invoke([sync]() {
- sync->syncing = true;
- }));
-
- return sync;
- }
-
- void finish(int r) {
- on_finish->complete(r);
- }
-
- void get() {
- instances.push_back(this);
- }
-
- void put() { delete this; }
-
- MOCK_METHOD0(cancel, void());
- MOCK_METHOD0(send, void());
-
-};
-
-
-std::vector<MockImageSync *> MockImageSync::instances;
-
-} // namespace mirror
-} // namespace rbd
-
-
// template definitions
#include "tools/rbd_mirror/ImageSyncThrottler.cc"
public:
typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
- void SetUp() override {
- TestMockFixture::SetUp();
-
- librbd::RBD rbd;
- ASSERT_EQ(0, create_image(rbd, m_remote_io_ctx, m_image_name, m_image_size));
- ASSERT_EQ(0, open_image(m_remote_io_ctx, m_image_name, &m_remote_image_ctx));
-
- ASSERT_EQ(0, create_image(rbd, m_local_io_ctx, m_image_name, m_image_size));
- ASSERT_EQ(0, open_image(m_local_io_ctx, m_image_name, &m_local_image_ctx));
-
- mock_sync_throttler = new MockImageSyncThrottler();
-
- m_mock_local_image_ctx = new librbd::MockTestImageCtx(*m_local_image_ctx);
- m_mock_remote_image_ctx = new librbd::MockTestImageCtx(*m_remote_image_ctx);
- m_mock_journaler = new journal::MockJournaler();
- }
-
- void TearDown() override {
- MockImageSync::instances.clear();
- delete mock_sync_throttler;
- delete m_mock_local_image_ctx;
- delete m_mock_remote_image_ctx;
- delete m_mock_journaler;
- TestMockFixture::TearDown();
- }
-
- void start_sync(const std::string& image_id, Context *ctx) {
- m_mock_local_image_ctx->id = image_id;
- mock_sync_throttler->start_sync(m_mock_local_image_ctx,
- m_mock_remote_image_ctx,
- m_threads->timer,
- &m_threads->timer_lock,
- "mirror_uuid",
- m_mock_journaler,
- &m_client_meta,
- m_threads->work_queue,
- ctx);
- }
-
- void cancel(const std::string& mirror_uuid, MockImageSync *sync,
- bool running=true) {
- if (running) {
- EXPECT_CALL(*sync, cancel())
- .WillOnce(Invoke([sync]() {
- sync->finish(-ECANCELED);
- }));
- } else {
- EXPECT_CALL(*sync, cancel()).Times(0);
- }
- mock_sync_throttler->cancel_sync(mirror_uuid);
- }
-
- librbd::ImageCtx *m_remote_image_ctx;
- librbd::ImageCtx *m_local_image_ctx;
- librbd::MockTestImageCtx *m_mock_local_image_ctx;
- librbd::MockTestImageCtx *m_mock_remote_image_ctx;
- journal::MockJournaler *m_mock_journaler;
- librbd::journal::MirrorPeerClientMeta m_client_meta;
- MockImageSyncThrottler *mock_sync_throttler;
};
TEST_F(TestMockImageSyncThrottler, Single_Sync) {
- C_SaferCond ctx;
- start_sync("image_id", &ctx);
-
- ASSERT_EQ(1u, MockImageSync::instances.size());
- MockImageSync *sync = MockImageSync::instances[0];
- ASSERT_EQ(true, sync->syncing);
- sync->finish(0);
- ASSERT_EQ(0, ctx.wait());
+ MockImageSyncThrottler throttler;
+ C_SaferCond on_start;
+ throttler.start_op("id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+ throttler.finish_op("id");
}
TEST_F(TestMockImageSyncThrottler, Multiple_Syncs) {
- mock_sync_throttler->set_max_concurrent_syncs(2);
-
- C_SaferCond ctx1;
- start_sync("image_id_1", &ctx1);
- C_SaferCond ctx2;
- start_sync("image_id_2", &ctx2);
- C_SaferCond ctx3;
- start_sync("image_id_3", &ctx3);
- C_SaferCond ctx4;
- start_sync("image_id_4", &ctx4);
-
- ASSERT_EQ(4u, MockImageSync::instances.size());
-
- MockImageSync *sync1 = MockImageSync::instances[0];
- ASSERT_TRUE(sync1->syncing);
-
- MockImageSync *sync2 = MockImageSync::instances[1];
- ASSERT_TRUE(sync2->syncing);
-
- MockImageSync *sync3 = MockImageSync::instances[2];
- ASSERT_FALSE(sync3->syncing);
-
- MockImageSync *sync4 = MockImageSync::instances[3];
- ASSERT_FALSE(sync4->syncing);
-
- sync1->finish(0);
- ASSERT_EQ(0, ctx1.wait());
-
- ASSERT_TRUE(sync3->syncing);
- sync3->finish(-EINVAL);
- ASSERT_EQ(-EINVAL, ctx3.wait());
-
- ASSERT_TRUE(sync4->syncing);
-
- sync2->finish(0);
- ASSERT_EQ(0, ctx2.wait());
-
- sync4->finish(0);
- ASSERT_EQ(0, ctx4.wait());
+ MockImageSyncThrottler throttler;
+ throttler.set_max_concurrent_syncs(2);
+
+ C_SaferCond on_start1;
+ throttler.start_op("id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("id2", &on_start2);
+ C_SaferCond on_start3;
+ throttler.start_op("id3", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("id4", &on_start4);
+
+ ASSERT_EQ(0, on_start2.wait());
+ throttler.finish_op("id2");
+ ASSERT_EQ(0, on_start3.wait());
+ throttler.finish_op("id3");
+ ASSERT_EQ(0, on_start1.wait());
+ throttler.finish_op("id1");
+ ASSERT_EQ(0, on_start4.wait());
+ throttler.finish_op("id4");
}
TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync) {
- C_SaferCond ctx1;
- start_sync("image_id_1", &ctx1);
- C_SaferCond ctx2;
- start_sync("image_id_2", &ctx2);
-
- ASSERT_EQ(2u, MockImageSync::instances.size());
-
- MockImageSync *sync1 = MockImageSync::instances[0];
- ASSERT_TRUE(sync1->syncing);
-
- MockImageSync *sync2 = MockImageSync::instances[1];
- ASSERT_TRUE(sync2->syncing);
-
- cancel("image_id_2", sync2);
- ASSERT_EQ(-ECANCELED, ctx2.wait());
-
- sync1->finish(0);
- ASSERT_EQ(0, ctx1.wait());
+ MockImageSyncThrottler throttler;
+ C_SaferCond on_start;
+ throttler.start_op("id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+ ASSERT_FALSE(throttler.cancel_op("id"));
+ throttler.finish_op("id");
}
TEST_F(TestMockImageSyncThrottler, Cancel_Waiting_Sync) {
- mock_sync_throttler->set_max_concurrent_syncs(1);
-
- C_SaferCond ctx1;
- start_sync("image_id_1", &ctx1);
- C_SaferCond ctx2;
- start_sync("image_id_2", &ctx2);
-
- ASSERT_EQ(2u, MockImageSync::instances.size());
-
- MockImageSync *sync1 = MockImageSync::instances[0];
- ASSERT_TRUE(sync1->syncing);
-
- MockImageSync *sync2 = MockImageSync::instances[1];
- ASSERT_FALSE(sync2->syncing);
-
- cancel("image_id_2", sync2, false);
- ASSERT_EQ(-ECANCELED, ctx2.wait());
-
- sync1->finish(0);
- ASSERT_EQ(0, ctx1.wait());
+ MockImageSyncThrottler throttler;
+ throttler.set_max_concurrent_syncs(1);
+
+ C_SaferCond on_start1;
+ throttler.start_op("id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("id2", &on_start2);
+
+ ASSERT_EQ(0, on_start1.wait());
+ ASSERT_TRUE(throttler.cancel_op("id2"));
+ ASSERT_EQ(-ECANCELED, on_start2.wait());
+ throttler.finish_op("id1");
}
-TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
- mock_sync_throttler->set_max_concurrent_syncs(1);
-
- C_SaferCond ctx1;
- start_sync("image_id_1", &ctx1);
- C_SaferCond ctx2;
- start_sync("image_id_2", &ctx2);
-
- ASSERT_EQ(2u, MockImageSync::instances.size());
- MockImageSync *sync1 = MockImageSync::instances[0];
- ASSERT_TRUE(sync1->syncing);
-
- MockImageSync *sync2 = MockImageSync::instances[1];
- ASSERT_FALSE(sync2->syncing);
-
- cancel("image_id_1", sync1);
- ASSERT_EQ(-ECANCELED, ctx1.wait());
-
- ASSERT_TRUE(sync2->syncing);
- sync2->finish(0);
- ASSERT_EQ(0, ctx2.wait());
+TEST_F(TestMockImageSyncThrottler, Cancel_Running_Sync_Start_Waiting) {
+ MockImageSyncThrottler throttler;
+ throttler.set_max_concurrent_syncs(1);
+
+ C_SaferCond on_start1;
+ throttler.start_op("id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("id2", &on_start2);
+
+ ASSERT_EQ(0, on_start1.wait());
+ ASSERT_FALSE(throttler.cancel_op("id1"));
+ throttler.finish_op("id1");
+ ASSERT_EQ(0, on_start2.wait());
+ throttler.finish_op("id2");
}
TEST_F(TestMockImageSyncThrottler, Increase_Max_Concurrent_Syncs) {
- mock_sync_throttler->set_max_concurrent_syncs(2);
-
- C_SaferCond ctx1;
- start_sync("image_id_1", &ctx1);
- C_SaferCond ctx2;
- start_sync("image_id_2", &ctx2);
- C_SaferCond ctx3;
- start_sync("image_id_3", &ctx3);
- C_SaferCond ctx4;
- start_sync("image_id_4", &ctx4);
- C_SaferCond ctx5;
- start_sync("image_id_5", &ctx5);
-
- ASSERT_EQ(5u, MockImageSync::instances.size());
-
- MockImageSync *sync1 = MockImageSync::instances[0];
- ASSERT_TRUE(sync1->syncing);
-
- MockImageSync *sync2 = MockImageSync::instances[1];
- ASSERT_TRUE(sync2->syncing);
-
- MockImageSync *sync3 = MockImageSync::instances[2];
- ASSERT_FALSE(sync3->syncing);
-
- MockImageSync *sync4 = MockImageSync::instances[3];
- ASSERT_FALSE(sync4->syncing);
-
- MockImageSync *sync5 = MockImageSync::instances[4];
- ASSERT_FALSE(sync5->syncing);
-
- mock_sync_throttler->set_max_concurrent_syncs(4);
-
- ASSERT_TRUE(sync3->syncing);
- ASSERT_TRUE(sync4->syncing);
- ASSERT_FALSE(sync5->syncing);
-
- sync1->finish(0);
- ASSERT_EQ(0, ctx1.wait());
-
- ASSERT_TRUE(sync5->syncing);
- sync5->finish(-EINVAL);
- ASSERT_EQ(-EINVAL, ctx5.wait());
-
- sync2->finish(0);
- ASSERT_EQ(0, ctx2.wait());
-
- sync3->finish(0);
- ASSERT_EQ(0, ctx3.wait());
-
- sync4->finish(0);
- ASSERT_EQ(0, ctx4.wait());
+ MockImageSyncThrottler throttler;
+ throttler.set_max_concurrent_syncs(2);
+
+ C_SaferCond on_start1;
+ throttler.start_op("id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("id2", &on_start2);
+ C_SaferCond on_start3;
+ throttler.start_op("id3", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("id4", &on_start4);
+ C_SaferCond on_start5;
+ throttler.start_op("id5", &on_start5);
+
+ ASSERT_EQ(0, on_start1.wait());
+ ASSERT_EQ(0, on_start2.wait());
+
+ throttler.set_max_concurrent_syncs(4);
+
+ ASSERT_EQ(0, on_start3.wait());
+ ASSERT_EQ(0, on_start4.wait());
+
+ throttler.finish_op("id4");
+ ASSERT_EQ(0, on_start5.wait());
+
+ throttler.finish_op("id1");
+ throttler.finish_op("id2");
+ throttler.finish_op("id3");
+ throttler.finish_op("id5");
}
TEST_F(TestMockImageSyncThrottler, Decrease_Max_Concurrent_Syncs) {
- mock_sync_throttler->set_max_concurrent_syncs(4);
-
- C_SaferCond ctx1;
- start_sync("image_id_1", &ctx1);
- C_SaferCond ctx2;
- start_sync("image_id_2", &ctx2);
- C_SaferCond ctx3;
- start_sync("image_id_3", &ctx3);
- C_SaferCond ctx4;
- start_sync("image_id_4", &ctx4);
- C_SaferCond ctx5;
- start_sync("image_id_5", &ctx5);
-
- ASSERT_EQ(5u, MockImageSync::instances.size());
-
- MockImageSync *sync1 = MockImageSync::instances[0];
- ASSERT_TRUE(sync1->syncing);
-
- MockImageSync *sync2 = MockImageSync::instances[1];
- ASSERT_TRUE(sync2->syncing);
-
- MockImageSync *sync3 = MockImageSync::instances[2];
- ASSERT_TRUE(sync3->syncing);
-
- MockImageSync *sync4 = MockImageSync::instances[3];
- ASSERT_TRUE(sync4->syncing);
-
- MockImageSync *sync5 = MockImageSync::instances[4];
- ASSERT_FALSE(sync5->syncing);
-
- mock_sync_throttler->set_max_concurrent_syncs(2);
-
- ASSERT_FALSE(sync5->syncing);
-
- sync1->finish(0);
- ASSERT_EQ(0, ctx1.wait());
-
- ASSERT_FALSE(sync5->syncing);
-
- sync2->finish(0);
- ASSERT_EQ(0, ctx2.wait());
-
- ASSERT_FALSE(sync5->syncing);
-
- sync3->finish(0);
- ASSERT_EQ(0, ctx3.wait());
-
- ASSERT_TRUE(sync5->syncing);
-
- sync4->finish(0);
- ASSERT_EQ(0, ctx4.wait());
-
- sync5->finish(0);
- ASSERT_EQ(0, ctx5.wait());
+ MockImageSyncThrottler throttler;
+ throttler.set_max_concurrent_syncs(4);
+
+ C_SaferCond on_start1;
+ throttler.start_op("id1", &on_start1);
+ C_SaferCond on_start2;
+ throttler.start_op("id2", &on_start2);
+ C_SaferCond on_start3;
+ throttler.start_op("id3", &on_start3);
+ C_SaferCond on_start4;
+ throttler.start_op("id4", &on_start4);
+ C_SaferCond on_start5;
+ throttler.start_op("id5", &on_start5);
+
+ ASSERT_EQ(0, on_start1.wait());
+ ASSERT_EQ(0, on_start2.wait());
+ ASSERT_EQ(0, on_start3.wait());
+ ASSERT_EQ(0, on_start4.wait());
+
+ throttler.set_max_concurrent_syncs(2);
+
+ throttler.finish_op("id1");
+ throttler.finish_op("id2");
+ throttler.finish_op("id3");
+
+ ASSERT_EQ(0, on_start5.wait());
+
+ throttler.finish_op("id4");
+ throttler.finish_op("id5");
}
-
} // namespace mirror
} // namespace rbd
#include "test/librbd/mock/MockImageCtx.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "tools/rbd_mirror/ImageReplayer.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/InstanceReplayer.h"
#include "tools/rbd_mirror/Threads.h"
}
};
+template<>
+struct InstanceWatcher<librbd::MockTestImageCtx> {
+};
+
template<>
struct ImageReplayer<librbd::MockTestImageCtx> {
static ImageReplayer* s_instance;
static ImageReplayer *create(
Threads<librbd::MockTestImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
+ InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
const std::string &global_image_id) {
assert(s_instance != nullptr);
MOCK_METHOD0(is_blacklisted, bool());
};
-template<>
-struct ImageSyncThrottler<librbd::MockTestImageCtx> {
- ImageSyncThrottler() {
- }
- virtual ~ImageSyncThrottler() {
- }
-};
-
ImageReplayer<librbd::MockTestImageCtx>* ImageReplayer<librbd::MockTestImageCtx>::s_instance = nullptr;
} // namespace mirror
public:
typedef ImageReplayer<librbd::MockTestImageCtx> MockImageReplayer;
typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
+ typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
typedef Threads<librbd::MockTestImageCtx> MockThreads;
void SetUp() override {
m_image_deleter.reset(
new rbd::mirror::ImageDeleter(m_threads->work_queue, m_threads->timer,
&m_threads->timer_lock));
- m_image_sync_throttler.reset(
- new rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>());
}
void TearDown() override {
MockThreads *m_mock_threads;
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
- std::shared_ptr<rbd::mirror::ImageSyncThrottler<librbd::MockTestImageCtx>>
- m_image_sync_throttler;
};
TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
+ MockInstanceWatcher mock_instance_watcher;
MockImageReplayer mock_image_replayer;
MockInstanceReplayer instance_replayer(
- m_mock_threads, m_image_deleter, m_image_sync_throttler,
+ m_mock_threads, m_image_deleter,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id());
.WillOnce(Return(true));
EXPECT_CALL(mock_image_replayer, start(nullptr, false));
- instance_replayer.acquire_image(global_image_id, "remote_mirror_uuid",
- "remote_image_id", &on_acquire);
+ instance_replayer.acquire_image(&mock_instance_watcher, global_image_id,
+ "remote_mirror_uuid", "remote_image_id",
+ &on_acquire);
ASSERT_EQ(0, on_acquire.wait());
// Release
#include "test/librbd/mock/MockImageCtx.h"
#include "test/rbd_mirror/test_mock_fixture.h"
#include "tools/rbd_mirror/InstanceReplayer.h"
+#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/InstanceWatcher.h"
#include "tools/rbd_mirror/Threads.h"
template <>
struct InstanceReplayer<librbd::MockTestImageCtx> {
- MOCK_METHOD4(acquire_image, void(const std::string &, const std::string &,
+ MOCK_METHOD5(acquire_image, void(InstanceWatcher<librbd::MockTestImageCtx> *,
+ const std::string &, const std::string &,
const std::string &, Context *));
MOCK_METHOD5(release_image, void(const std::string &, const std::string &,
const std::string &, bool, Context *));
};
+template <>
+struct ImageSyncThrottler<librbd::MockTestImageCtx> {
+ static ImageSyncThrottler* s_instance;
+
+ static ImageSyncThrottler *create() {
+ assert(s_instance != nullptr);
+ return s_instance;
+ }
+
+ ImageSyncThrottler() {
+ assert(s_instance == nullptr);
+ s_instance = this;
+ }
+
+ virtual ~ImageSyncThrottler() {
+ assert(s_instance == this);
+ s_instance = nullptr;
+ }
+
+ MOCK_METHOD0(destroy, void());
+ MOCK_METHOD1(drain, void(int));
+ MOCK_METHOD2(start_op, void(const std::string &, Context *));
+ MOCK_METHOD1(finish_op, void(const std::string &));
+};
+
+ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
+
} // namespace mirror
} // namespace rbd
ASSERT_EQ(0, instance_watcher2->init());
// Acquire Image on the the same instance
- EXPECT_CALL(mock_instance_replayer1, acquire_image("gid", "uuid", "id", _))
- .WillOnce(WithArg<3>(CompleteContext(0)));
+ EXPECT_CALL(mock_instance_replayer1, acquire_image(instance_watcher1, "gid",
+ "uuid", "id", _))
+ .WillOnce(WithArg<4>(CompleteContext(0)));
C_SaferCond on_acquire1;
instance_watcher1->notify_image_acquire(instance_id1, "gid", "uuid", "id",
&on_acquire1);
ASSERT_EQ(0, on_acquire1.wait());
// Acquire Image on the other instance
- EXPECT_CALL(mock_instance_replayer2, acquire_image("gid", "uuid", "id", _))
- .WillOnce(WithArg<3>(CompleteContext(0)));
+ EXPECT_CALL(mock_instance_replayer2, acquire_image(instance_watcher2, "gid",
+ "uuid", "id", _))
+ .WillOnce(WithArg<4>(CompleteContext(0)));
C_SaferCond on_acquire2;
instance_watcher1->notify_image_acquire(instance_id2, "gid", "uuid", "id",
&on_acquire2);
delete instance_watcher;
}
+class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
+public:
+ typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
+
+ MockManagedLock mock_managed_lock;
+ MockImageSyncThrottler mock_image_sync_throttler;
+ std::string instance_id1;
+ std::string instance_id2;
+
+ librados::Rados cluster;
+ librados::IoCtx io_ctx2;
+
+ MockInstanceWatcher *instance_watcher1;
+ MockInstanceWatcher *instance_watcher2;
+
+ void SetUp() override {
+ TestMockInstanceWatcher::SetUp();
+
+ instance_id1 = m_instance_id;
+ librados::IoCtx& io_ctx1 = m_local_io_ctx;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
+ instance_watcher1 = MockInstanceWatcher::create(io_ctx1,
+ m_mock_threads->work_queue,
+ nullptr);
+ EXPECT_EQ("", connect_cluster_pp(cluster));
+ EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
+ instance_id2 = stringify(io_ctx2.get_instance_id());
+ librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
+ instance_watcher2 = MockInstanceWatcher::create(io_ctx2,
+ m_mock_threads->work_queue,
+ nullptr);
+ InSequence seq;
+
+ // Init instance watcher 1 (leader)
+ expect_register_instance(mock_io_ctx1, 0);
+ expect_register_watch(mock_io_ctx1, instance_id1);
+ expect_acquire_lock(mock_managed_lock, 0);
+ EXPECT_EQ(0, instance_watcher1->init());
+ instance_watcher1->handle_acquire_leader();
+
+ // Init instance watcher 2
+ expect_register_instance(mock_io_ctx2, 0);
+ expect_register_watch(mock_io_ctx2, instance_id2);
+ expect_acquire_lock(mock_managed_lock, 0);
+ EXPECT_EQ(0, instance_watcher2->init());
+ instance_watcher2->handle_update_leader(instance_id1);
+ }
+
+ void TearDown() override {
+ librados::IoCtx& io_ctx1 = m_local_io_ctx;
+ librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
+ librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
+
+ InSequence seq;
+
+ expect_throttler_destroy();
+ instance_watcher1->handle_release_leader();
+
+ // Shutdown instance watcher 1
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx1);
+ expect_unregister_instance(mock_io_ctx1, 0);
+ instance_watcher1->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher1;
+
+ // Shutdown instance watcher 2
+ expect_release_lock(mock_managed_lock, 0);
+ expect_unregister_watch(mock_io_ctx2);
+ expect_unregister_instance(mock_io_ctx2, 0);
+ instance_watcher2->shut_down();
+
+ expect_destroy_lock(mock_managed_lock);
+ delete instance_watcher2;
+
+ TestMockInstanceWatcher::TearDown();
+ }
+
+ void expect_throttler_destroy(
+ std::vector<Context *> *throttler_queue = nullptr) {
+ EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE))
+ .WillOnce(Invoke([throttler_queue] (int r) {
+ if (throttler_queue != nullptr) {
+ for (auto ctx : *throttler_queue) {
+ ctx->complete(r);
+ }
+ }
+ }));
+ EXPECT_CALL(mock_image_sync_throttler, destroy());
+ }
+
+ void expect_throttler_start_op(const std::string &sync_id,
+ Context *on_call = nullptr,
+ Context **on_start_ctx = nullptr) {
+ EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _))
+ .WillOnce(Invoke([on_call, on_start_ctx] (const std::string &,
+ Context *ctx) {
+ if (on_call != nullptr) {
+ on_call->complete(0);
+ }
+ if (on_start_ctx != nullptr) {
+ *on_start_ctx = ctx;
+ } else {
+ ctx->complete(0);
+ }
+ }));
+ }
+
+ void expect_throttler_finish_op(const std::string &sync_id,
+ Context *on_finish) {
+ EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
+ .WillOnce(Invoke([on_finish](const std::string &) {
+ on_finish->complete(0);
+ }));
+ }
+};
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) {
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher1->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher1->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnLeader) {
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher1->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ ASSERT_FALSE(instance_watcher1->cancel_sync_request("sync_id"));
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher1->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnNonLeader) {
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher2->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnNonLeader) {
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ ASSERT_FALSE(instance_watcher2->cancel_sync_request("sync_id"));
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher2->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, CancelWaitingOnNonLeader) {
+ InSequence seq;
+
+ C_SaferCond on_start_op_called;
+ Context *on_start_ctx;
+ expect_throttler_start_op("sync_id", &on_start_op_called,
+ &on_start_ctx);
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start_op_called.wait());
+
+ ASSERT_TRUE(instance_watcher2->cancel_sync_request("sync_id"));
+ // emulate watcher timeout
+ on_start_ctx->complete(-ETIMEDOUT);
+ ASSERT_EQ(-ECANCELED, on_start.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) {
+ // start sync when previous notification is still in flight
+
+ InSequence seq;
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start1;
+ instance_watcher2->notify_sync_request("sync_id", &on_start1);
+ ASSERT_EQ(0, on_start1.wait());
+
+ C_SaferCond on_start2;
+ EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
+ .WillOnce(Invoke([this, &on_start2](const std::string &) {
+ instance_watcher2->notify_sync_request("sync_id", &on_start2);
+ }));
+ expect_throttler_start_op("sync_id");
+ instance_watcher2->notify_sync_complete("sync_id");
+
+ ASSERT_EQ(0, on_start2.wait());
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher2->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) {
+ InSequence seq;
+
+ expect_throttler_destroy();
+ instance_watcher1->handle_release_leader();
+ instance_watcher1->handle_acquire_leader();
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) {
+ InSequence seq;
+
+ expect_throttler_destroy();
+ instance_watcher1->handle_release_leader();
+ instance_watcher2->handle_acquire_leader();
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+ expect_throttler_destroy();
+ instance_watcher2->handle_release_leader();
+ instance_watcher2->notify_sync_complete("sync_id");
+
+ instance_watcher1->handle_acquire_leader();
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) {
+ InSequence seq;
+
+ C_SaferCond on_start_op_called;
+ Context *on_start_ctx;
+ expect_throttler_start_op("sync_id", &on_start_op_called,
+ &on_start_ctx);
+ C_SaferCond on_start;
+ instance_watcher1->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start_op_called.wait());
+
+ std::vector<Context *> throttler_queue = {on_start_ctx};
+ expect_throttler_destroy(&throttler_queue);
+ instance_watcher1->handle_release_leader();
+ instance_watcher2->handle_acquire_leader();
+ instance_watcher1->handle_update_leader(instance_id2);
+
+ expect_throttler_start_op("sync_id");
+ ASSERT_EQ(0, on_start.wait());
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher1->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+
+ expect_throttler_destroy();
+ instance_watcher2->handle_release_leader();
+ instance_watcher1->handle_acquire_leader();
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) {
+ InSequence seq;
+
+ expect_throttler_destroy();
+ instance_watcher1->handle_release_leader();
+ instance_watcher2->handle_acquire_leader();
+ instance_watcher1->handle_update_leader(instance_id2);
+
+ expect_throttler_start_op("sync_id");
+ C_SaferCond on_start;
+ instance_watcher1->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start.wait());
+
+ expect_throttler_destroy();
+ instance_watcher2->handle_release_leader();
+ instance_watcher1->handle_acquire_leader();
+ instance_watcher2->handle_update_leader(instance_id2);
+
+ instance_watcher1->notify_sync_complete("sync_id");
+}
+
+TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) {
+ InSequence seq;
+
+ C_SaferCond on_start_op_called;
+ Context *on_start_ctx;
+ expect_throttler_start_op("sync_id", &on_start_op_called,
+ &on_start_ctx);
+ C_SaferCond on_start;
+ instance_watcher2->notify_sync_request("sync_id", &on_start);
+ ASSERT_EQ(0, on_start_op_called.wait());
+
+ std::vector<Context *> throttler_queue = {on_start_ctx};
+ expect_throttler_destroy(&throttler_queue);
+ instance_watcher1->handle_release_leader();
+
+ EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _))
+ .WillOnce(WithArg<1>(CompleteContext(0)));
+ instance_watcher2->handle_acquire_leader();
+ instance_watcher1->handle_update_leader(instance_id2);
+
+ ASSERT_EQ(0, on_start.wait());
+
+ C_SaferCond on_finish;
+ expect_throttler_finish_op("sync_id", &on_finish);
+ instance_watcher2->notify_sync_complete("sync_id");
+ ASSERT_EQ(0, on_finish.wait());
+
+ expect_throttler_destroy();
+ instance_watcher2->handle_release_leader();
+ instance_watcher1->handle_acquire_leader();
+}
+
} // namespace mirror
} // namespace rbd
MOCK_CONST_METHOD0(is_shutdown, bool());
MOCK_CONST_METHOD0(is_state_post_acquiring, bool());
+ MOCK_CONST_METHOD0(is_state_pre_releasing, bool());
MOCK_CONST_METHOD0(is_state_locked, bool());
};
return MockManagedLock::get_instance().is_state_post_acquiring();
}
+ bool is_state_pre_releasing() const {
+ return MockManagedLock::get_instance().is_state_pre_releasing();
+ }
+
bool is_state_locked() const {
return MockManagedLock::get_instance().is_state_locked();
}
MOCK_METHOD1(post_acquire_handler, void(Context *));
MOCK_METHOD1(pre_release_handler, void(Context *));
+
+ MOCK_METHOD1(update_leader_handler, void(const std::string &));
};
MockListener *MockListener::s_instance = nullptr;
.Times(AtLeast(0)).WillRepeatedly(Return(false));
EXPECT_CALL(mock_managed_lock, is_state_locked())
.Times(AtLeast(0)).WillRepeatedly(Return(false));
+ EXPECT_CALL(mock_managed_lock, is_state_pre_releasing())
+ .Times(AtLeast(0)).WillRepeatedly(Return(false));
}
void expect_notify_heartbeat(MockManagedLock &mock_managed_lock,
expect_is_shutdown(mock_managed_lock);
expect_is_leader(mock_managed_lock);
expect_destroy(mock_managed_lock);
+ EXPECT_CALL(listener, update_leader_handler(_));
InSequence seq;
virtual void cancel() {}
protected:
- void finish(int r) {
+ virtual void finish(int r) {
if (m_cct) {
lsubdout(m_cct, rbd_mirror, 20) << m_name << "::finish: r=" << r << dendl;
}
#include "librbd/Utils.h"
#include "librbd/journal/Replay.h"
#include "ImageReplayer.h"
-#include "ImageSync.h"
#include "Threads.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
template <typename I>
ImageReplayer<I>::ImageReplayer(Threads<librbd::ImageCtx> *threads,
shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<I> image_sync_throttler,
+ InstanceWatcher<I> *instance_watcher,
RadosRef local,
const std::string &local_mirror_uuid,
int64_t local_pool_id,
const std::string &global_image_id) :
m_threads(threads),
m_image_deleter(image_deleter),
- m_image_sync_throttler(image_sync_throttler),
+ m_instance_watcher(instance_watcher),
m_local(local),
m_local_mirror_uuid(local_mirror_uuid),
m_local_pool_id(local_pool_id),
ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
BootstrapRequest<I> *request = BootstrapRequest<I>::create(
- m_local_ioctx, m_remote_image.io_ctx, m_image_sync_throttler,
+ m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
&m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
m_global_image_id, m_threads->work_queue, m_threads->timer,
&m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
namespace rbd {
namespace mirror {
+template <typename> struct InstanceWatcher;
template <typename> struct Threads;
namespace image_replayer { template <typename> class BootstrapRequest; }
static ImageReplayer *create(
Threads<librbd::ImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+ InstanceWatcher<ImageCtxT> *instance_watcher,
RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
const std::string &global_image_id) {
- return new ImageReplayer(threads, image_deleter, image_sync_throttler,
+ return new ImageReplayer(threads, image_deleter, instance_watcher,
local, local_mirror_uuid, local_pool_id,
global_image_id);
}
ImageReplayer(Threads<librbd::ImageCtx> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+ InstanceWatcher<ImageCtxT> *instance_watcher,
RadosRef local, const std::string &local_mirror_uuid,
int64_t local_pool_id, const std::string &global_image_id);
virtual ~ImageReplayer();
Threads<librbd::ImageCtx> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
- ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
+ InstanceWatcher<ImageCtxT> *m_instance_watcher;
RemoteImages m_remote_images;
RemoteImage m_remote_image;
// vim: ts=8 sw=2 smarttab
#include "ImageSync.h"
+#include "InstanceWatcher.h"
#include "ProgressContext.h"
#include "common/errno.h"
#include "journal/Journaler.h"
SafeTimer *timer, Mutex *timer_lock,
const std::string &mirror_uuid, Journaler *journaler,
MirrorPeerClientMeta *client_meta,
- ContextWQ *work_queue, Context *on_finish,
- ProgressContext *progress_ctx)
+ ContextWQ *work_queue,
+ InstanceWatcher<I> *instance_watcher,
+ Context *on_finish, ProgressContext *progress_ctx)
: BaseRequest("rbd::mirror::ImageSync", local_image_ctx->cct, on_finish),
m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
m_timer(timer), m_timer_lock(timer_lock), m_mirror_uuid(mirror_uuid),
m_journaler(journaler), m_client_meta(client_meta),
- m_work_queue(work_queue), m_progress_ctx(progress_ctx),
+ m_work_queue(work_queue), m_instance_watcher(instance_watcher),
+ m_progress_ctx(progress_ctx),
m_lock(unique_lock_name("ImageSync::m_lock", this)) {
}
template <typename I>
void ImageSync<I>::send() {
- send_prune_catch_up_sync_point();
+ send_notify_sync_request();
}
template <typename I>
m_canceled = true;
+ if (m_instance_watcher->cancel_sync_request(m_local_image_ctx->id)) {
+ return;
+ }
+
if (m_snapshot_copy_request != nullptr) {
m_snapshot_copy_request->cancel();
}
}
}
+template <typename I>
+void ImageSync<I>::send_notify_sync_request() {
+ update_progress("NOTIFY_SYNC_REQUEST");
+
+ dout(20) << dendl;
+
+ Context *ctx = create_context_callback<
+ ImageSync<I>, &ImageSync<I>::handle_notify_sync_request>(this);
+ m_instance_watcher->notify_sync_request(m_local_image_ctx->id, ctx);
+}
+
+template <typename I>
+void ImageSync<I>::handle_notify_sync_request(int r) {
+ dout(20) << ": r=" << r << dendl;
+
+ if (r < 0) {
+ BaseRequest::finish(r);
+ return;
+ }
+
+ send_prune_catch_up_sync_point();
+}
+
template <typename I>
void ImageSync<I>::send_prune_catch_up_sync_point() {
update_progress("PRUNE_CATCH_UP_SYNC_POINT");
}
}
+template <typename I>
+void ImageSync<I>::finish(int r) {
+ dout(20) << ": r=" << r << dendl;
+
+ m_instance_watcher->notify_sync_complete(m_local_image_ctx->id);
+ BaseRequest::finish(r);
+}
+
} // namespace mirror
} // namespace rbd
class ProgressContext;
+template <typename> class InstanceWatcher;
+
namespace image_sync { template <typename> class ImageCopyRequest; }
namespace image_sync { template <typename> class SnapshotCopyRequest; }
Mutex *timer_lock, const std::string &mirror_uuid,
Journaler *journaler,
MirrorPeerClientMeta *client_meta,
- ContextWQ *work_queue, Context *on_finish,
- ProgressContext *progress_ctx = nullptr) {
+ ContextWQ *work_queue,
+ InstanceWatcher<ImageCtxT> *instance_watcher,
+ Context *on_finish,
+ ProgressContext *progress_ctx = nullptr) {
return new ImageSync(local_image_ctx, remote_image_ctx, timer, timer_lock,
mirror_uuid, journaler, client_meta, work_queue,
- on_finish, progress_ctx);
+ instance_watcher, on_finish, progress_ctx);
}
ImageSync(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx,
SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid,
Journaler *journaler, MirrorPeerClientMeta *client_meta,
- ContextWQ *work_queue, Context *on_finish,
- ProgressContext *progress_ctx = nullptr);
+ ContextWQ *work_queue, InstanceWatcher<ImageCtxT> *instance_watcher,
+ Context *on_finish, ProgressContext *progress_ctx = nullptr);
~ImageSync() override;
void send() override;
void cancel() override;
+protected:
+ void finish(int r) override;
+
private:
/**
* @verbatim
* <start>
* |
* v
+ * NOTIFY_SYNC_REQUEST
+ * |
+ * v
* PRUNE_CATCH_UP_SYNC_POINT
* |
* v
Journaler *m_journaler;
MirrorPeerClientMeta *m_client_meta;
ContextWQ *m_work_queue;
+ InstanceWatcher<ImageCtxT> *m_instance_watcher;
ProgressContext *m_progress_ctx;
SnapMap m_snap_map;
image_sync::ImageCopyRequest<ImageCtxT> *m_image_copy_request = nullptr;
decltype(ImageCtxT::object_map) m_object_map = nullptr;
+ void send_notify_sync_request();
+ void handle_notify_sync_request(int r);
+
void send_prune_catch_up_sync_point();
void handle_prune_catch_up_sync_point(int r);
*/
#include "ImageSyncThrottler.h"
-#include "ImageSync.h"
-#include "common/ceph_context.h"
+#include "common/Formatter.h"
+#include "common/debug.h"
+#include "common/errno.h"
#include "librbd/Utils.h"
#define dout_context g_ceph_context
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
<< " " << __func__ << ": "
-using std::unique_ptr;
-using std::string;
-using std::set;
namespace rbd {
namespace mirror {
-template <typename ImageCtxT>
-struct ImageSyncThrottler<ImageCtxT>::C_SyncHolder : public Context {
- ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
- std::string m_local_image_id;
- ImageSync<ImageCtxT> *m_sync = nullptr;
- Context *m_on_finish;
-
- C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
- const std::string &local_image_id, Context *on_finish)
- : m_sync_throttler(sync_throttler),
- m_local_image_id(local_image_id), m_on_finish(on_finish) {
- }
-
- void finish(int r) override {
- m_sync->put();
- m_sync_throttler->handle_sync_finished(this);
-
- m_on_finish->complete(r);
- }
-};
-
template <typename I>
ImageSyncThrottler<I>::ImageSyncThrottler()
- : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
- m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", this))
-{
- dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
- << dendl;
+ : m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler",
+ this)),
+ m_max_concurrent_syncs(
+ g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs) {
+ dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl;
g_ceph_context->_conf->add_observer(this);
}
template <typename I>
ImageSyncThrottler<I>::~ImageSyncThrottler() {
- {
- Mutex::Locker l(m_lock);
- assert(m_sync_queue.empty());
- assert(m_inflight_syncs.empty());
- }
-
g_ceph_context->_conf->remove_observer(this);
+
+ Mutex::Locker locker(m_lock);
+ assert(m_inflight_ops.empty());
+ assert(m_queue.empty());
}
template <typename I>
-void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
- SafeTimer *timer, Mutex *timer_lock,
- const std::string &mirror_uuid,
- Journaler *journaler,
- MirrorPeerClientMeta *client_meta,
- ContextWQ *work_queue,
- Context *on_finish,
- ProgressContext *progress_ctx) {
- dout(20) << dendl;
+void ImageSyncThrottler<I>::start_op(const std::string &id, Context *on_start) {
+ dout(20) << "id=" << id << dendl;
- C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, local_image_ctx->id,
- on_finish);
- sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
- remote_image_ctx, timer,
- timer_lock, mirror_uuid,
- journaler, client_meta,
- work_queue, sync_holder_ctx,
- progress_ctx);
- sync_holder_ctx->m_sync->get();
-
- bool start = false;
{
- Mutex::Locker l(m_lock);
-
- if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
- assert(m_inflight_syncs.count(local_image_ctx->id) == 0);
- m_inflight_syncs[local_image_ctx->id] = sync_holder_ctx;
- start = true;
- dout(10) << "ready to start image sync for local_image_id "
- << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
- << m_max_concurrent_syncs << "]" << dendl;
+ Mutex::Locker locker(m_lock);
+
+ if (m_inflight_ops.count(id) > 0) {
+ dout(20) << "duplicate for already started op " << id << dendl;
+ } else if (m_max_concurrent_syncs == 0 ||
+ m_inflight_ops.size() < m_max_concurrent_syncs) {
+ assert(m_queue.empty());
+ m_inflight_ops.insert(id);
+ dout(20) << "ready to start sync for " << id << " ["
+ << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+ << dendl;
} else {
- m_sync_queue.push_front(sync_holder_ctx);
- dout(10) << "image sync for local_image_id " << local_image_ctx->id
- << " has been queued" << dendl;
+ m_queue.push_back(std::make_pair(id, on_start));
+ on_start = nullptr;
+ dout(20) << "image sync for " << id << " has been queued" << dendl;
}
}
- if (start) {
- sync_holder_ctx->m_sync->send();
+ if (on_start != nullptr) {
+ on_start->complete(0);
}
}
template <typename I>
-void ImageSyncThrottler<I>::cancel_sync(const std::string &local_image_id) {
- dout(20) << dendl;
-
- C_SyncHolder *sync_holder = nullptr;
- bool running_sync = true;
+bool ImageSyncThrottler<I>::cancel_op(const std::string &id) {
+ dout(20) << "id=" << id << dendl;
+ Context *on_start = nullptr;
{
- Mutex::Locker l(m_lock);
- if (m_inflight_syncs.empty()) {
- // no image sync currently running and neither waiting
- return;
- }
-
- auto it = m_inflight_syncs.find(local_image_id);
- if (it != m_inflight_syncs.end()) {
- sync_holder = it->second;
- }
-
- if (!sync_holder) {
- for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
- if ((*it)->m_local_image_id == local_image_id) {
- sync_holder = (*it);
- m_sync_queue.erase(it);
- running_sync = false;
- break;
- }
+ Mutex::Locker locker(m_lock);
+ for (auto it = m_queue.begin(); it != m_queue.end(); ++it) {
+ if (it->first == id) {
+ on_start = it->second;
+ dout(20) << "canceled queued sync for " << id << dendl;
+ m_queue.erase(it);
+ break;
}
}
}
- if (sync_holder) {
- if (running_sync) {
- dout(10) << "canceled running image sync for local_image_id "
- << sync_holder->m_local_image_id << dendl;
- sync_holder->m_sync->cancel();
- } else {
- dout(10) << "canceled waiting image sync for local_image_id "
- << sync_holder->m_local_image_id << dendl;
- sync_holder->m_on_finish->complete(-ECANCELED);
- sync_holder->m_sync->put();
- delete sync_holder;
- }
+ if (on_start == nullptr) {
+ return false;
}
+
+ on_start->complete(-ECANCELED);
+ return true;
}
template <typename I>
-void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
- dout(20) << dendl;
+void ImageSyncThrottler<I>::finish_op(const std::string &id) {
+ dout(20) << "id=" << id << dendl;
- C_SyncHolder *next_sync_holder = nullptr;
+ if (cancel_op(id)) {
+ return;
+ }
+ Context *on_start = nullptr;
{
- Mutex::Locker l(m_lock);
- m_inflight_syncs.erase(sync_holder->m_local_image_id);
-
- if (m_inflight_syncs.size() < m_max_concurrent_syncs &&
- !m_sync_queue.empty()) {
- next_sync_holder = m_sync_queue.back();
- m_sync_queue.pop_back();
-
- assert(
- m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0);
- m_inflight_syncs[next_sync_holder->m_local_image_id] =
- next_sync_holder;
- dout(10) << "ready to start image sync for local_image_id "
- << next_sync_holder->m_local_image_id << " ["
- << m_inflight_syncs.size() << "/" << m_max_concurrent_syncs
- << "]" << dendl;
+ Mutex::Locker locker(m_lock);
+
+ m_inflight_ops.erase(id);
+
+ if (m_inflight_ops.size() < m_max_concurrent_syncs && !m_queue.empty()) {
+ auto pair = m_queue.front();
+ m_inflight_ops.insert(pair.first);
+ dout(20) << "ready to start sync for " << pair.first << " ["
+ << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+ << dendl;
+ on_start= pair.second;
+ m_queue.pop_front();
}
+ }
+
+ if (on_start != nullptr) {
+ on_start->complete(0);
+ }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::drain(int r) {
+ dout(20) << dendl;
- dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
- << "/" << m_max_concurrent_syncs << "]" << dendl;
+ std::list<std::pair<std::string, Context *>> queue;
+ {
+ Mutex::Locker locker(m_lock);
+ std::swap(m_queue, queue);
+ m_inflight_ops.clear();
}
- if (next_sync_holder) {
- next_sync_holder->m_sync->send();
+ for (auto &pair : queue) {
+ pair.second->complete(r);
}
}
template <typename I>
void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
- dout(20) << " max=" << max << dendl;
-
- assert(max > 0);
+ dout(20) << "max=" << max << dendl;
- std::list<C_SyncHolder *> next_sync_holders;
+ std::list<Context *> ops;
{
- Mutex::Locker l(m_lock);
- this->m_max_concurrent_syncs = max;
-
- // Start waiting syncs in the case of available free slots
- while(m_inflight_syncs.size() < m_max_concurrent_syncs
- && !m_sync_queue.empty()) {
- C_SyncHolder *next_sync_holder = m_sync_queue.back();
- next_sync_holders.push_back(next_sync_holder);
- m_sync_queue.pop_back();
-
- assert(
- m_inflight_syncs.count(next_sync_holder->m_local_image_id) == 0);
- m_inflight_syncs[next_sync_holder->m_local_image_id] = next_sync_holder;
-
- dout(10) << "ready to start image sync for local_image_id "
- << next_sync_holder->m_local_image_id << " ["
- << m_inflight_syncs.size() << "/" << m_max_concurrent_syncs
- << "]" << dendl;
+ Mutex::Locker locker(m_lock);
+ m_max_concurrent_syncs = max;
+
+ // Start waiting ops in the case of available free slots
+ while ((m_max_concurrent_syncs == 0 ||
+ m_inflight_ops.size() < m_max_concurrent_syncs) &&
+ !m_queue.empty()) {
+ auto pair = m_queue.front();
+ m_inflight_ops.insert(pair.first);
+ dout(20) << "ready to start sync for " << pair.first << " ["
+ << m_inflight_ops.size() << "/" << m_max_concurrent_syncs << "]"
+ << dendl;
+ ops.push_back(pair.second);
+ m_queue.pop_front();
}
}
- for (const auto& sync_holder : next_sync_holders) {
- sync_holder->m_sync->send();
+ for (const auto& ctx : ops) {
+ ctx->complete(0);
}
}
template <typename I>
-void ImageSyncThrottler<I>::print_status(Formatter *f, stringstream *ss) {
- Mutex::Locker l(m_lock);
+void ImageSyncThrottler<I>::print_status(Formatter *f, std::stringstream *ss) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
if (f) {
f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
- f->dump_int("running_syncs", m_inflight_syncs.size());
- f->dump_int("waiting_syncs", m_sync_queue.size());
+ f->dump_int("running_syncs", m_inflight_ops.size());
+ f->dump_int("waiting_syncs", m_queue.size());
f->flush(*ss);
} else {
*ss << "[ ";
*ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
- *ss << "running_syncs=" << m_inflight_syncs.size() << ", ";
- *ss << "waiting_syncs=" << m_sync_queue.size() << " ]";
+ *ss << "running_syncs=" << m_inflight_ops.size() << ", ";
+ *ss << "waiting_syncs=" << m_queue.size() << " ]";
}
}
}
template <typename I>
-void ImageSyncThrottler<I>::handle_conf_change(
- const struct md_config_t *conf,
- const set<string> &changed) {
+void ImageSyncThrottler<I>::handle_conf_change(const struct md_config_t *conf,
+ const set<string> &changed) {
if (changed.count("rbd_mirror_concurrent_image_syncs")) {
set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs);
}
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 SUSE LINUX GmbH
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-#ifndef CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
-#define CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+#ifndef RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+#define RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
#include <list>
-#include <map>
+#include <set>
+#include <sstream>
+#include <string>
#include <utility>
+
#include "common/Mutex.h"
-#include "librbd/ImageCtx.h"
-#include "include/Context.h"
-#include "librbd/journal/TypeTraits.h"
+#include "common/config_obs.h"
-class CephContext;
class Context;
-class ContextWQ;
-class SafeTimer;
-namespace journal { class Journaler; }
-namespace librbd { namespace journal { struct MirrorPeerClientMeta; } }
+
+namespace ceph { class Formatter; }
+namespace librbd { class ImageCtx; }
namespace rbd {
namespace mirror {
-template <typename> class ImageSync;
-
-class ProgressContext;
-
-/**
- * Manage concurrent image-syncs
- */
template <typename ImageCtxT = librbd::ImageCtx>
class ImageSyncThrottler : public md_config_obs_t {
public:
-
- typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
- typedef typename TypeTraits::Journaler Journaler;
- typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
+ static ImageSyncThrottler *create() {
+ return new ImageSyncThrottler();
+ }
+ void destroy() {
+ delete this;
+ }
ImageSyncThrottler();
~ImageSyncThrottler() override;
- ImageSyncThrottler(const ImageSyncThrottler&) = delete;
- ImageSyncThrottler& operator=(const ImageSyncThrottler&) = delete;
-
- void start_sync(ImageCtxT *local_image_ctx,
- ImageCtxT *remote_image_ctx, SafeTimer *timer,
- Mutex *timer_lock, const std::string &mirror_uuid,
- Journaler *journaler, MirrorPeerClientMeta *client_meta,
- ContextWQ *work_queue, Context *on_finish,
- ProgressContext *progress_ctx = nullptr);
-
- void cancel_sync(const std::string &local_image_id);
void set_max_concurrent_syncs(uint32_t max);
+ void start_op(const std::string &id, Context *on_start);
+ bool cancel_op(const std::string &id);
+ void finish_op(const std::string &id);
+ void drain(int r);
void print_status(Formatter *f, std::stringstream *ss);
private:
- struct C_SyncHolder;
-
- void handle_sync_finished(C_SyncHolder *sync_holder);
+ Mutex m_lock;
+ uint32_t m_max_concurrent_syncs;
+ std::list<std::pair<std::string, Context *>> m_queue;
+ std::set<std::string> m_inflight_ops;
const char **get_tracked_conf_keys() const override;
void handle_conf_change(const struct md_config_t *conf,
const std::set<std::string> &changed) override;
-
- uint32_t m_max_concurrent_syncs;
- Mutex m_lock;
- std::list<C_SyncHolder *> m_sync_queue;
- std::map<std::string, C_SyncHolder *> m_inflight_syncs;
-
};
} // namespace mirror
} // namespace rbd
-#endif // CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+extern template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;
+
+#endif // RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
template <typename I>
InstanceReplayer<I>::InstanceReplayer(
Threads<I> *threads, std::shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<I> image_sync_throttler, RadosRef local_rados,
- const std::string &local_mirror_uuid, int64_t local_pool_id)
- : m_threads(threads), m_image_deleter(image_deleter),
- m_image_sync_throttler(image_sync_throttler), m_local_rados(local_rados),
- m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
- m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
+ RadosRef local_rados, const std::string &local_mirror_uuid,
+ int64_t local_pool_id)
+ : m_threads(threads), m_image_deleter(image_deleter),
+ m_local_rados(local_rados), m_local_mirror_uuid(local_mirror_uuid),
+ m_local_pool_id(local_pool_id),
+ m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
}
template <typename I>
}
template <typename I>
-void InstanceReplayer<I>::acquire_image(const std::string &global_image_id,
+void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
+ const std::string &global_image_id,
const std::string &peer_mirror_uuid,
const std::string &peer_image_id,
Context *on_finish) {
if (it == m_image_replayers.end()) {
auto image_replayer = ImageReplayer<I>::create(
- m_threads, m_image_deleter, m_image_sync_throttler, m_local_rados,
- m_local_mirror_uuid, m_local_pool_id, global_image_id);
+ m_threads, m_image_deleter, instance_watcher, m_local_rados,
+ m_local_mirror_uuid, m_local_pool_id, global_image_id);
dout(20) << global_image_id << ": creating replayer " << image_replayer
<< dendl;
class ImageDeleter;
template <typename> class ImageReplayer;
+template <typename> class InstanceWatcher;
template <typename> struct Threads;
template <typename ImageCtxT = librbd::ImageCtx>
public:
static InstanceReplayer* create(
Threads<ImageCtxT> *threads, std::shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler, RadosRef local_rados,
- const std::string &local_mirror_uuid, int64_t local_pool_id) {
- return new InstanceReplayer(threads, image_deleter, image_sync_throttler,
- local_rados, local_mirror_uuid, local_pool_id);
+ RadosRef local_rados, const std::string &local_mirror_uuid,
+ int64_t local_pool_id) {
+ return new InstanceReplayer(threads, image_deleter, local_rados,
+ local_mirror_uuid, local_pool_id);
}
void destroy() {
delete this;
InstanceReplayer(Threads<ImageCtxT> *threads,
std::shared_ptr<ImageDeleter> image_deleter,
- ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
RadosRef local_rados, const std::string &local_mirror_uuid,
int64_t local_pool_id);
~InstanceReplayer();
void add_peer(std::string mirror_uuid, librados::IoCtx io_ctx);
void remove_peer(std::string mirror_uuid);
- void acquire_image(const std::string &global_image_id,
+ void acquire_image(InstanceWatcher<ImageCtxT> *instance_watcher,
+ const std::string &global_image_id,
const std::string &peer_mirror_uuid,
const std::string &peer_image_id,
Context *on_finish);
Threads<ImageCtxT> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
- ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
RadosRef m_local_rados;
std::string m_local_mirror_uuid;
int64_t m_local_pool_id;
#include "librbd/ManagedLock.h"
#include "librbd/Utils.h"
#include "InstanceReplayer.h"
+#include "ImageSyncThrottler.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
template <typename I>
struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
InstanceWatcher<I> *instance_watcher;
- librbd::watcher::Notifier notifier;
std::string instance_id;
uint64_t request_id;
bufferlist bl;
Context *on_finish;
+ bool send_to_leader;
+ std::unique_ptr<librbd::watcher::Notifier> notifier;
librbd::watcher::NotifyResponse response;
- atomic_t canceling;
+ bool canceling = false;
C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
const std::string &instance_id, uint64_t request_id,
bufferlist &&bl, Context *on_finish)
- : instance_watcher(instance_watcher),
- notifier(instance_watcher->m_work_queue, instance_watcher->m_ioctx,
- RBD_MIRROR_INSTANCE_PREFIX + instance_id),
- instance_id(instance_id), request_id(request_id), bl(bl),
- on_finish(on_finish) {
- instance_watcher->m_notify_op_tracker.start_op();
+ : instance_watcher(instance_watcher), instance_id(instance_id),
+ request_id(request_id), bl(bl), on_finish(on_finish),
+ send_to_leader(instance_id.empty()) {
+ dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": instance_watcher=" << instance_watcher << ", instance_id="
+ << instance_id << ", request_id=" << request_id << dendl;
+
assert(instance_watcher->m_lock.is_locked());
+
+ if (!send_to_leader) {
+ assert((!instance_id.empty()));
+ notifier.reset(new librbd::watcher::Notifier(
+ instance_watcher->m_work_queue,
+ instance_watcher->m_ioctx,
+ RBD_MIRROR_INSTANCE_PREFIX + instance_id));
+ }
+
+ instance_watcher->m_notify_op_tracker.start_op();
auto result = instance_watcher->m_notify_ops.insert(
std::make_pair(instance_id, this)).second;
assert(result);
void send() {
dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
- notifier.notify(bl, &response, this);
+ assert(instance_watcher->m_lock.is_locked());
+
+ if (canceling) {
+ dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": canceling" << dendl;
+ instance_watcher->m_work_queue->queue(this, -ECANCELED);
+ return;
+ }
+
+ if (send_to_leader) {
+ if (instance_watcher->m_leader_instance_id.empty()) {
+ dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": suspending" << dendl;
+ instance_watcher->suspend_notify_request(this);
+ return;
+ }
+
+ if (instance_watcher->m_leader_instance_id != instance_id) {
+ auto count = instance_watcher->m_notify_ops.erase(
+ std::make_pair(instance_id, this));
+ assert(count > 0);
+
+ instance_id = instance_watcher->m_leader_instance_id;
+
+ auto result = instance_watcher->m_notify_ops.insert(
+ std::make_pair(instance_id, this)).second;
+ assert(result);
+
+ notifier.reset(new librbd::watcher::Notifier(
+ instance_watcher->m_work_queue,
+ instance_watcher->m_ioctx,
+ RBD_MIRROR_INSTANCE_PREFIX + instance_id));
+ }
+ }
+
+ dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": sendding to " << instance_id << dendl;
+ notifier->notify(bl, &response, this);
}
void cancel() {
dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
- canceling.set(1);
+ assert(instance_watcher->m_lock.is_locked());
+
+ canceling = true;
+ instance_watcher->unsuspend_notify_request(this);
}
void finish(int r) override {
if (!found) {
if (r == -ETIMEDOUT) {
- if (canceling.read()) {
- r = -ECANCELED;
- } else {
- derr << "C_NotifyInstanceRequest: " << this << " " << __func__
- << ": resending after timeout" << dendl;
- send();
- return;
- }
+ derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": resending after timeout" << dendl;
+ Mutex::Locker locker(instance_watcher->m_lock);
+ send();
+ return;
} else {
r = -EINVAL;
}
+ } else {
+ if (r == -ESTALE && send_to_leader) {
+ derr << "C_NotifyInstanceRequest: " << this << " " << __func__
+ << ": resending due to leader change" << dendl;
+ Mutex::Locker locker(instance_watcher->m_lock);
+ send();
+ return;
+ }
}
}
- instance_watcher->m_notify_op_tracker.finish_op();
on_finish->complete(r);
- Mutex::Locker locker(instance_watcher->m_lock);
- auto result = instance_watcher->m_notify_ops.erase(
+ {
+ Mutex::Locker locker(instance_watcher->m_lock);
+ auto result = instance_watcher->m_notify_ops.erase(
std::make_pair(instance_id, this));
- assert(result > 0);
+ assert(result > 0);
+ instance_watcher->m_notify_op_tracker.finish_op();
+ }
+
delete this;
}
}
};
+template <typename I>
+struct InstanceWatcher<I>::C_SyncRequest : public Context {
+ InstanceWatcher<I> *instance_watcher;
+ std::string sync_id;
+ Context *on_start;
+ Context *on_complete = nullptr;
+ C_NotifyInstanceRequest *req = nullptr;
+
+ C_SyncRequest(InstanceWatcher<I> *instance_watcher,
+ const std::string &sync_id, Context *on_start)
+ : instance_watcher(instance_watcher), sync_id(sync_id),
+ on_start(on_start) {
+ dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
+ << sync_id << dendl;
+ }
+
+ void finish(int r) override {
+ dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r="
+ << r << dendl;
+
+ if (on_start != nullptr) {
+ instance_watcher->handle_notify_sync_request(this, r);
+ } else {
+ instance_watcher->handle_notify_sync_complete(this, r);
+ delete this;
+ }
+ }
+
+ // called twice
+ void complete(int r) override {
+ finish(r);
+ }
+};
+
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
<< this << " " << __func__ << ": "
template <typename I>
InstanceWatcher<I>::~InstanceWatcher() {
+ assert(m_notify_ops.empty());
+ assert(m_notify_op_tracker.empty());
+ assert(m_suspended_ops.empty());
+ assert(m_inflight_sync_reqs.empty());
+ assert(m_image_sync_throttler == nullptr);
m_instance_lock->destroy();
}
}
}
+template <typename I>
+void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
+ Context *on_sync_start) {
+ dout(20) << "sync_id=" << sync_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_inflight_sync_reqs.count(sync_id) == 0);
+
+ uint64_t request_id = ++m_request_seq;
+
+ bufferlist bl;
+ ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
+
+ auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
+ sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
+ std::move(bl), sync_ctx);
+
+ m_inflight_sync_reqs[sync_id] = sync_ctx;
+ sync_ctx->req->send();
+}
+
+template <typename I>
+bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
+ dout(20) << "sync_id=" << sync_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ auto it = m_inflight_sync_reqs.find(sync_id);
+ if (it == m_inflight_sync_reqs.end()) {
+ return false;
+ }
+
+ auto sync_ctx = it->second;
+
+ if (sync_ctx->on_start == nullptr) {
+ return false;
+ }
+
+ assert(sync_ctx->req != nullptr);
+ sync_ctx->req->cancel();
+ return true;
+}
+
+template <typename I>
+void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
+ const std::string &sync_id) {
+ dout(20) << "sync_id=" << sync_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ uint64_t request_id = ++m_request_seq;
+
+ bufferlist bl;
+ ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
+
+ auto ctx = new FunctionContext(
+ [this, sync_id] (int r) {
+ dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
+ Mutex::Locker locker(m_lock);
+ if (r != -ESTALE && m_image_sync_throttler != nullptr) {
+ m_image_sync_throttler->finish_op(sync_id);
+ }
+ });
+ auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
+ std::move(bl), ctx);
+ req->send();
+}
+
+template <typename I>
+void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
+ dout(20) << "sync_id=" << sync_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ auto it = m_inflight_sync_reqs.find(sync_id);
+ assert(it != m_inflight_sync_reqs.end());
+
+ auto sync_ctx = it->second;
+ assert(sync_ctx->req == nullptr);
+
+ m_inflight_sync_reqs.erase(it);
+ m_work_queue->queue(sync_ctx, 0);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
+ int r) {
+ dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
+
+ Context *on_start = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+
+ assert(sync_ctx->req != nullptr);
+ assert(sync_ctx->on_start != nullptr);
+
+ if (sync_ctx->req->canceling) {
+ r = -ECANCELED;
+ }
+
+ std::swap(sync_ctx->on_start, on_start);
+ sync_ctx->req = nullptr;
+ }
+
+ on_start->complete(r == -ECANCELED ? r : 0);
+
+ if (r == -ECANCELED) {
+ notify_sync_complete(sync_ctx->sync_id);
+ }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
+ int r) {
+ dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
+
+ if (sync_ctx->on_complete != nullptr) {
+ sync_ctx->on_complete->complete(r);
+ }
+}
+
+template <typename I>
+void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ if (m_image_sync_throttler != nullptr) {
+ m_image_sync_throttler->print_status(f, ss);
+ }
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_acquire_leader() {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_image_sync_throttler == nullptr);
+ m_image_sync_throttler = ImageSyncThrottler<I>::create();
+
+ m_leader_instance_id = m_instance_id;
+ unsuspend_notify_requests();
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_release_leader() {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ assert(m_image_sync_throttler != nullptr);
+
+ m_leader_instance_id.clear();
+
+ m_image_sync_throttler->drain(-ESTALE);
+ m_image_sync_throttler->destroy();
+ m_image_sync_throttler = nullptr;
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_update_leader(
+ const std::string &leader_instance_id) {
+ dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ m_leader_instance_id = leader_instance_id;
+
+ if (!m_leader_instance_id.empty()) {
+ unsuspend_notify_requests();
+ }
+}
+
template <typename I>
void InstanceWatcher<I>::cancel_notify_requests(
const std::string &instance_id) {
Mutex::Locker locker(m_lock);
for (auto op : m_notify_ops) {
- if (op.first == instance_id) {
+ if (op.first == instance_id && !op.second->send_to_leader) {
op.second->cancel();
}
}
}
-
template <typename I>
void InstanceWatcher<I>::register_instance() {
assert(m_lock.is_locked());
remove_instance_object();
}
+template <typename I>
+void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
+ dout(20) << req << dendl;
+
+ assert(m_lock.is_locked());
+
+ auto result = m_suspended_ops.insert(req).second;
+ assert(result);
+}
+
+template <typename I>
+bool InstanceWatcher<I>::unsuspend_notify_request(
+ C_NotifyInstanceRequest *req) {
+ dout(20) << req << dendl;
+
+ assert(m_lock.is_locked());
+
+ auto result = m_suspended_ops.erase(req);
+ if (result == 0) {
+ return false;
+ }
+
+ req->send();
+ return true;
+}
+
+template <typename I>
+void InstanceWatcher<I>::unsuspend_notify_requests() {
+ dout(20) << dendl;
+
+ assert(m_lock.is_locked());
+
+ std::set<C_NotifyInstanceRequest *> suspended_ops;
+ std::swap(m_suspended_ops, suspended_ops);
+
+ for (auto op : suspended_ops) {
+ op->send();
+ }
+}
+
template <typename I>
Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
uint64_t request_id,
delete it->on_notify_ack;
m_requests.erase(it);
} else {
- ctx = new FunctionContext(
- [this, instance_id, request_id] (int r) {
- C_NotifyAck *on_notify_ack = nullptr;
- {
- // update request state in the requests list
- Mutex::Locker locker(m_lock);
- Request request(instance_id, request_id);
- auto it = m_requests.find(request);
- assert(it != m_requests.end());
- on_notify_ack = it->on_notify_ack;
- m_requests.erase(it);
- }
-
- ::encode(NotifyAckPayload(instance_id, request_id, r),
- on_notify_ack->out);
- on_notify_ack->complete(0);
- });
+ ctx = create_async_context_callback(
+ m_work_queue, new FunctionContext(
+ [this, instance_id, request_id] (int r) {
+ complete_request(instance_id, request_id, r);
+ }));
}
request.on_notify_ack = on_notify_ack;
return ctx;
}
+template <typename I>
+void InstanceWatcher<I>::complete_request(const std::string &instance_id,
+ uint64_t request_id, int r) {
+ dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
+ << dendl;
+
+ C_NotifyAck *on_notify_ack;
+ {
+ Mutex::Locker locker(m_lock);
+ Request request(instance_id, request_id);
+ auto it = m_requests.find(request);
+ assert(it != m_requests.end());
+ on_notify_ack = it->on_notify_ack;
+ m_requests.erase(it);
+ }
+
+ ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
+ on_notify_ack->complete(0);
+}
+
template <typename I>
void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
uint64_t notifier_id, bufferlist &bl) {
auto ctx = new FunctionContext(
[this, global_image_id, peer_mirror_uuid, peer_image_id,
on_finish] (int r) {
- m_instance_replayer->acquire_image(global_image_id, peer_mirror_uuid,
- peer_image_id, on_finish);
+ m_instance_replayer->acquire_image(this, global_image_id,
+ peer_mirror_uuid, peer_image_id,
+ on_finish);
m_notify_op_tracker.finish_op();
});
m_work_queue->queue(ctx, 0);
}
+template <typename I>
+void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
+ const std::string &sync_id,
+ Context *on_finish) {
+ dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ if (m_image_sync_throttler == nullptr) {
+ dout(20) << "sync request for non-leader" << dendl;
+ m_work_queue->queue(on_finish, -ESTALE);
+ return;
+ }
+
+ Context *on_start = create_async_context_callback(
+ m_work_queue, new FunctionContext(
+ [this, instance_id, sync_id, on_finish] (int r) {
+ dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
+ << ", sync_id=" << sync_id << ", r=" << r << dendl;
+ if (r == 0) {
+ notify_sync_start(instance_id, sync_id);
+ }
+ on_finish->complete(r);
+ }));
+ m_image_sync_throttler->start_op(sync_id, on_start);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
+ const std::string &sync_id,
+ Context *on_finish) {
+ dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ auto it = m_inflight_sync_reqs.find(sync_id);
+ if (it == m_inflight_sync_reqs.end()) {
+ dout(20) << "not found" << dendl;
+ m_work_queue->queue(on_finish, 0);
+ return;
+ }
+
+ auto sync_ctx = it->second;
+
+ if (sync_ctx->on_complete != nullptr) {
+ dout(20) << "duplicate request" << dendl;
+ m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
+ }
+
+ sync_ctx->on_complete = on_finish;
+}
+
template <typename I>
void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
const ImageAcquirePayload &payload,
}
}
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+ const SyncRequestPayload &payload,
+ C_NotifyAck *on_notify_ack) {
+ dout(20) << "sync_request: instance_id=" << instance_id << ", "
+ << "request_id=" << payload.request_id << dendl;
+
+ auto on_finish = prepare_request(instance_id, payload.request_id,
+ on_notify_ack);
+ if (on_finish == nullptr) {
+ return;
+ }
+
+ handle_sync_request(instance_id, payload.sync_id, on_finish);
+}
+
+template <typename I>
+void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
+ const SyncStartPayload &payload,
+ C_NotifyAck *on_notify_ack) {
+ dout(20) << "sync_start: instance_id=" << instance_id << ", "
+ << "request_id=" << payload.request_id << dendl;
+
+ auto on_finish = prepare_request(instance_id, payload.request_id,
+ on_notify_ack);
+ if (on_finish == nullptr) {
+ return;
+ }
+
+ handle_sync_start(instance_id, payload.sync_id, on_finish);
+}
+
template <typename I>
void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
const UnknownPayload &payload,
#define CEPH_RBD_MIRROR_INSTANCE_WATCHER_H
#include <map>
+#include <memory>
#include <set>
#include <string>
#include <vector>
namespace rbd {
namespace mirror {
+template <typename> class ImageSyncThrottler;
template <typename> class InstanceReplayer;
template <typename> struct Threads;
const std::string &peer_image_id,
bool schedule_delete, Context *on_notify_ack);
+ void notify_sync_request(const std::string &sync_id, Context *on_sync_start);
+ bool cancel_sync_request(const std::string &sync_id);
+ void notify_sync_complete(const std::string &sync_id);
+
+ void print_sync_status(Formatter *f, stringstream *ss);
+
void cancel_notify_requests(const std::string &instance_id);
+ void handle_acquire_leader();
+ void handle_release_leader();
+ void handle_update_leader(const std::string &leader_instance_id);
+
private:
/**
* @verbatim
*/
struct C_NotifyInstanceRequest;
+ struct C_SyncRequest;
+
+ typedef std::pair<std::string, std::string> Id;
struct HandlePayloadVisitor : public boost::static_visitor<void> {
InstanceWatcher *instance_watcher;
Context *m_on_finish = nullptr;
int m_ret_val = 0;
bool m_removing = false;
+ std::string m_leader_instance_id;
librbd::managed_lock::Locker m_instance_locker;
std::set<std::pair<std::string, C_NotifyInstanceRequest *>> m_notify_ops;
AsyncOpTracker m_notify_op_tracker;
uint64_t m_request_seq = 0;
std::set<Request> m_requests;
+ std::set<C_NotifyInstanceRequest *> m_suspended_ops;
+ std::map<std::string, C_SyncRequest *> m_inflight_sync_reqs;
+ ImageSyncThrottler<ImageCtxT> *m_image_sync_throttler = nullptr;
void register_instance();
void handle_register_instance(int r);
void break_instance_lock();
void handle_break_instance_lock(int r);
+ void suspend_notify_request(C_NotifyInstanceRequest *req);
+ bool unsuspend_notify_request(C_NotifyInstanceRequest *req);
+ void unsuspend_notify_requests();
+
+ void handle_notify_sync_request(C_SyncRequest *sync_ctx, int r);
+ void handle_notify_sync_complete(C_SyncRequest *sync_ctx, int r);
+
+ void notify_sync_start(const std::string &instance_id,
+ const std::string &sync_id);
+
Context *prepare_request(const std::string &instance_id, uint64_t request_id,
C_NotifyAck *on_notify_ack);
+ void complete_request(const std::string &instance_id, uint64_t request_id,
+ int r);
void handle_notify(uint64_t notify_id, uint64_t handle,
uint64_t notifier_id, bufferlist &bl) override;
const std::string &peer_image_id,
bool schedule_delete, Context *on_finish);
+ void handle_sync_request(const std::string &instance_id,
+ const std::string &sync_id, Context *on_finish);
+ void handle_sync_start(const std::string &instance_id,
+ const std::string &sync_id, Context *on_finish);
+
void handle_payload(const std::string &instance_id,
const instance_watcher::ImageAcquirePayload &payload,
C_NotifyAck *on_notify_ack);
void handle_payload(const std::string &instance_id,
const instance_watcher::ImageReleasePayload &payload,
C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::SyncRequestPayload &payload,
+ C_NotifyAck *on_notify_ack);
+ void handle_payload(const std::string &instance_id,
+ const instance_watcher::SyncStartPayload &payload,
+ C_NotifyAck *on_notify_ack);
void handle_payload(const std::string &instance_id,
const instance_watcher::UnknownPayload &payload,
C_NotifyAck *on_notify_ack);
delete m_leader_lock;
}
+template <typename I>
+std::string LeaderWatcher<I>::get_instance_id() {
+ return stringify(m_notifier_id);
+}
+
template <typename I>
int LeaderWatcher<I>::init() {
C_SaferCond init_ctx;
return;
}
+ bool notify_listener = false;
if (m_locker != locker) {
m_locker = locker;
+ notify_listener = true;
if (m_acquire_attempts > 1) {
dout(10) << "new lock owner detected -- resetting heartbeat counter"
<< dendl;
dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
<< "failed attempts to acquire" << dendl;
break_leader_lock();
- } else {
- schedule_acquire_leader_lock(1);
+ return;
+ }
+
+ schedule_acquire_leader_lock(1);
+
+ if (!notify_listener) {
m_timer_op_tracker.finish_op();
+ return;
}
+
+ auto ctx = new FunctionContext(
+ [this](int r) {
+ std::string instance_id;
+ if (get_leader_instance_id(&instance_id)) {
+ m_listener->update_leader_handler(instance_id);
+ }
+ Mutex::Locker timer_locker(m_threads->timer_lock);
+ Mutex::Locker locker(m_lock);
+ m_timer_op_tracker.finish_op();
+ });
+ m_work_queue->queue(ctx, 0);
}
template <typename I>
virtual void post_acquire_handler(Context *on_finish) = 0;
virtual void pre_release_handler(Context *on_finish) = 0;
+
+ virtual void update_leader_handler(
+ const std::string &leader_instance_id) = 0;
};
LeaderWatcher(Threads<ImageCtxT> *threads, librados::IoCtx &io_ctx,
void release_leader();
void list_instances(std::vector<std::string> *instance_ids);
+ std::string get_instance_id();
+
private:
/**
* @verbatim
dout(20) << "connected to " << m_peer << dendl;
- m_image_sync_throttler.reset(new ImageSyncThrottler<>());
-
m_instance_replayer.reset(
- InstanceReplayer<>::create(m_threads, m_image_deleter,
- m_image_sync_throttler, m_local_rados,
+ InstanceReplayer<>::create(m_threads, m_image_deleter, m_local_rados,
local_mirror_uuid, m_local_pool_id));
m_instance_replayer->init();
m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
&m_leader_listener));
+
r = m_leader_watcher->init();
if (r < 0) {
derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
admin_socket);
f->open_object_section("sync_throttler");
- m_image_sync_throttler->print_status(f, ss);
+ m_instance_watcher->print_sync_status(f, ss);
f->close_section();
m_instance_replayer->print_status(f, ss);
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
dout(20) << dendl;
+
+ m_instance_watcher->handle_acquire_leader();
init_local_pool_watcher(on_finish);
}
void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
dout(20) << dendl;
+
+ m_instance_watcher->handle_release_leader();
shut_down_pool_watchers(on_finish);
}
m_instance_replayer->release_all(on_finish);
}
+void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
+ dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
+
+ m_instance_watcher->handle_update_leader(leader_instance_id);
+}
+
} // namespace mirror
} // namespace rbd
void wait_for_update_ops(Context *on_finish);
void handle_wait_for_update_ops(int r, Context *on_finish);
+ void handle_update_leader(const std::string &leader_instance_id);
+
Threads<librbd::ImageCtx> *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
- ImageSyncThrottlerRef<> m_image_sync_throttler;
mutable Mutex m_lock;
Cond m_cond;
std::atomic<bool> m_stopping = { false };
m_pool_replayer->handle_pre_release_leader(on_finish);
}
+ void update_leader_handler(
+ const std::string &leader_instance_id) override {
+ m_pool_replayer->handle_update_leader(leader_instance_id);
+ }
+
private:
PoolReplayer *m_pool_replayer;
} m_leader_listener;
#include "librbd/Utils.h"
#include "librbd/journal/Types.h"
#include "tools/rbd_mirror/ProgressContext.h"
-#include "tools/rbd_mirror/ImageSyncThrottler.h"
+#include "tools/rbd_mirror/ImageSync.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
BootstrapRequest<I>::BootstrapRequest(
librados::IoCtx &local_io_ctx,
librados::IoCtx &remote_io_ctx,
- std::shared_ptr<ImageSyncThrottler<I>> image_sync_throttler,
+ InstanceWatcher<I> *instance_watcher,
I **local_image_ctx,
const std::string &local_image_id,
const std::string &remote_image_id,
: BaseRequest("rbd::mirror::image_replayer::BootstrapRequest",
reinterpret_cast<CephContext*>(local_io_ctx.cct()), on_finish),
m_local_io_ctx(local_io_ctx), m_remote_io_ctx(remote_io_ctx),
- m_image_sync_throttler(image_sync_throttler),
- m_local_image_ctx(local_image_ctx), m_local_image_id(local_image_id),
- m_remote_image_id(remote_image_id), m_global_image_id(global_image_id),
- m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
+ m_instance_watcher(instance_watcher), m_local_image_ctx(local_image_ctx),
+ m_local_image_id(local_image_id), m_remote_image_id(remote_image_id),
+ m_global_image_id(global_image_id), m_work_queue(work_queue),
+ m_timer(timer), m_timer_lock(timer_lock),
m_local_mirror_uuid(local_mirror_uuid),
m_remote_mirror_uuid(remote_mirror_uuid), m_journaler(journaler),
m_client_meta(client_meta), m_progress_ctx(progress_ctx),
Mutex::Locker locker(m_lock);
m_canceled = true;
- m_image_sync_throttler->cancel_sync(m_local_image_id);
+ if (m_image_sync != nullptr) {
+ m_image_sync->cancel();
+ }
}
template <typename I>
{
Mutex::Locker locker(m_lock);
- if (!m_canceled) {
- m_image_sync_throttler->start_sync(*m_local_image_ctx,
- m_remote_image_ctx, m_timer,
- m_timer_lock,
- m_local_mirror_uuid, m_journaler,
- m_client_meta, m_work_queue, ctx,
- m_progress_ctx);
+ if (m_canceled) {
+ m_ret_val = -ECANCELED;
+ } else {
+ assert(m_image_sync == nullptr);
+ m_image_sync = ImageSync<I>::create(
+ *m_local_image_ctx, m_remote_image_ctx, m_timer, m_timer_lock,
+ m_local_mirror_uuid, m_journaler, m_client_meta, m_work_queue,
+ m_instance_watcher, ctx, m_progress_ctx);
+
+ m_image_sync->get();
+ m_image_sync->send();
return;
}
}
dout(10) << ": request canceled" << dendl;
- m_ret_val = -ECANCELED;
close_remote_image();
}
void BootstrapRequest<I>::handle_image_sync(int r) {
dout(20) << ": r=" << r << dendl;
- if (m_canceled) {
- dout(10) << ": request canceled" << dendl;
- m_ret_val = -ECANCELED;
- }
+ {
+ Mutex::Locker locker(m_lock);
- if (r < 0) {
- derr << ": failed to sync remote image: " << cpp_strerror(r) << dendl;
- m_ret_val = r;
+ m_image_sync->put();
+ m_image_sync = nullptr;
+
+ if (m_canceled) {
+ dout(10) << ": request canceled" << dendl;
+ m_ret_val = -ECANCELED;
+ }
+
+ if (r < 0) {
+ derr << ": failed to sync remote image: " << cpp_strerror(r) << dendl;
+ m_ret_val = r;
+ }
}
close_remote_image();
class ProgressContext;
+template <typename> class ImageSync;
+template <typename> class InstanceWatcher;
+
namespace image_replayer {
template <typename ImageCtxT = librbd::ImageCtx>
static BootstrapRequest* create(
librados::IoCtx &local_io_ctx,
librados::IoCtx &remote_io_ctx,
- ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+ InstanceWatcher<ImageCtxT> *instance_watcher,
ImageCtxT **local_image_ctx,
const std::string &local_image_id,
const std::string &remote_image_id,
bool *do_resync,
ProgressContext *progress_ctx = nullptr) {
return new BootstrapRequest(local_io_ctx, remote_io_ctx,
- image_sync_throttler, local_image_ctx,
+ instance_watcher, local_image_ctx,
local_image_id, remote_image_id,
global_image_id, work_queue, timer, timer_lock,
local_mirror_uuid, remote_mirror_uuid,
BootstrapRequest(librados::IoCtx &local_io_ctx,
librados::IoCtx &remote_io_ctx,
- ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+ InstanceWatcher<ImageCtxT> *instance_watcher,
ImageCtxT **local_image_ctx,
const std::string &local_image_id,
const std::string &remote_image_id,
librados::IoCtx &m_local_io_ctx;
librados::IoCtx &m_remote_io_ctx;
- ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
+ InstanceWatcher<ImageCtxT> *m_instance_watcher;
ImageCtxT **m_local_image_ctx;
std::string m_local_image_id;
std::string m_remote_image_id;
MirrorPeerClientMeta *m_client_meta;
ProgressContext *m_progress_ctx;
bool *m_do_resync;
+
Mutex m_lock;
bool m_canceled = false;
ImageCtxT *m_remote_image_ctx = nullptr;
bool m_primary = false;
int m_ret_val = 0;
+ ImageSync<ImageCtxT> *m_image_sync = nullptr;
bufferlist m_out_bl;
} // anonymous namespace
-void ImagePayloadBase::encode(bufferlist &bl) const {
+void PayloadBase::encode(bufferlist &bl) const {
::encode(request_id, bl);
+}
+
+void PayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
+ ::decode(request_id, iter);
+}
+
+void PayloadBase::dump(Formatter *f) const {
+ f->dump_unsigned("request_id", request_id);
+}
+
+void ImagePayloadBase::encode(bufferlist &bl) const {
+ PayloadBase::encode(bl);
::encode(global_image_id, bl);
::encode(peer_mirror_uuid, bl);
::encode(peer_image_id, bl);
}
void ImagePayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
- ::decode(request_id, iter);
+ PayloadBase::decode(version, iter);
::decode(global_image_id, iter);
::decode(peer_mirror_uuid, iter);
::decode(peer_image_id, iter);
}
void ImagePayloadBase::dump(Formatter *f) const {
- f->dump_unsigned("request_id", request_id);
+ PayloadBase::dump(f);
f->dump_string("global_image_id", global_image_id);
f->dump_string("peer_mirror_uuid", peer_mirror_uuid);
f->dump_string("peer_image_id", peer_image_id);
f->dump_bool("schedule_delete", schedule_delete);
}
+void SyncPayloadBase::encode(bufferlist &bl) const {
+ PayloadBase::encode(bl);
+ ::encode(sync_id, bl);
+}
+
+void SyncPayloadBase::decode(__u8 version, bufferlist::iterator &iter) {
+ PayloadBase::decode(version, iter);
+ ::decode(sync_id, iter);
+}
+
+void SyncPayloadBase::dump(Formatter *f) const {
+ PayloadBase::dump(f);
+ f->dump_string("sync_id", sync_id);
+}
+
void UnknownPayload::encode(bufferlist &bl) const {
assert(false);
}
case NOTIFY_OP_IMAGE_RELEASE:
payload = ImageReleasePayload();
break;
+ case NOTIFY_OP_SYNC_REQUEST:
+ payload = SyncRequestPayload();
+ break;
+ case NOTIFY_OP_SYNC_START:
+ payload = SyncStartPayload();
+ break;
default:
payload = UnknownPayload();
break;
o.push_back(new NotifyMessage(ImageReleasePayload()));
o.push_back(new NotifyMessage(ImageReleasePayload(1, "gid", "uuid", "id",
true)));
+
+ o.push_back(new NotifyMessage(SyncRequestPayload()));
+ o.push_back(new NotifyMessage(SyncRequestPayload(1, "sync_id")));
+
+ o.push_back(new NotifyMessage(SyncStartPayload()));
+ o.push_back(new NotifyMessage(SyncStartPayload(1, "sync_id")));
}
std::ostream &operator<<(std::ostream &out, const NotifyOp &op) {
case NOTIFY_OP_IMAGE_RELEASE:
out << "ImageRelease";
break;
+ case NOTIFY_OP_SYNC_REQUEST:
+ out << "SyncRequest";
+ break;
+ case NOTIFY_OP_SYNC_START:
+ out << "SyncStart";
+ break;
default:
out << "Unknown (" << static_cast<uint32_t>(op) << ")";
break;
enum NotifyOp {
NOTIFY_OP_IMAGE_ACQUIRE = 0,
NOTIFY_OP_IMAGE_RELEASE = 1,
+ NOTIFY_OP_SYNC_REQUEST = 2,
+ NOTIFY_OP_SYNC_START = 3,
};
-struct ImagePayloadBase {
+struct PayloadBase {
uint64_t request_id;
+
+ PayloadBase() : request_id(0) {
+ }
+
+ PayloadBase(uint64_t request_id) : request_id(request_id) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+ void dump(Formatter *f) const;
+};
+
+struct ImagePayloadBase : public PayloadBase {
std::string global_image_id;
std::string peer_mirror_uuid;
std::string peer_image_id;
- ImagePayloadBase() : request_id(0) {
+ ImagePayloadBase() : PayloadBase() {
}
ImagePayloadBase(uint64_t request_id, const std::string &global_image_id,
const std::string &peer_mirror_uuid,
const std::string &peer_image_id)
- : request_id(request_id), global_image_id(global_image_id),
+ : PayloadBase(request_id), global_image_id(global_image_id),
peer_mirror_uuid(peer_mirror_uuid), peer_image_id(peer_image_id) {
}
void dump(Formatter *f) const;
};
+struct SyncPayloadBase : public PayloadBase {
+ std::string sync_id;
+
+ SyncPayloadBase() : PayloadBase() {
+ }
+
+ SyncPayloadBase(uint64_t request_id, const std::string &sync_id)
+ : PayloadBase(request_id), sync_id(sync_id) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &iter);
+ void dump(Formatter *f) const;
+};
+
+struct SyncRequestPayload : public SyncPayloadBase {
+ static const NotifyOp NOTIFY_OP = NOTIFY_OP_SYNC_REQUEST;
+
+ SyncRequestPayload() : SyncPayloadBase() {
+ }
+
+ SyncRequestPayload(uint64_t request_id, const std::string &sync_id)
+ : SyncPayloadBase(request_id, sync_id) {
+ }
+};
+
+struct SyncStartPayload : public SyncPayloadBase {
+ static const NotifyOp NOTIFY_OP = NOTIFY_OP_SYNC_START;
+
+ SyncStartPayload() : SyncPayloadBase() {
+ }
+
+ SyncStartPayload(uint64_t request_id, const std::string &sync_id)
+ : SyncPayloadBase(request_id, sync_id) {
+ }
+};
+
struct UnknownPayload {
static const NotifyOp NOTIFY_OP = static_cast<NotifyOp>(-1);
typedef boost::variant<ImageAcquirePayload,
ImageReleasePayload,
+ SyncRequestPayload,
+ SyncStartPayload,
UnknownPayload> Payload;
struct NotifyMessage {
namespace leader_watcher {
enum NotifyOp {
- NOTIFY_OP_HEARTBEAT = 0,
- NOTIFY_OP_LOCK_ACQUIRED = 1,
- NOTIFY_OP_LOCK_RELEASED = 2,
+ NOTIFY_OP_HEARTBEAT = 0,
+ NOTIFY_OP_LOCK_ACQUIRED = 1,
+ NOTIFY_OP_LOCK_RELEASED = 2,
};
struct HeartbeatPayload {
// vim: ts=8 sw=2 smarttab
#include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
-#include "common/dout.h"
+#include "common/debug.h"
#include "common/errno.h"
#include "cls/rbd/cls_rbd_client.h"
#include "librbd/Utils.h"
#include <vector>
#include "include/rbd/librbd.hpp"
-#include "ImageSyncThrottler.h"
namespace rbd {
namespace mirror {
typedef shared_ptr<librados::IoCtx> IoCtxRef;
typedef shared_ptr<librbd::Image> ImageRef;
-template <typename I = librbd::ImageCtx>
-using ImageSyncThrottlerRef = std::shared_ptr<ImageSyncThrottler<I>>;
-
struct ImageId {
std::string global_id;
std::string id;