s_instance = this;
}
- MOCK_METHOD0(start, void());
+ void put() {
+ }
+
+ void get() {
+ }
+
+ MOCK_METHOD0(send, void());
+ MOCK_METHOD0(cancel, void());
};
ImageSync<librbd::MockTestImageCtx>* ImageSync<librbd::MockTestImageCtx>::s_instance = nullptr;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::WithArg;
+using ::testing::InvokeWithoutArgs;
class TestMockImageSyncImageCopyRequest : public TestMockFixture {
public:
expect_get_object_count(mock_remote_image_ctx, 2);
expect_update_client(mock_journaler, 0);
expect_object_copy_send(mock_object_copy_request);
- expect_update_client(mock_journaler, 0);
C_SaferCond ctx;
MockImageCopyRequest *request = create_request(mock_remote_image_ctx,
request->cancel();
ASSERT_TRUE(complete_object_copy(mock_object_copy_request, 0, 0));
- ASSERT_EQ(0, ctx.wait());
+ ASSERT_EQ(-ECANCELED, ctx.wait());
+}
+
+TEST_F(TestMockImageSyncImageCopyRequest, Cancel1) {
+ ASSERT_EQ(0, create_snap("snap1"));
+ m_client_meta.sync_points = {{"snap1", boost::none}};
+
+ librbd::MockImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
+ librbd::MockImageCtx mock_local_image_ctx(*m_local_image_ctx);
+ journal::MockJournaler mock_journaler;
+ MockObjectCopyRequest mock_object_copy_request;
+
+ C_SaferCond ctx;
+ MockImageCopyRequest *request = create_request(mock_remote_image_ctx,
+ mock_local_image_ctx,
+ mock_journaler,
+ m_client_meta.sync_points.front(),
+ &ctx);
+
+ expect_get_snap_id(mock_remote_image_ctx);
+
+ InSequence seq;
+ expect_get_object_count(mock_remote_image_ctx, 1);
+ expect_get_object_count(mock_remote_image_ctx, 0);
+ EXPECT_CALL(mock_journaler, update_client(_, _))
+ .WillOnce(DoAll(InvokeWithoutArgs([request]() {
+ request->cancel();
+ }),
+ WithArg<1>(CompleteContext(0))));
+
+ request->send();
+ ASSERT_EQ(-ECANCELED, ctx.wait());
}
TEST_F(TestMockImageSyncImageCopyRequest, MissingSnap) {
ASSERT_EQ(-EINVAL, ctx.wait());
}
+TEST_F(TestMockImageSyncSnapshotCopyRequest, UpdateClientCancel) {
+ librbd::MockImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
+ librbd::MockImageCtx mock_local_image_ctx(*m_local_image_ctx);
+ journal::MockJournaler mock_journaler;
+
+ C_SaferCond ctx;
+ MockSnapshotCopyRequest *request = create_request(mock_remote_image_ctx,
+ mock_local_image_ctx,
+ mock_journaler, &ctx);
+ InSequence seq;
+ EXPECT_CALL(mock_journaler, update_client(_, _))
+ .WillOnce(DoAll(InvokeWithoutArgs([request]() {
+ request->cancel();
+ }),
+ WithArg<1>(CompleteContext(0))));
+
+ request->send();
+ ASSERT_EQ(-ECANCELED, ctx.wait());
+}
+
TEST_F(TestMockImageSyncSnapshotCopyRequest, SnapCreate) {
ASSERT_EQ(0, create_snap(m_remote_image_ctx, "snap1"));
ASSERT_EQ(0, create_snap(m_remote_image_ctx, "snap2"));
ASSERT_EQ(-EINVAL, ctx.wait());
}
+TEST_F(TestMockImageSyncSnapshotCopyRequest, SnapCreateCancel) {
+ ASSERT_EQ(0, create_snap(m_remote_image_ctx, "snap1"));
+
+ librbd::MockImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
+ librbd::MockImageCtx mock_local_image_ctx(*m_local_image_ctx);
+ MockSnapshotCreateRequest mock_snapshot_create_request;
+ journal::MockJournaler mock_journaler;
+
+ C_SaferCond ctx;
+ MockSnapshotCopyRequest *request = create_request(mock_remote_image_ctx,
+ mock_local_image_ctx,
+ mock_journaler, &ctx);
+ InSequence seq;
+ EXPECT_CALL(mock_snapshot_create_request, send())
+ .WillOnce(DoAll(InvokeWithoutArgs([request]() {
+ request->cancel();
+ }),
+ Invoke([this, &mock_snapshot_create_request]() {
+ m_threads->work_queue->queue(mock_snapshot_create_request.on_finish, 0);
+ })));
+
+ request->send();
+ ASSERT_EQ(-ECANCELED, ctx.wait());
+}
+
TEST_F(TestMockImageSyncSnapshotCopyRequest, SnapRemoveAndCreate) {
ASSERT_EQ(0, create_snap(m_remote_image_ctx, "snap1"));
ASSERT_EQ(0, create_snap(m_local_image_ctx, "snap1"));
ASSERT_EQ(-EBUSY, ctx.wait());
}
+TEST_F(TestMockImageSyncSnapshotCopyRequest, SnapUnprotectCancel) {
+ ASSERT_EQ(0, create_snap(m_remote_image_ctx, "snap1", true));
+ ASSERT_EQ(0, create_snap(m_local_image_ctx, "snap1", true));
+
+ uint64_t remote_snap_id1 = m_remote_image_ctx->snap_ids["snap1"];
+ uint64_t local_snap_id1 = m_local_image_ctx->snap_ids["snap1"];
+ m_client_meta.snap_seqs[remote_snap_id1] = local_snap_id1;
+
+ librbd::MockImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
+ librbd::MockImageCtx mock_local_image_ctx(*m_local_image_ctx);
+ journal::MockJournaler mock_journaler;
+
+ C_SaferCond ctx;
+ MockSnapshotCopyRequest *request = create_request(mock_remote_image_ctx,
+ mock_local_image_ctx,
+ mock_journaler, &ctx);
+ InSequence seq;
+ expect_snap_is_unprotected(mock_local_image_ctx, local_snap_id1, false, 0);
+ expect_snap_is_unprotected(mock_remote_image_ctx, remote_snap_id1, true, 0);
+ EXPECT_CALL(*mock_local_image_ctx.operations,
+ execute_snap_unprotect(StrEq("snap1"), _))
+ .WillOnce(DoAll(InvokeWithoutArgs([request]() {
+ request->cancel();
+ }),
+ WithArg<1>(Invoke([this](Context *ctx) {
+ m_threads->work_queue->queue(ctx, 0);
+ }))));
+
+ request->send();
+ ASSERT_EQ(-ECANCELED, ctx.wait());
+}
+
TEST_F(TestMockImageSyncSnapshotCopyRequest, SnapUnprotectRemove) {
ASSERT_EQ(0, create_snap(m_remote_image_ctx, "snap1", true));
ASSERT_EQ(0, create_snap(m_local_image_ctx, "snap1", true));
ASSERT_EQ(-EINVAL, ctx.wait());
}
+TEST_F(TestMockImageSyncSnapshotCopyRequest, SnapProtectCancel) {
+ ASSERT_EQ(0, create_snap(m_remote_image_ctx, "snap1", true));
+ ASSERT_EQ(0, create_snap(m_local_image_ctx, "snap1", true));
+
+ uint64_t remote_snap_id1 = m_remote_image_ctx->snap_ids["snap1"];
+ uint64_t local_snap_id1 = m_local_image_ctx->snap_ids["snap1"];
+ m_client_meta.snap_seqs[remote_snap_id1] = local_snap_id1;
+
+ librbd::MockImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
+ librbd::MockImageCtx mock_local_image_ctx(*m_local_image_ctx);
+ journal::MockJournaler mock_journaler;
+
+ C_SaferCond ctx;
+ MockSnapshotCopyRequest *request = create_request(mock_remote_image_ctx,
+ mock_local_image_ctx,
+ mock_journaler, &ctx);
+ InSequence seq;
+ expect_snap_is_unprotected(mock_local_image_ctx, local_snap_id1, true, 0);
+ expect_snap_is_protected(mock_remote_image_ctx, remote_snap_id1, true, 0);
+ expect_snap_is_protected(mock_local_image_ctx, local_snap_id1, false, 0);
+ EXPECT_CALL(*mock_local_image_ctx.operations,
+ execute_snap_protect(StrEq("snap1"), _))
+ .WillOnce(DoAll(InvokeWithoutArgs([request]() {
+ request->cancel();
+ }),
+ WithArg<1>(Invoke([this](Context *ctx) {
+ m_threads->work_queue->queue(ctx, 0);
+ }))));
+
+ request->send();
+ ASSERT_EQ(-ECANCELED, ctx.wait());
+}
+
} // namespace image_sync
} // namespace mirror
} // namespace rbd
TEST_F(TestImageSync, Empty) {
C_SaferCond ctx;
ImageSync<> *request = create_request(&ctx);
- request->start();
+ request->send();
ASSERT_EQ(0, ctx.wait());
ASSERT_EQ(0U, m_client_meta.sync_points.size());
C_SaferCond ctx;
ImageSync<> *request = create_request(&ctx);
- request->start();
+ request->send();
ASSERT_EQ(0, ctx.wait());
int64_t object_size = std::min<int64_t>(
C_SaferCond ctx;
ImageSync<> *request = create_request(&ctx);
- request->start();
+ request->send();
ASSERT_EQ(0, ctx.wait());
int64_t object_size = std::min<int64_t>(
s_instance = this;
}
+ void put() {
+ }
+
+ void get() {
+ }
+
MOCK_METHOD0(send, void());
+ MOCK_METHOD0(cancel, void());
};
template<>
ImageCopyRequest() {
s_instance = this;
}
+
+ void put() {
+ }
+
+ void get() {
+ }
+
MOCK_METHOD0(cancel, void());
MOCK_METHOD0(send, void());
};
SnapshotCopyRequest() {
s_instance = this;
}
+
+ void put() {
+ }
+
+ void get() {
+ }
+
MOCK_METHOD0(send, void());
+ MOCK_METHOD0(cancel, void());
};
template <>
using ::testing::Invoke;
using ::testing::Return;
using ::testing::WithArg;
+using ::testing::InvokeWithoutArgs;
class TestMockImageSync : public TestMockFixture {
public:
MockImageSync *request = create_request(mock_remote_image_ctx,
mock_local_image_ctx,
mock_journaler, &ctx);
- request->start();
+ request->send();
ASSERT_EQ(0, ctx.wait());
}
MockImageSync *request = create_request(mock_remote_image_ctx,
mock_local_image_ctx,
mock_journaler, &ctx);
- request->start();
+ request->send();
ASSERT_EQ(0, ctx.wait());
}
MockImageSync *request = create_request(mock_remote_image_ctx,
mock_local_image_ctx,
mock_journaler, &ctx);
- request->start();
+ request->get();
+ request->send();
// cancel the image copy once it starts
ASSERT_EQ(0, image_copy_ctx.wait());
request->cancel();
+ request->put();
m_threads->work_queue->queue(mock_image_copy_request.on_finish, 0);
ASSERT_EQ(-ECANCELED, ctx.wait());
}
+TEST_F(TestMockImageSync, CancelAfterCopySnapshots) {
+ librbd::MockImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
+ librbd::MockImageCtx mock_local_image_ctx(*m_local_image_ctx);
+ journal::MockJournaler mock_journaler;
+ MockSnapshotCopyRequest mock_snapshot_copy_request;
+ MockSyncPointCreateRequest mock_sync_point_create_request;
+
+ librbd::MockObjectMap *mock_object_map = new librbd::MockObjectMap();
+ mock_local_image_ctx.object_map = mock_object_map;
+ expect_test_features(mock_local_image_ctx);
+
+ C_SaferCond ctx;
+ MockImageSync *request = create_request(mock_remote_image_ctx,
+ mock_local_image_ctx,
+ mock_journaler, &ctx);
+ InSequence seq;
+ expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
+ EXPECT_CALL(mock_snapshot_copy_request, send())
+ .WillOnce((DoAll(InvokeWithoutArgs([request]() {
+ request->cancel();
+ }),
+ Invoke([this, &mock_snapshot_copy_request]() {
+ m_threads->work_queue->queue(mock_snapshot_copy_request.on_finish, 0);
+ }))));
+ EXPECT_CALL(mock_snapshot_copy_request, cancel());
+
+ request->send();
+ ASSERT_EQ(-ECANCELED, ctx.wait());
+}
+
+TEST_F(TestMockImageSync, CancelAfterCopyImage) {
+ librbd::MockImageCtx mock_remote_image_ctx(*m_remote_image_ctx);
+ librbd::MockImageCtx mock_local_image_ctx(*m_local_image_ctx);
+ journal::MockJournaler mock_journaler;
+ MockImageCopyRequest mock_image_copy_request;
+ MockSnapshotCopyRequest mock_snapshot_copy_request;
+ MockSyncPointCreateRequest mock_sync_point_create_request;
+ MockSyncPointPruneRequest mock_sync_point_prune_request;
+
+ librbd::MockObjectMap *mock_object_map = new librbd::MockObjectMap();
+ mock_local_image_ctx.object_map = mock_object_map;
+ expect_test_features(mock_local_image_ctx);
+
+ C_SaferCond ctx;
+ MockImageSync *request = create_request(mock_remote_image_ctx,
+ mock_local_image_ctx,
+ mock_journaler, &ctx);
+ InSequence seq;
+ expect_create_sync_point(mock_local_image_ctx, mock_sync_point_create_request, 0);
+ expect_copy_snapshots(mock_snapshot_copy_request, 0);
+ EXPECT_CALL(mock_image_copy_request, send())
+ .WillOnce((DoAll(InvokeWithoutArgs([request]() {
+ request->cancel();
+ }),
+ Invoke([this, &mock_image_copy_request]() {
+ m_threads->work_queue->queue(mock_image_copy_request.on_finish, 0);
+ }))));
+ EXPECT_CALL(mock_image_copy_request, cancel());
+
+ request->send();
+ ASSERT_EQ(-ECANCELED, ctx.wait());
+}
+
} // namespace mirror
} // namespace rbd
tools/rbd_mirror/image_sync/SyncPointPruneRequest.cc
noinst_LTLIBRARIES += librbd_mirror_internal.la
noinst_HEADERS += \
+ tools/rbd_mirror/BaseRequest.h \
tools/rbd_mirror/ClusterWatcher.h \
tools/rbd_mirror/ImageReplayer.h \
tools/rbd_mirror/ImageSync.h \
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_MIRROR_BASE_REQUEST_H
+#define CEPH_RBD_MIRROR_BASE_REQUEST_H
+
+#include "common/RefCountedObj.h"
+
+namespace rbd {
+namespace mirror {
+
+class BaseRequest : public RefCountedObject {
+public:
+ BaseRequest(const std::string& name, CephContext *cct, Context *on_finish)
+ : RefCountedObject(cct, 1), m_name(name), m_cct(cct),
+ m_on_finish(on_finish) {
+ }
+
+ virtual void send() = 0;
+ virtual void cancel() {}
+
+protected:
+ void finish(int r) {
+ if (m_cct) {
+ lsubdout(m_cct, rbd_mirror, 20) << m_name << "::finish: r=" << r << dendl;
+ }
+ if (m_on_finish) {
+ m_on_finish->complete(r);
+ }
+ put();
+ }
+
+private:
+ const std::string m_name;
+ CephContext *m_cct;
+ Context *m_on_finish;
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_BASE_REQUEST_H
struct ReplayHandler : public ::journal::ReplayHandler {
ImageReplayer<I> *replayer;
ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
-virtual void get() {}
+ virtual void get() {}
virtual void put() {}
virtual void handle_entries_available() {
assert(m_replay_handler == nullptr);
assert(m_on_start_finish == nullptr);
assert(m_on_stop_finish == nullptr);
+ assert(m_bootstrap_request == nullptr);
assert(m_in_flight_status_updates == 0);
delete m_asok_hook;
}
dout(20) << "bootstrap params: "
<< "local_image_name=" << m_local_image_name << dendl;
- // TODO: add a new bootstrap state and support canceling
Context *ctx = create_context_callback<
ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
{
Mutex::Locker locker(m_lock);
+ request->get();
m_bootstrap_request = request;
}
{
Mutex::Locker locker(m_lock);
+ m_bootstrap_request->put();
m_bootstrap_request = nullptr;
if (m_local_image_ctx) {
m_local_image_id = m_local_image_ctx->id;
} else {
if (!is_stopped_()) {
if (m_state == STATE_STARTING) {
- dout(20) << "interrupting start" << dendl;
+ dout(20) << "canceling start" << dendl;
+ if (m_bootstrap_request) {
+ m_bootstrap_request->cancel();
+ }
} else {
dout(20) << "interrupting replay" << dendl;
shut_down_replay = true;
MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue, Context *on_finish,
ProgressContext *progress_ctx)
- : m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
+ : BaseRequest("rbd::mirror::ImageSync", local_image_ctx->cct, on_finish),
+ m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
m_timer(timer), m_timer_lock(timer_lock), m_mirror_uuid(mirror_uuid),
m_journaler(journaler), m_client_meta(client_meta),
- m_work_queue(work_queue), m_on_finish(on_finish),
- m_progress_ctx(progress_ctx),
+ m_work_queue(work_queue), m_progress_ctx(progress_ctx),
m_lock(unique_lock_name("ImageSync::m_lock", this)) {
}
template <typename I>
-void ImageSync<I>::start() {
+ImageSync<I>::~ImageSync() {
+ assert(m_snapshot_copy_request == nullptr);
+ assert(m_image_copy_request == nullptr);
+}
+
+template <typename I>
+void ImageSync<I>::send() {
send_prune_catch_up_sync_point();
}
ldout(cct, 20) << dendl;
m_canceled = true;
+
+ if (m_snapshot_copy_request != nullptr) {
+ m_snapshot_copy_request->cancel();
+ }
+
if (m_image_copy_request != nullptr) {
m_image_copy_request->cancel();
}
template <typename I>
void ImageSync<I>::send_copy_snapshots() {
- update_progress("COPY_SNAPSHOTS");
+ m_lock.Lock();
+ if (m_canceled) {
+ m_lock.Unlock();
+ finish(-ECANCELED);
+ return;
+ }
CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << dendl;
Context *ctx = create_context_callback<
ImageSync<I>, &ImageSync<I>::handle_copy_snapshots>(this);
- SnapshotCopyRequest<I> *request = SnapshotCopyRequest<I>::create(
+ m_snapshot_copy_request = SnapshotCopyRequest<I>::create(
m_local_image_ctx, m_remote_image_ctx, &m_snap_map, m_journaler,
m_client_meta, m_work_queue, ctx);
- request->send();
+ m_snapshot_copy_request->get();
+ m_lock.Unlock();
+
+ update_progress("COPY_SNAPSHOTS");
+
+ m_snapshot_copy_request->send();
}
template <typename I>
CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << ": r=" << r << dendl;
- if (r < 0) {
+ {
+ Mutex::Locker locker(m_lock);
+ m_snapshot_copy_request->put();
+ m_snapshot_copy_request = nullptr;
+ if (r == 0 && m_canceled) {
+ r = -ECANCELED;
+ }
+ }
+
+ if (r == -ECANCELED) {
+ ldout(cct, 10) << ": snapshot copy canceled" << dendl;
+ finish(r);
+ return;
+ } else if (r < 0) {
lderr(cct) << ": failed to copy snapshot metadata: "
<< cpp_strerror(r) << dendl;
finish(r);
m_local_image_ctx, m_remote_image_ctx, m_timer, m_timer_lock,
m_journaler, m_client_meta, &m_client_meta->sync_points.front(),
ctx, m_progress_ctx);
+ m_image_copy_request->get();
m_lock.Unlock();
update_progress("COPY_IMAGE");
template <typename I>
void ImageSync<I>::handle_copy_image(int r) {
+ CephContext *cct = m_local_image_ctx->cct;
+ ldout(cct, 20) << ": r=" << r << dendl;
+
{
Mutex::Locker locker(m_lock);
+ m_image_copy_request->put();
m_image_copy_request = nullptr;
if (r == 0 && m_canceled) {
r = -ECANCELED;
}
}
- CephContext *cct = m_local_image_ctx->cct;
- ldout(cct, 20) << ": r=" << r << dendl;
-
if (r == -ECANCELED) {
ldout(cct, 10) << ": image copy canceled" << dendl;
finish(r);
finish(0);
}
-template <typename I>
-void ImageSync<I>::finish(int r) {
- CephContext *cct = m_local_image_ctx->cct;
- ldout(cct, 20) << ": r=" << r << dendl;
-
- m_on_finish->complete(r);
- delete this;
-}
-
template <typename I>
void ImageSync<I>::update_progress(const std::string &description) {
dout(20) << ": " << description << dendl;
#include "librbd/ImageCtx.h"
#include "librbd/journal/TypeTraits.h"
#include "common/Mutex.h"
+#include "tools/rbd_mirror/BaseRequest.h"
#include <map>
#include <vector>
class ProgressContext;
namespace image_sync { template <typename> class ImageCopyRequest; }
+namespace image_sync { template <typename> class SnapshotCopyRequest; }
template <typename ImageCtxT = librbd::ImageCtx>
-class ImageSync {
+class ImageSync : public BaseRequest {
public:
typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
typedef typename TypeTraits::Journaler Journaler;
Journaler *journaler, MirrorPeerClientMeta *client_meta,
ContextWQ *work_queue, Context *on_finish,
ProgressContext *progress_ctx = nullptr);
+ ~ImageSync();
- void start();
+ void send();
void cancel();
private:
Journaler *m_journaler;
MirrorPeerClientMeta *m_client_meta;
ContextWQ *m_work_queue;
- Context *m_on_finish;
ProgressContext *m_progress_ctx;
SnapMap m_snap_map;
Mutex m_lock;
bool m_canceled = false;
- image_sync::ImageCopyRequest<ImageCtxT> *m_image_copy_request;
+ image_sync::SnapshotCopyRequest<ImageCtxT> *m_snapshot_copy_request = nullptr;
+ image_sync::ImageCopyRequest<ImageCtxT> *m_image_copy_request = nullptr;
decltype(ImageCtxT::object_map) m_object_map = nullptr;
void send_prune_catch_up_sync_point();
void send_prune_sync_points();
void handle_prune_sync_points(int r);
- void finish(int r);
-
void update_progress(const std::string &description);
};
using librbd::util::create_context_callback;
using librbd::util::create_rados_ack_callback;
+using librbd::util::unique_lock_name;
template <typename I>
BootstrapRequest<I>::BootstrapRequest(librados::IoCtx &local_io_ctx,
MirrorPeerClientMeta *client_meta,
Context *on_finish,
rbd::mirror::ProgressContext *progress_ctx)
- : m_local_io_ctx(local_io_ctx), m_remote_io_ctx(remote_io_ctx),
+ : BaseRequest("rbd::mirror::image_replayer::BootstrapRequest",
+ reinterpret_cast<CephContext*>(local_io_ctx.cct()), on_finish),
+ m_local_io_ctx(local_io_ctx), m_remote_io_ctx(remote_io_ctx),
m_local_image_ctx(local_image_ctx), m_local_image_name(local_image_name),
m_remote_image_id(remote_image_id), m_global_image_id(global_image_id),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
m_local_mirror_uuid(local_mirror_uuid),
m_remote_mirror_uuid(remote_mirror_uuid), m_journaler(journaler),
- m_client_meta(client_meta), m_on_finish(on_finish),
- m_progress_ctx(progress_ctx) {
+ m_client_meta(client_meta), m_progress_ctx(progress_ctx),
+ m_lock(unique_lock_name("BootstrapRequest::m_lock", this)) {
}
template <typename I>
BootstrapRequest<I>::~BootstrapRequest() {
+ assert(m_image_sync_request == nullptr);
assert(m_remote_image_ctx == nullptr);
}
get_local_image_id();
}
+template <typename I>
+void BootstrapRequest<I>::cancel() {
+ dout(20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_canceled = true;
+
+ if (m_image_sync_request) {
+ m_image_sync_request->cancel();
+ }
+}
+
template <typename I>
void BootstrapRequest<I>::get_local_image_id() {
dout(20) << dendl;
return;
}
+ if (m_canceled) {
+ dout(10) << ": request canceled" << dendl;
+ m_ret_val = -ECANCELED;
+ close_local_image();
+ return;
+ }
+
m_client_meta->image_id = m_local_image_id;
get_remote_tags();
}
return;
}
+ if (m_canceled) {
+ dout(10) << ": request canceled" << dendl;
+ m_ret_val = -ECANCELED;
+ close_local_image();
+ return;
+ }
+
// decode the remote tags
librbd::journal::TagData remote_tag_data;
for (auto &tag : m_remote_tags) {
m_local_mirror_uuid, m_journaler,
m_client_meta, m_work_queue, ctx,
m_progress_ctx);
- request->start();
+ {
+ Mutex::Locker locker(m_lock);
+ request->get();
+ m_image_sync_request = request;
+ }
+
+ request->send();
}
template <typename I>
void BootstrapRequest<I>::handle_image_sync(int r) {
dout(20) << ": r=" << r << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ m_image_sync_request->put();
+ m_image_sync_request = nullptr;
+ }
+
+ if (m_canceled) {
+ dout(10) << ": request canceled" << dendl;
+ m_ret_val = -ECANCELED;
+ }
+
if (r < 0) {
derr << ": failed to sync remote image: " << cpp_strerror(r) << dendl;
m_ret_val = r;
finish(m_ret_val);
}
-template <typename I>
-void BootstrapRequest<I>::finish(int r) {
- dout(20) << ": r=" << r << dendl;
-
- m_on_finish->complete(r);
- delete this;
-}
-
template <typename I>
bool BootstrapRequest<I>::decode_client_meta() {
dout(20) << dendl;
*m_client_meta = *client_meta;
- dout(20) << ": client found: image_id=" << m_local_image_id << dendl;
+ dout(20) << ": client found: image_id=" << m_local_image_id
+ << ", client_meta=" << *m_client_meta << dendl;
return true;
}
#include "include/int_types.h"
#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
#include "cls/journal/cls_journal_types.h"
#include "librbd/journal/TypeTraits.h"
+#include "tools/rbd_mirror/BaseRequest.h"
#include <list>
#include <string>
namespace rbd {
namespace mirror {
+template <typename> class ImageSync;
class ProgressContext;
namespace image_replayer {
template <typename ImageCtxT = librbd::ImageCtx>
-class BootstrapRequest {
+class BootstrapRequest : public BaseRequest {
public:
typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
typedef typename TypeTraits::Journaler Journaler;
~BootstrapRequest();
void send();
+ void cancel();
private:
/**
std::string m_remote_mirror_uuid;
Journaler *m_journaler;
MirrorPeerClientMeta *m_client_meta;
- Context *m_on_finish;
ProgressContext *m_progress_ctx;
+ Mutex m_lock;
+ ImageSync<ImageCtxT> *m_image_sync_request = nullptr;
+ bool m_canceled = false;
Tags m_remote_tags;
cls::journal::Client m_client;
void close_remote_image();
void handle_close_remote_image(int r);
- void finish(int r);
-
bool decode_client_meta();
void update_progress(const std::string &description);
MirrorPeerSyncPoint *sync_point,
Context *on_finish,
ProgressContext *progress_ctx)
- : m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
+ : BaseRequest("rbd::mirror::image_sync::ImageCopyRequest",
+ local_image_ctx->cct, on_finish),
+ m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
m_timer(timer), m_timer_lock(timer_lock), m_journaler(journaler),
m_client_meta(client_meta), m_sync_point(sync_point),
- m_on_finish(on_finish), m_progress_ctx(progress_ctx),
+ m_progress_ctx(progress_ctx),
m_lock(unique_lock_name("ImageCopyRequest::m_lock", this)),
m_client_meta_copy(*client_meta) {
assert(!m_client_meta_copy.sync_points.empty());
CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << ": r=" << r << dendl;
+ if (r == 0) {
+ Mutex::Locker locker(m_lock);
+ if (m_canceled) {
+ ldout(cct, 10) << ": image copy canceled" << dendl;
+ r = -ECANCELED;
+ }
+ }
+
if (r < 0) {
- lderr(cct) << ": failed to update client data: " << cpp_strerror(r)
- << dendl;
+ if (r != -ECANCELED) {
+ lderr(cct) << ": failed to update client data: " << cpp_strerror(r)
+ << dendl;
+ }
finish(r);
return;
}
template <typename I>
void ImageCopyRequest<I>::send_next_object_copy() {
assert(m_lock.is_locked());
- if (m_canceled) {
- return;
- } else if (m_ret_val < 0 || m_object_no >= m_end_object_no) {
+ CephContext *cct = m_local_image_ctx->cct;
+
+ if (m_canceled && m_ret_val == 0) {
+ ldout(cct, 10) << ": image copy canceled" << dendl;
+ m_ret_val = -ECANCELED;
+ }
+
+ if (m_ret_val < 0 || m_object_no >= m_end_object_no) {
return;
}
uint64_t ono = m_object_no++;
- CephContext *cct = m_local_image_ctx->cct;
ldout(cct, 20) << ": object_num=" << ono << dendl;
++m_current_ops;
Mutex::Locker locker(m_lock);
assert(m_current_ops > 0);
--m_current_ops;
+
percent = 100 * m_object_no / m_end_object_no;
if (r < 0) {
finish(0);
}
-template <typename I>
-void ImageCopyRequest<I>::finish(int r) {
- CephContext *cct = m_local_image_ctx->cct;
- ldout(cct, 20) << ": r=" << r << dendl;
-
- m_on_finish->complete(r);
- delete this;
-}
-
template <typename I>
int ImageCopyRequest<I>::compute_snap_map() {
CephContext *cct = m_local_image_ctx->cct;
#include "common/Mutex.h"
#include "librbd/journal/Types.h"
#include "librbd/journal/TypeTraits.h"
+#include "tools/rbd_mirror/BaseRequest.h"
#include <map>
#include <vector>
namespace image_sync {
template <typename ImageCtxT = librbd::ImageCtx>
-class ImageCopyRequest {
+class ImageCopyRequest : public BaseRequest {
public:
typedef std::vector<librados::snap_t> SnapIds;
typedef std::map<librados::snap_t, SnapIds> SnapMap;
Journaler *m_journaler;
MirrorPeerClientMeta *m_client_meta;
MirrorPeerSyncPoint *m_sync_point;
- Context *m_on_finish;
ProgressContext *m_progress_ctx;
SnapMap m_snap_map;
void send_flush_sync_point();
void handle_flush_sync_point(int r);
- void finish(int r);
-
int compute_snap_map();
void update_progress(const std::string &description, bool flush = true);
} // anonymous namespace
using librbd::util::create_context_callback;
+using librbd::util::unique_lock_name;
template <typename I>
SnapshotCopyRequest<I>::SnapshotCopyRequest(I *local_image_ctx,
librbd::journal::MirrorPeerClientMeta *meta,
ContextWQ *work_queue,
Context *on_finish)
- : m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
+ : BaseRequest("rbd::mirror::image_sync::SnapshotCopyRequest",
+ local_image_ctx->cct, on_finish),
+ m_local_image_ctx(local_image_ctx), m_remote_image_ctx(remote_image_ctx),
m_snap_map(snap_map), m_journaler(journaler), m_client_meta(meta),
- m_work_queue(work_queue), m_on_finish(on_finish),
- m_snap_seqs(meta->snap_seqs) {
+ m_work_queue(work_queue), m_snap_seqs(meta->snap_seqs),
+ m_lock(unique_lock_name("SnapshotCopyRequest::m_lock", this)) {
m_snap_map->clear();
// snap ids ordered from oldest to newest
send_snap_unprotect();
}
+template <typename I>
+void SnapshotCopyRequest<I>::cancel() {
+ Mutex::Locker locker(m_lock);
+
+ CephContext *cct = m_local_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+ m_canceled = true;
+}
+
template <typename I>
void SnapshotCopyRequest<I>::send_snap_unprotect() {
CephContext *cct = m_local_image_ctx->cct;
finish(r);
return;
}
+ if (handle_cancellation())
+ {
+ return;
+ }
send_snap_unprotect();
}
finish(r);
return;
}
+ if (handle_cancellation())
+ {
+ return;
+ }
send_snap_remove();
}
finish(r);
return;
}
+ if (handle_cancellation())
+ {
+ return;
+ }
assert(m_prev_snap_id != CEPH_NOSNAP);
finish(r);
return;
}
+ if (handle_cancellation())
+ {
+ return;
+ }
send_snap_protect();
}
finish(r);
return;
}
+ if (handle_cancellation())
+ {
+ return;
+ }
m_client_meta->snap_seqs = m_snap_seqs;
}
template <typename I>
-void SnapshotCopyRequest<I>::error(int r) {
- dout(20) << ": r=" << r << dendl;
-
- m_work_queue->queue(create_context_callback<
- SnapshotCopyRequest<I>, &SnapshotCopyRequest<I>::finish>(this), r);
+bool SnapshotCopyRequest<I>::handle_cancellation() {
+ {
+ Mutex::Locker locker(m_lock);
+ if (!m_canceled) {
+ return false;
+ }
+ }
+ CephContext *cct = m_local_image_ctx->cct;
+ ldout(cct, 10) << ": snapshot copy canceled" << dendl;
+ finish(-ECANCELED);
+ return true;
}
template <typename I>
-void SnapshotCopyRequest<I>::finish(int r) {
- CephContext *cct = m_local_image_ctx->cct;
- ldout(cct, 20) << ": r=" << r << dendl;
-
- if (r >= 0) {
- m_client_meta->snap_seqs = m_snap_seqs;
- }
+void SnapshotCopyRequest<I>::error(int r) {
+ dout(20) << ": r=" << r << dendl;
- m_on_finish->complete(r);
- delete this;
+ m_work_queue->queue(new FunctionContext([this, r](int r1) { finish(r); }));
}
template <typename I>
#include "librbd/ImageCtx.h"
#include "librbd/parent_types.h"
#include "librbd/journal/TypeTraits.h"
+#include "tools/rbd_mirror/BaseRequest.h"
#include <map>
#include <set>
#include <string>
namespace image_sync {
template <typename ImageCtxT = librbd::ImageCtx>
-class SnapshotCopyRequest {
+class SnapshotCopyRequest : public BaseRequest {
public:
typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
typedef typename TypeTraits::Journaler Journaler;
ContextWQ *work_queue, Context *on_finish);
void send();
+ void cancel();
private:
/**
Journaler *m_journaler;
librbd::journal::MirrorPeerClientMeta *m_client_meta;
ContextWQ *m_work_queue;
- Context *m_on_finish;
SnapIdSet m_local_snap_ids;
SnapIdSet m_remote_snap_ids;
librbd::parent_spec m_local_parent_spec;
+ Mutex m_lock;
+ bool m_canceled = false;
+
void send_snap_unprotect();
void handle_snap_unprotect(int r);
void send_update_client();
void handle_update_client(int r);
+ bool handle_cancellation();
+
void error(int r);
- void finish(int r);
void compute_snap_map();