}
template <typename I>
-void ImageSyncThrottler<I>::start_sync(
- I *local_image_ctx, I *remote_image_ctx,
- SafeTimer *timer, Mutex *timer_lock,
- const std::string &mirror_uuid,
- Journaler *journaler,
- MirrorPeerClientMeta *client_meta,
- ContextWQ *work_queue, Context *on_finish,
- ProgressContext *progress_ctx) {
+void ImageSyncThrottler<I>::start_sync(I *local_image_ctx, I *remote_image_ctx,
+ SafeTimer *timer, Mutex *timer_lock,
+ const std::string &mirror_uuid,
+ Journaler *journaler,
+ MirrorPeerClientMeta *client_meta,
+ ContextWQ *work_queue,
+ Context *on_finish,
+ ProgressContext *progress_ctx) {
dout(20) << dendl;
- C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, local_image_ctx->id,
+ PoolImageId pool_image_id(local_image_ctx->md_ctx.get_id(),
+ local_image_ctx->id);
+ C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, pool_image_id,
on_finish);
sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
- remote_image_ctx, timer,
- timer_lock, mirror_uuid,
- journaler, client_meta,
- work_queue, sync_holder_ctx,
- progress_ctx);
+ remote_image_ctx, timer,
+ timer_lock, mirror_uuid,
+ journaler, client_meta,
+ work_queue, sync_holder_ctx,
+ progress_ctx);
sync_holder_ctx->m_sync->get();
bool start = false;
Mutex::Locker l(m_lock);
if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
- m_inflight_syncs.insert(std::make_pair(local_image_ctx->id,
- sync_holder_ctx));
+ assert(m_inflight_syncs.count(pool_image_id) == 0);
+ m_inflight_syncs[pool_image_id] = sync_holder_ctx;
start = true;
dout(10) << "ready to start image sync for local_image_id "
<< local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
}
if (start) {
- sync_holder_ctx->m_sync->send();
+ sync_holder_ctx->m_sync->send();
}
}
template <typename I>
-void ImageSyncThrottler<I>::cancel_sync(const std::string& local_image_id) {
+void ImageSyncThrottler<I>::cancel_sync(librados::IoCtx &local_io_ctx,
+ const std::string local_image_id) {
dout(20) << dendl;
C_SyncHolder *sync_holder = nullptr;
{
Mutex::Locker l(m_lock);
-
if (m_inflight_syncs.empty()) {
// no image sync currently running and neither waiting
return;
}
- auto it = m_inflight_syncs.find(local_image_id);
+ PoolImageId local_pool_image_id(local_io_ctx.get_id(),
+ local_image_id);
+ auto it = m_inflight_syncs.find(local_pool_image_id);
if (it != m_inflight_syncs.end()) {
sync_holder = it->second;
}
if (!sync_holder) {
- for (auto it=m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
- if ((*it)->m_local_image_id == local_image_id) {
+ for (auto it = m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
+ if ((*it)->m_local_pool_image_id == local_pool_image_id) {
sync_holder = (*it);
m_sync_queue.erase(it);
running_sync = false;
if (sync_holder) {
if (running_sync) {
dout(10) << "canceled running image sync for local_image_id "
- << sync_holder->m_local_image_id << dendl;
+ << sync_holder->m_local_pool_image_id.second << dendl;
sync_holder->m_sync->cancel();
} else {
dout(10) << "canceled waiting image sync for local_image_id "
- << sync_holder->m_local_image_id << dendl;
+ << sync_holder->m_local_pool_image_id.second << dendl;
sync_holder->m_on_finish->complete(-ECANCELED);
sync_holder->m_sync->put();
delete sync_holder;
{
Mutex::Locker l(m_lock);
+ m_inflight_syncs.erase(sync_holder->m_local_pool_image_id);
- m_inflight_syncs.erase(sync_holder->m_local_image_id);
-
- if (m_inflight_syncs.size() < m_max_concurrent_syncs
- && !m_sync_queue.empty()) {
+ if (m_inflight_syncs.size() < m_max_concurrent_syncs &&
+ !m_sync_queue.empty()) {
next_sync_holder = m_sync_queue.back();
m_sync_queue.pop_back();
- m_inflight_syncs.insert(std::make_pair(next_sync_holder->m_local_image_id,
- next_sync_holder));
+
+ assert(
+ m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
+ m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
+ next_sync_holder;
dout(10) << "ready to start image sync for local_image_id "
- << next_sync_holder->m_local_image_id
+ << next_sync_holder->m_local_pool_image_id.second
<< " [" << m_inflight_syncs.size() << "/"
<< m_max_concurrent_syncs << "]" << dendl;
}
C_SyncHolder *next_sync_holder = m_sync_queue.back();
next_sync_holders.push_back(next_sync_holder);
m_sync_queue.pop_back();
- m_inflight_syncs.insert(std::make_pair(next_sync_holder->m_local_image_id,
- next_sync_holder));
+
+ assert(
+ m_inflight_syncs.count(next_sync_holder->m_local_pool_image_id) == 0);
+ m_inflight_syncs[next_sync_holder->m_local_pool_image_id] =
+ next_sync_holder;
+
dout(10) << "ready to start image sync for local_image_id "
- << next_sync_holder->m_local_image_id
+ << next_sync_holder->m_local_pool_image_id.second
<< " [" << m_inflight_syncs.size() << "/"
<< m_max_concurrent_syncs << "]" << dendl;
}
#include <list>
#include <map>
+#include <utility>
#include "common/Mutex.h"
#include "librbd/ImageCtx.h"
#include "include/Context.h"
ContextWQ *work_queue, Context *on_finish,
ProgressContext *progress_ctx = nullptr);
- void cancel_sync(const std::string& mirror_uuid);
+ void cancel_sync(librados::IoCtx &local_io_ctx,
+ const std::string local_image_id);
void set_max_concurrent_syncs(uint32_t max);
void print_status(Formatter *f, std::stringstream *ss);
private:
+ typedef std::pair<int64_t, std::string> PoolImageId;
struct C_SyncHolder : public Context {
ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
- std::string m_local_image_id;
- ImageSync<ImageCtxT> *m_sync;
+ PoolImageId m_local_pool_image_id;
+ ImageSync<ImageCtxT> *m_sync = nullptr;
Context *m_on_finish;
C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
- const std::string& local_image_id, Context *on_finish)
- : m_sync_throttler(sync_throttler), m_local_image_id(local_image_id),
- m_sync(nullptr), m_on_finish(on_finish) {}
+ const PoolImageId &local_pool_image_id, Context *on_finish)
+ : m_sync_throttler(sync_throttler),
+ m_local_pool_image_id(local_pool_image_id), m_on_finish(on_finish) {
+ }
virtual void finish(int r) {
m_sync_throttler->handle_sync_finished(this);
uint32_t m_max_concurrent_syncs;
Mutex m_lock;
std::list<C_SyncHolder *> m_sync_queue;
- std::map<std::string, C_SyncHolder *> m_inflight_syncs;
+ std::map<PoolImageId, C_SyncHolder *> m_inflight_syncs;
};