#include "librbd/ImageState.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/ImageDeleter.h"
+#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/Threads.h"
#include <string>
rbd::mirror::RadosRef remote(new librados::Rados());
rbd::mirror::Threads *threads = nullptr;
std::shared_ptr<rbd::mirror::ImageDeleter> image_deleter;
+ std::shared_ptr<rbd::mirror::ImageSyncThrottler<>> image_sync_throttler;
C_SaferCond start_cond, stop_cond;
image_deleter.reset(new rbd::mirror::ImageDeleter(local, threads->timer,
&threads->timer_lock));
- replayer = new rbd::mirror::ImageReplayer<>(threads, image_deleter, local,
+ image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>());
+
+ replayer = new rbd::mirror::ImageReplayer<>(threads, image_deleter,
+ image_sync_throttler, local,
remote, client_id,
"remote mirror uuid",
local_pool_id, remote_pool_id,
#include "librbd/internal.h"
#include "tools/rbd_mirror/types.h"
#include "tools/rbd_mirror/ImageReplayer.h"
+#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "tools/rbd_mirror/Threads.h"
#include "tools/rbd_mirror/ImageDeleter.h"
m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
m_local_ioctx.cct()));
+
m_image_deleter.reset(new rbd::mirror::ImageDeleter(m_local_cluster,
m_threads->timer,
&m_threads->timer_lock));
+
+ m_image_sync_throttler.reset(new rbd::mirror::ImageSyncThrottler<>());
}
~TestImageReplayer()
template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
void create_replayer() {
- m_replayer = new ImageReplayerT(m_threads, m_image_deleter,
+ m_replayer = new ImageReplayerT(m_threads, m_image_deleter, m_image_sync_throttler,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
m_local_mirror_uuid, m_remote_mirror_uuid, m_local_ioctx.get_id(),
std::shared_ptr<rbd::mirror::ImageDeleter> m_image_deleter;
std::shared_ptr<librados::Rados> m_local_cluster;
librados::Rados m_remote_cluster;
+ std::shared_ptr<rbd::mirror::ImageSyncThrottler<>> m_image_sync_throttler;
std::string m_local_mirror_uuid = "local mirror uuid";
std::string m_remote_mirror_uuid = "remote mirror uuid";
std::string m_local_pool_name, m_remote_pool_name;
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
+#include "tools/rbd_mirror/ImageSyncThrottler.h"
#include "test/journal/mock/MockJournaler.h"
#include "test/librbd/mock/MockImageCtx.h"
#include "test/librbd/mock/MockJournal.h"
Context *on_finish = nullptr;
static BootstrapRequest* create(librados::IoCtx &local_io_ctx,
- librados::IoCtx &remote_io_ctx,
- librbd::MockTestImageCtx **local_image_ctx,
- const std::string &local_image_name,
- const std::string &remote_image_id,
- const std::string &global_image_id,
- ContextWQ *work_queue, SafeTimer *timer,
- Mutex *timer_lock,
- const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid,
- ::journal::MockJournalerProxy *journaler,
- librbd::journal::MirrorPeerClientMeta *client_meta,
- Context *on_finish,
- rbd::mirror::ProgressContext *progress_ctx = nullptr) {
+ librados::IoCtx &remote_io_ctx,
+ rbd::mirror::ImageSyncThrottlerRef<librbd::MockTestImageCtx> image_sync_throttler,
+ librbd::MockTestImageCtx **local_image_ctx,
+ const std::string &local_image_name,
+ const std::string &remote_image_id,
+ const std::string &global_image_id,
+ ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock,
+ const std::string &local_mirror_uuid,
+ const std::string &remote_mirror_uuid,
+ ::journal::MockJournalerProxy *journaler,
+ librbd::journal::MirrorPeerClientMeta *client_meta,
+ Context *on_finish,
+ rbd::mirror::ProgressContext *progress_ctx = nullptr) {
assert(s_instance != nullptr);
s_instance->on_finish = on_finish;
return s_instance;
template <typename I>
ImageReplayer<I>::ImageReplayer(Threads *threads,
shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<I> image_sync_throttler,
RadosRef local, RadosRef remote,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid,
const std::string &global_image_id) :
m_threads(threads),
m_image_deleter(image_deleter),
+ m_image_sync_throttler(image_sync_throttler),
m_local(local),
m_remote(remote),
m_local_mirror_uuid(local_mirror_uuid),
ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
BootstrapRequest<I> *request = BootstrapRequest<I>::create(
- m_local_ioctx, m_remote_ioctx, &m_local_image_ctx,
- m_local_image_name, m_remote_image_id, m_global_image_id,
- m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
- m_local_mirror_uuid, m_remote_mirror_uuid, m_remote_journaler,
- &m_client_meta, ctx, &m_progress_cxt);
+ m_local_ioctx, m_remote_ioctx, m_image_sync_throttler,
+ &m_local_image_ctx, m_local_image_name, m_remote_image_id,
+ m_global_image_id, m_threads->work_queue, m_threads->timer,
+ &m_threads->timer_lock, m_local_mirror_uuid, m_remote_mirror_uuid,
+ m_remote_journaler, &m_client_meta, ctx, &m_progress_cxt);
{
Mutex::Locker locker(m_lock);
};
ImageReplayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
RadosRef local, RadosRef remote,
const std::string &local_mirror_uuid,
const std::string &remote_mirror_uuid, int64_t local_pool_id,
Threads *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
+ ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
RadosRef m_local, m_remote;
std::string m_local_mirror_uuid;
std::string m_remote_mirror_uuid;
#include "common/errno.h"
#include "Mirror.h"
#include "Threads.h"
+#include "ImageSync.h"
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
m_image_deleter.reset(new ImageDeleter(m_local, m_threads->timer,
&m_threads->timer_lock));
+ m_image_sync_throttler.reset(new ImageSyncThrottler<>());
+
return r;
}
m_image_deleter->print_status(f, ss);
+ if (f) {
+ f->close_section();
+ f->open_object_section("sync_throttler");
+ }
+
+ m_image_sync_throttler->print_status(f, ss);
+
if (f) {
f->close_section();
f->close_section();
if (m_replayers.find(pool_peer) == m_replayers.end()) {
dout(20) << "starting replayer for " << peer << dendl;
unique_ptr<Replayer> replayer(new Replayer(m_threads, m_image_deleter,
+ m_image_sync_throttler,
m_local, kv.first, peer,
m_args));
// TODO: make async, and retry connecting within replayer
// monitor local cluster for config changes in peers
std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
std::shared_ptr<ImageDeleter> m_image_deleter;
+ ImageSyncThrottlerRef<> m_image_sync_throttler;
std::map<PoolPeer, std::unique_ptr<Replayer> > m_replayers;
atomic_t m_stopping;
bool m_manual_stop = false;
};
Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<> image_sync_throttler,
RadosRef local_cluster, int64_t local_pool_id,
const peer_t &peer, const std::vector<const char*> &args) :
m_threads(threads),
m_image_deleter(image_deleter),
+ m_image_sync_throttler(image_sync_throttler),
m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
m_peer(peer),
m_args(args),
auto it = m_image_replayers.find(image_id.id);
if (it == m_image_replayers.end()) {
unique_ptr<ImageReplayer<> > image_replayer(new ImageReplayer<>(
- m_threads, m_image_deleter, m_local, m_remote, local_mirror_uuid,
- remote_mirror_uuid, m_local_pool_id, m_remote_pool_id, image_id.id,
- image_id.global_id));
+ m_threads, m_image_deleter, m_image_sync_throttler, m_local, m_remote,
+ local_mirror_uuid, remote_mirror_uuid, m_local_pool_id,
+ m_remote_pool_id, image_id.id, image_id.global_id));
it = m_image_replayers.insert(
std::make_pair(image_id.id, std::move(image_replayer))).first;
}
class Replayer {
public:
Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ ImageSyncThrottlerRef<> image_sync_throttler,
RadosRef local_cluster, int64_t local_pool_id, const peer_t &peer,
const std::vector<const char*> &args);
~Replayer();
Threads *m_threads;
std::shared_ptr<ImageDeleter> m_image_deleter;
+ ImageSyncThrottlerRef<> m_image_sync_throttler;
Mutex m_lock;
Cond m_cond;
atomic_t m_stopping;
#include "librbd/journal/Types.h"
#include "tools/rbd_mirror/ImageSync.h"
#include "tools/rbd_mirror/ProgressContext.h"
+#include "tools/rbd_mirror/ImageSyncThrottler.h"
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
using librbd::util::unique_lock_name;
template <typename I>
-BootstrapRequest<I>::BootstrapRequest(librados::IoCtx &local_io_ctx,
- librados::IoCtx &remote_io_ctx,
- I **local_image_ctx,
- const std::string &local_image_name,
- const std::string &remote_image_id,
- const std::string &global_image_id,
- ContextWQ *work_queue, SafeTimer *timer,
- Mutex *timer_lock,
- const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid,
- Journaler *journaler,
- MirrorPeerClientMeta *client_meta,
- Context *on_finish,
- rbd::mirror::ProgressContext *progress_ctx)
+BootstrapRequest<I>::BootstrapRequest(
+ librados::IoCtx &local_io_ctx,
+ librados::IoCtx &remote_io_ctx,
+ std::shared_ptr<ImageSyncThrottler<I>> image_sync_throttler,
+ I **local_image_ctx,
+ const std::string &local_image_name,
+ const std::string &remote_image_id,
+ const std::string &global_image_id,
+ ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock,
+ const std::string &local_mirror_uuid,
+ const std::string &remote_mirror_uuid,
+ Journaler *journaler,
+ MirrorPeerClientMeta *client_meta,
+ Context *on_finish,
+ rbd::mirror::ProgressContext *progress_ctx)
: BaseRequest("rbd::mirror::image_replayer::BootstrapRequest",
reinterpret_cast<CephContext*>(local_io_ctx.cct()), on_finish),
m_local_io_ctx(local_io_ctx), m_remote_io_ctx(remote_io_ctx),
+ m_image_sync_throttler(image_sync_throttler),
m_local_image_ctx(local_image_ctx), m_local_image_name(local_image_name),
m_remote_image_id(remote_image_id), m_global_image_id(global_image_id),
m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
template <typename I>
BootstrapRequest<I>::~BootstrapRequest() {
- assert(m_image_sync_request == nullptr);
assert(m_remote_image_ctx == nullptr);
}
Mutex::Locker locker(m_lock);
m_canceled = true;
- if (m_image_sync_request) {
- m_image_sync_request->cancel();
- }
+ m_image_sync_throttler->cancel_sync(m_local_image_id);
}
template <typename I>
Context *ctx = create_context_callback<
BootstrapRequest<I>, &BootstrapRequest<I>::handle_image_sync>(
this);
- ImageSync<I> *request = ImageSync<I>::create(*m_local_image_ctx,
- m_remote_image_ctx, m_timer,
- m_timer_lock,
- m_local_mirror_uuid, m_journaler,
- m_client_meta, m_work_queue, ctx,
- m_progress_ctx);
- {
- Mutex::Locker locker(m_lock);
- request->get();
- m_image_sync_request = request;
- }
- request->send();
+ m_image_sync_throttler->start_sync(*m_local_image_ctx,
+ m_remote_image_ctx, m_timer,
+ m_timer_lock,
+ m_local_mirror_uuid, m_journaler,
+ m_client_meta, m_work_queue, ctx,
+ m_progress_ctx);
}
template <typename I>
void BootstrapRequest<I>::handle_image_sync(int r) {
dout(20) << ": r=" << r << dendl;
- {
- Mutex::Locker locker(m_lock);
- m_image_sync_request->put();
- m_image_sync_request = nullptr;
- }
-
if (m_canceled) {
dout(10) << ": request canceled" << dendl;
m_ret_val = -ECANCELED;
#include "cls/journal/cls_journal_types.h"
#include "librbd/journal/TypeTraits.h"
#include "tools/rbd_mirror/BaseRequest.h"
+#include "tools/rbd_mirror/types.h"
#include <list>
#include <string>
namespace rbd {
namespace mirror {
-template <typename> class ImageSync;
class ProgressContext;
namespace image_replayer {
typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
typedef rbd::mirror::ProgressContext ProgressContext;
- static BootstrapRequest* create(librados::IoCtx &local_io_ctx,
- librados::IoCtx &remote_io_ctx,
- ImageCtxT **local_image_ctx,
- const std::string &local_image_name,
- const std::string &remote_image_id,
- const std::string &global_image_id,
- ContextWQ *work_queue, SafeTimer *timer,
- Mutex *timer_lock,
- const std::string &local_mirror_uuid,
- const std::string &remote_mirror_uuid,
- Journaler *journaler,
- MirrorPeerClientMeta *client_meta,
- Context *on_finish,
- ProgressContext *progress_ctx = nullptr) {
- return new BootstrapRequest(local_io_ctx, remote_io_ctx, local_image_ctx,
+ static BootstrapRequest* create(
+ librados::IoCtx &local_io_ctx,
+ librados::IoCtx &remote_io_ctx,
+ ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
+ ImageCtxT **local_image_ctx,
+ const std::string &local_image_name,
+ const std::string &remote_image_id,
+ const std::string &global_image_id,
+ ContextWQ *work_queue, SafeTimer *timer,
+ Mutex *timer_lock,
+ const std::string &local_mirror_uuid,
+ const std::string &remote_mirror_uuid,
+ Journaler *journaler,
+ MirrorPeerClientMeta *client_meta,
+ Context *on_finish,
+ ProgressContext *progress_ctx = nullptr) {
+ return new BootstrapRequest(local_io_ctx, remote_io_ctx,
+ image_sync_throttler, local_image_ctx,
local_image_name, remote_image_id,
global_image_id, work_queue, timer, timer_lock,
local_mirror_uuid, remote_mirror_uuid,
BootstrapRequest(librados::IoCtx &local_io_ctx,
librados::IoCtx &remote_io_ctx,
+ ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
ImageCtxT **local_image_ctx,
const std::string &local_image_name,
const std::string &remote_image_id,
librados::IoCtx &m_local_io_ctx;
librados::IoCtx &m_remote_io_ctx;
+ ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
ImageCtxT **m_local_image_ctx;
std::string m_local_image_name;
std::string m_local_image_id;
MirrorPeerClientMeta *m_client_meta;
ProgressContext *m_progress_ctx;
Mutex m_lock;
- ImageSync<ImageCtxT> *m_image_sync_request = nullptr;
bool m_canceled = false;
Tags m_remote_tags;
#include <vector>
#include "include/rbd/librbd.hpp"
+#include "ImageSyncThrottler.h"
namespace rbd {
namespace mirror {
typedef shared_ptr<librados::IoCtx> IoCtxRef;
typedef shared_ptr<librbd::Image> ImageRef;
+template <typename I = librbd::ImageCtx>
+using ImageSyncThrottlerRef = std::shared_ptr<ImageSyncThrottler<I>>;
+
struct peer_t {
peer_t() = default;
peer_t(const std::string &uuid, const std::string &cluster_name,