--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "ImageSyncThrottler.h"
+#include "ImageSync.h"
+#include "common/ceph_context.h"
+
+#define dout_subsys ceph_subsys_rbd_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
+ << " " << __func__ << ": "
+using std::unique_ptr;
+using std::string;
+using std::set;
+
+namespace rbd {
+namespace mirror {
+
+template <typename I>
+ImageSyncThrottler<I>::ImageSyncThrottler()
+ : m_max_concurrent_syncs(g_ceph_context->_conf->rbd_mirror_concurrent_image_syncs),
+ m_lock("rbd::mirror::ImageSyncThrottler")
+{
+ dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
+ << dendl;
+ g_ceph_context->_conf->add_observer(this);
+}
+
+template <typename I>
+ImageSyncThrottler<I>::~ImageSyncThrottler() {
+ {
+ Mutex::Locker l(m_lock);
+ assert(m_sync_queue.empty());
+ assert(m_inflight_syncs.empty());
+ }
+
+ g_ceph_context->_conf->remove_observer(this);
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::start_sync(
+ I *local_image_ctx, I *remote_image_ctx,
+ SafeTimer *timer, Mutex *timer_lock,
+ const std::string &mirror_uuid,
+ Journaler *journaler,
+ MirrorPeerClientMeta *client_meta,
+ ContextWQ *work_queue, Context *on_finish,
+ ProgressContext *progress_ctx) {
+ dout(20) << dendl;
+
+ C_SyncHolder *sync_holder_ctx = new C_SyncHolder(this, local_image_ctx->id,
+ on_finish);
+ sync_holder_ctx->m_sync = ImageSync<I>::create(local_image_ctx,
+ remote_image_ctx, timer,
+ timer_lock, mirror_uuid,
+ journaler, client_meta,
+ work_queue, sync_holder_ctx,
+ progress_ctx);
+ sync_holder_ctx->m_sync->get();
+
+ bool start = false;
+ {
+ Mutex::Locker l(m_lock);
+
+ if (m_inflight_syncs.size() < m_max_concurrent_syncs) {
+ m_inflight_syncs.insert(std::make_pair(local_image_ctx->id,
+ sync_holder_ctx));
+ start = true;
+ dout(10) << "ready to start image sync for local_image_id "
+ << local_image_ctx->id << " [" << m_inflight_syncs.size() << "/"
+ << m_max_concurrent_syncs << "]" << dendl;
+ } else {
+ m_sync_queue.push_front(sync_holder_ctx);
+ dout(10) << "image sync for local_image_id " << local_image_ctx->id
+ << " has been queued" << dendl;
+ }
+ }
+
+ if (start) {
+ sync_holder_ctx->m_sync->send();
+ }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::cancel_sync(const std::string& local_image_id) {
+ dout(20) << dendl;
+
+ C_SyncHolder *sync_holder = nullptr;
+ bool running_sync = true;
+
+ {
+ Mutex::Locker l(m_lock);
+
+ if (m_inflight_syncs.empty()) {
+ // no image sync currently running and neither waiting
+ return;
+ }
+
+ auto it = m_inflight_syncs.find(local_image_id);
+ if (it != m_inflight_syncs.end()) {
+ sync_holder = it->second;
+ }
+
+ if (!sync_holder) {
+ for (auto it=m_sync_queue.begin(); it != m_sync_queue.end(); ++it) {
+ if ((*it)->m_local_image_id == local_image_id) {
+ sync_holder = (*it);
+ m_sync_queue.erase(it);
+ running_sync = false;
+ break;
+ }
+ }
+ }
+ }
+
+ if (sync_holder) {
+ if (running_sync) {
+ dout(10) << "canceled running image sync for local_image_id "
+ << sync_holder->m_local_image_id << dendl;
+ sync_holder->m_sync->cancel();
+ } else {
+ dout(10) << "canceled waiting image sync for local_image_id "
+ << sync_holder->m_local_image_id << dendl;
+ sync_holder->m_on_finish->complete(-ECANCELED);
+ sync_holder->m_sync->put();
+ delete sync_holder;
+ }
+ }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::handle_sync_finished(C_SyncHolder *sync_holder) {
+ dout(20) << dendl;
+
+ C_SyncHolder *next_sync_holder = nullptr;
+
+ {
+ Mutex::Locker l(m_lock);
+
+ m_inflight_syncs.erase(sync_holder->m_local_image_id);
+
+ if (m_inflight_syncs.size() < m_max_concurrent_syncs
+ && !m_sync_queue.empty()) {
+ next_sync_holder = m_sync_queue.back();
+ m_sync_queue.pop_back();
+ m_inflight_syncs.insert(std::make_pair(next_sync_holder->m_local_image_id,
+ next_sync_holder));
+ dout(10) << "ready to start image sync for local_image_id "
+ << next_sync_holder->m_local_image_id
+ << " [" << m_inflight_syncs.size() << "/"
+ << m_max_concurrent_syncs << "]" << dendl;
+ }
+
+ dout(10) << "currently running image syncs [" << m_inflight_syncs.size()
+ << "/" << m_max_concurrent_syncs << "]" << dendl;
+ }
+
+ if (next_sync_holder) {
+ next_sync_holder->m_sync->send();
+ }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::set_max_concurrent_syncs(uint32_t max) {
+ dout(20) << " max=" << max << dendl;
+
+ assert(max > 0);
+
+ std::list<C_SyncHolder *> next_sync_holders;
+ {
+ Mutex::Locker l(m_lock);
+ this->m_max_concurrent_syncs = max;
+
+ // Start waiting syncs in the case of available free slots
+ while(m_inflight_syncs.size() < m_max_concurrent_syncs
+ && !m_sync_queue.empty()) {
+ C_SyncHolder *next_sync_holder = m_sync_queue.back();
+ next_sync_holders.push_back(next_sync_holder);
+ m_sync_queue.pop_back();
+ m_inflight_syncs.insert(std::make_pair(next_sync_holder->m_local_image_id,
+ next_sync_holder));
+ dout(10) << "ready to start image sync for local_image_id "
+ << next_sync_holder->m_local_image_id
+ << " [" << m_inflight_syncs.size() << "/"
+ << m_max_concurrent_syncs << "]" << dendl;
+ }
+ }
+
+ for (const auto& sync_holder : next_sync_holders) {
+ sync_holder->m_sync->send();
+ }
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::print_status(Formatter *f, stringstream *ss) {
+ Mutex::Locker l(m_lock);
+
+ if (f) {
+ f->dump_int("max_parallel_syncs", m_max_concurrent_syncs);
+ f->dump_int("running_syncs", m_inflight_syncs.size());
+ f->dump_int("waiting_syncs", m_sync_queue.size());
+ f->flush(*ss);
+ } else {
+ *ss << "[ ";
+ *ss << "max_parallel_syncs=" << m_max_concurrent_syncs << ", ";
+ *ss << "running_syncs=" << m_inflight_syncs.size() << ", ";
+ *ss << "waiting_syncs=" << m_sync_queue.size() << " ]";
+ }
+}
+
+template <typename I>
+const char** ImageSyncThrottler<I>::get_tracked_conf_keys() const {
+ static const char* KEYS[] = {
+ "rbd_mirror_concurrent_image_syncs",
+ NULL
+ };
+ return KEYS;
+}
+
+template <typename I>
+void ImageSyncThrottler<I>::handle_conf_change(
+ const struct md_config_t *conf,
+ const set<string> &changed) {
+ if (changed.count("rbd_mirror_concurrent_image_syncs")) {
+ set_max_concurrent_syncs(conf->rbd_mirror_concurrent_image_syncs);
+ }
+}
+
+} // namespace mirror
+} // namespace rbd
+
+template class rbd::mirror::ImageSyncThrottler<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 SUSE LINUX GmbH
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+#define CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H
+
+#include <list>
+#include <map>
+#include "common/Mutex.h"
+#include "librbd/ImageCtx.h"
+#include "include/Context.h"
+#include "librbd/journal/TypeTraits.h"
+
+class CephContext;
+class Context;
+class ContextWQ;
+class SafeTimer;
+namespace journal { class Journaler; }
+namespace librbd { namespace journal { struct MirrorPeerClientMeta; } }
+
+namespace rbd {
+namespace mirror {
+
+template <typename> class ImageSync;
+
+class ProgressContext;
+
+/**
+ * Manage concurrent image-syncs
+ */
+template <typename ImageCtxT = librbd::ImageCtx>
+class ImageSyncThrottler : public md_config_obs_t {
+public:
+
+ typedef librbd::journal::TypeTraits<ImageCtxT> TypeTraits;
+ typedef typename TypeTraits::Journaler Journaler;
+ typedef librbd::journal::MirrorPeerClientMeta MirrorPeerClientMeta;
+
+ ImageSyncThrottler();
+ ~ImageSyncThrottler();
+ ImageSyncThrottler(const ImageSyncThrottler&) = delete;
+ ImageSyncThrottler& operator=(const ImageSyncThrottler&) = delete;
+
+ void start_sync(ImageCtxT *local_image_ctx,
+ ImageCtxT *remote_image_ctx, SafeTimer *timer,
+ Mutex *timer_lock, const std::string &mirror_uuid,
+ Journaler *journaler, MirrorPeerClientMeta *client_meta,
+ ContextWQ *work_queue, Context *on_finish,
+ ProgressContext *progress_ctx = nullptr);
+
+ void cancel_sync(const std::string& mirror_uuid);
+
+ void set_max_concurrent_syncs(uint32_t max);
+
+ void print_status(Formatter *f, std::stringstream *ss);
+
+private:
+
+ struct C_SyncHolder : public Context {
+ ImageSyncThrottler<ImageCtxT> *m_sync_throttler;
+ std::string m_local_image_id;
+ ImageSync<ImageCtxT> *m_sync;
+ Context *m_on_finish;
+
+ C_SyncHolder(ImageSyncThrottler<ImageCtxT> *sync_throttler,
+ const std::string& local_image_id, Context *on_finish)
+ : m_sync_throttler(sync_throttler), m_local_image_id(local_image_id),
+ m_sync(nullptr), m_on_finish(on_finish) {}
+
+ virtual void finish(int r) {
+ m_sync_throttler->handle_sync_finished(this);
+ m_on_finish->complete(r);
+ }
+ };
+
+ void handle_sync_finished(C_SyncHolder *sync_holder);
+
+ const char **get_tracked_conf_keys() const;
+ void handle_conf_change(const struct md_config_t *conf,
+ const std::set<std::string> &changed);
+
+ uint32_t m_max_concurrent_syncs;
+ Mutex m_lock;
+ std::list<C_SyncHolder *> m_sync_queue;
+ std::map<std::string, C_SyncHolder *> m_inflight_syncs;
+
+};
+
+} // namespace mirror
+} // namespace rbd
+
+#endif // CEPH_RBD_MIRROR_IMAGE_SYNC_THROTTLER_H