From: Ricardo Dias Date: Wed, 8 Jun 2016 15:37:20 +0000 (+0100) Subject: rbd-mirror: Implementation of image-sync throttler X-Git-Tag: v11.0.0~15^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c4f926d6980d1efd95771885a11d8cc4ebd2e4c3;p=ceph.git rbd-mirror: Implementation of image-sync throttler Signed-off-by: Ricardo Dias --- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index df103872a45b..16a22d66160c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1223,6 +1223,7 @@ if(${WITH_RBD}) tools/rbd_mirror/ImageReplayer.cc tools/rbd_mirror/ImageDeleter.cc tools/rbd_mirror/ImageSync.cc + tools/rbd_mirror/ImageSyncThrottler.cc tools/rbd_mirror/Mirror.cc tools/rbd_mirror/PoolWatcher.cc tools/rbd_mirror/Replayer.cc diff --git a/src/common/config_opts.h b/src/common/config_opts.h index c1b94e22034b..e4179820345f 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1238,6 +1238,7 @@ OPTION(rbd_journal_pool, OPT_STR, "") // pool for journal objects * RBD Mirror options */ OPTION(rbd_mirror_sync_point_update_age, OPT_DOUBLE, 30) // number of seconds between each update of the image sync point object number +OPTION(rbd_mirror_concurrent_image_syncs, OPT_U32, 5) // maximum number of image syncs in parallel OPTION(nss_db_path, OPT_STR, "") // path to nss db diff --git a/src/tools/Makefile-client.am b/src/tools/Makefile-client.am index 6bb86a6ab461..323eb3642f7c 100644 --- a/src/tools/Makefile-client.am +++ b/src/tools/Makefile-client.am @@ -93,6 +93,7 @@ librbd_mirror_internal_la_SOURCES = \ tools/rbd_mirror/ClusterWatcher.cc \ tools/rbd_mirror/ImageReplayer.cc \ tools/rbd_mirror/ImageSync.cc \ + tools/rbd_mirror/ImageSyncThrottler.cc \ tools/rbd_mirror/Mirror.cc \ tools/rbd_mirror/PoolWatcher.cc \ tools/rbd_mirror/Replayer.cc \ @@ -117,6 +118,7 @@ noinst_HEADERS += \ tools/rbd_mirror/ClusterWatcher.h \ tools/rbd_mirror/ImageReplayer.h \ tools/rbd_mirror/ImageSync.h \ + tools/rbd_mirror/ImageSyncThrottler.h \ tools/rbd_mirror/Mirror.h \ tools/rbd_mirror/PoolWatcher.h \ tools/rbd_mirror/ProgressContext.h \ diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/tools/rbd_mirror/ImageSyncThrottler.cc new file mode 100644 index 000000000000..fd759acbb7e2 --- /dev/null +++ b/src/tools/rbd_mirror/ImageSyncThrottler.cc @@ -0,0 +1,243 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 SUSE LINUX GmbH + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "ImageSyncThrottler.h" +#include "ImageSync.h" +#include "common/ceph_context.h" + +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \ + << " " << __func__ << ": " +using std::unique_ptr; +using std::string; +using std::set; + +namespace rbd { +namespace mirror { + +template +ImageSyncThrottler::ImageSyncThrottler() + : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs), + m_lock("rbd::mirror::ImageSyncThrottler") +{ + dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs + << dendl; + g_ceph_context->_conf->add_observer(this); +} + +template +ImageSyncThrottler::~ImageSyncThrottler() { + { + Mutex::Locker l(m_lock); + assert(m_sync_queue.empty()); + assert(m_inflight_syncs.empty()); + } + + g_ceph_context->_conf->remove_observer(this); +} + +template +void ImageSyncThrottler::start_sync( + I *local_image_ctx, I *remote_image_ctx, + SafeTimer *timer, Mutex *timer_lock, + const std::string &mirror_uuid, + Journaler *journaler, + MirrorPeerClientMeta *client_meta, + ContextWQ *work_queue, Context *on_finish, + ProgressContext *progress_ctx) { + dout(20) << dendl; + + C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, local_image_ctx->id, + on_finish); + sync_holder_ctx->m_sync = ImageSync::create(local_image_ctx, + remote_image_ctx, timer, + timer_lock, mirror_uuid, + journaler, client_meta, + work_queue, sync_holder_ctx, + progress_ctx); + sync_holder_ctx->m_sync->get(); + + bool start = false; + { + Mutex::Locker l(m_lock); + + if (m_inflight_syncs.size() < m_max_concurrent_syncs) { + m_inflight_syncs.insert(std::make_pair(local_image_ctx->id, + sync_holder_ctx)); + start = true; + dout(10) << "ready to start image sync for local_image_id " + << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/" + << m_max_concurrent_syncs << "]" << dendl; + } else { + m_sync_queue.push_front(sync_holder_ctx); + dout(10) << "image sync for local_image_id " << local_image_ctx->id + << " has been queued" << dendl; + } + } + + if (start) { + sync_holder_ctx->m_sync->send(); + } +} + +template +void ImageSyncThrottler::cancel_sync(const std::string& local_image_id) { + dout(20) << dendl; + + C_SyncHolder *sync_holder = nullptr; + bool running_sync = true; + + { + Mutex::Locker l(m_lock); + + if (m_inflight_syncs.empty()) { + // no image sync currently running and neither waiting + return; + } + + auto it = m_inflight_syncs.find(local_image_id); + if (it != m_inflight_syncs.end()) { + sync_holder = it->second; + } + + if (!sync_holder) { + for (auto it=m_sync_queue.begin(); it != m_sync_queue.end(); ++it) { + if ((*it)->m_local_image_id == local_image_id) { + sync_holder = (*it); + m_sync_queue.erase(it); + running_sync = false; + break; + } + } + } + } + + if (sync_holder) { + if (running_sync) { + dout(10) << "canceled running image sync for local_image_id " + << sync_holder->m_local_image_id << dendl; + sync_holder->m_sync->cancel(); + } else { + dout(10) << "canceled waiting image sync for local_image_id " + << sync_holder->m_local_image_id << dendl; + sync_holder->m_on_finish->complete(-ECANCELED); + sync_holder->m_sync->put(); + delete sync_holder; + } + } +} + +template +void ImageSyncThrottler::handle_sync_finished(C_SyncHolder *sync_holder) { + dout(20) << dendl; + + C_SyncHolder *next_sync_holder = nullptr; + + { + Mutex::Locker l(m_lock); + + m_inflight_syncs.erase(sync_holder->m_local_image_id); + + if (m_inflight_syncs.size() < m_max_concurrent_syncs + && !m_sync_queue.empty()) { + next_sync_holder = m_sync_queue.back(); + m_sync_queue.pop_back(); + m_inflight_syncs.insert(std::make_pair(next_sync_holder->m_local_image_id, + next_sync_holder)); + dout(10) << "ready to start image sync for local_image_id " + << next_sync_holder->m_local_image_id + << " [" << m_inflight_syncs.size() << "/" + << m_max_concurrent_syncs << "]" << dendl; + } + + dout(10) << "currently running image syncs [" << m_inflight_syncs.size() + << "/" << m_max_concurrent_syncs << "]" << dendl; + } + + if (next_sync_holder) { + next_sync_holder->m_sync->send(); + } +} + +template +void ImageSyncThrottler::set_max_concurrent_syncs(uint32_t max) { + dout(20) << " max=" << max << dendl; + + assert(max > 0); + + std::list next_sync_holders; + { + Mutex::Locker l(m_lock); + this->m_max_concurrent_syncs = max; + + // Start waiting syncs in the case of available free slots + while(m_inflight_syncs.size() < m_max_concurrent_syncs + && !m_sync_queue.empty()) { + C_SyncHolder *next_sync_holder = m_sync_queue.back(); + next_sync_holders.push_back(next_sync_holder); + m_sync_queue.pop_back(); + m_inflight_syncs.insert(std::make_pair(next_sync_holder->m_local_image_id, + next_sync_holder)); + dout(10) << "ready to start image sync for local_image_id " + << next_sync_holder->m_local_image_id + << " [" << m_inflight_syncs.size() << "/" + << m_max_concurrent_syncs << "]" << dendl; + } + } + + for (const auto& sync_holder : next_sync_holders) { + sync_holder->m_sync->send(); + } +} + +template +void ImageSyncThrottler::print_status(Formatter *f, stringstream *ss) { + Mutex::Locker l(m_lock); + + if (f) { + f->dump_int("max_parallel_syncs", m_max_concurrent_syncs); + f->dump_int("running_syncs", m_inflight_syncs.size()); + f->dump_int("waiting_syncs", m_sync_queue.size()); + f->flush(*ss); + } else { + *ss << "[ "; + *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", "; + *ss << "running_syncs=" << m_inflight_syncs.size() << ", "; + *ss << "waiting_syncs=" << m_sync_queue.size() << " ]"; + } +} + +template +const char** ImageSyncThrottler::get_tracked_conf_keys() const { + static const char* KEYS[] = { + "rbd_mirror_concurrent_image_syncs", + NULL + }; + return KEYS; +} + +template +void ImageSyncThrottler::handle_conf_change( + const struct md_config_t *conf, + const set &changed) { + if (changed.count("rbd_mirror_concurrent_image_syncs")) { + set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs); + } +} + +} // namespace mirror +} // namespace rbd + +template class rbd::mirror::ImageSyncThrottler; diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.h b/src/tools/rbd_mirror/ImageSyncThrottler.h new file mode 100644 index 000000000000..6c7fbf338ed1 --- /dev/null +++ b/src/tools/rbd_mirror/ImageSyncThrottler.h @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 SUSE LINUX GmbH + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H +#define CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H + +#include +#include +#include "common/Mutex.h" +#include "librbd/ImageCtx.h" +#include "include/Context.h" +#include "librbd/journal/TypeTraits.h" + +class CephContext; +class Context; +class ContextWQ; +class SafeTimer; +namespace journal { class Journaler; } +namespace librbd { namespace journal { struct MirrorPeerClientMeta; } } + +namespace rbd { +namespace mirror { + +template class ImageSync; + +class ProgressContext; + +/** + * Manage concurrent image-syncs + */ +template +class ImageSyncThrottler : public md_config_obs_t { +public: + + typedef librbd::journal::TypeTraits TypeTraits; + typedef typename TypeTraits::Journaler Journaler; + typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta; + + ImageSyncThrottler(); + ~ImageSyncThrottler(); + ImageSyncThrottler(const ImageSyncThrottler&) = delete; + ImageSyncThrottler& operator=(const ImageSyncThrottler&) = delete; + + void start_sync(ImageCtxT *local_image_ctx, + ImageCtxT *remote_image_ctx, SafeTimer *timer, + Mutex *timer_lock, const std::string &mirror_uuid, + Journaler *journaler, MirrorPeerClientMeta *client_meta, + ContextWQ *work_queue, Context *on_finish, + ProgressContext *progress_ctx = nullptr); + + void cancel_sync(const std::string& mirror_uuid); + + void set_max_concurrent_syncs(uint32_t max); + + void print_status(Formatter *f, std::stringstream *ss); + +private: + + struct C_SyncHolder : public Context { + ImageSyncThrottler *m_sync_throttler; + std::string m_local_image_id; + ImageSync *m_sync; + Context *m_on_finish; + + C_SyncHolder(ImageSyncThrottler *sync_throttler, + const std::string& local_image_id, Context *on_finish) + : m_sync_throttler(sync_throttler), m_local_image_id(local_image_id), + m_sync(nullptr), m_on_finish(on_finish) {} + + virtual void finish(int r) { + m_sync_throttler->handle_sync_finished(this); + m_on_finish->complete(r); + } + }; + + void handle_sync_finished(C_SyncHolder *sync_holder); + + const char **get_tracked_conf_keys() const; + void handle_conf_change(const struct md_config_t *conf, + const std::set &changed); + + uint32_t m_max_concurrent_syncs; + Mutex m_lock; + std::list m_sync_queue; + std::map m_inflight_syncs; + +}; + +} // namespace mirror +} // namespace rbd + +#endif // CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H