"mgr_stats_threshold.")
.set_min_max((int64_t)PerfCountersBuilder::PRIO_DEBUGONLY,
(int64_t)PerfCountersBuilder::PRIO_CRITICAL + 1),
+
+ Option("rbd_mirror_memory_autotune", Option::TYPE_BOOL, Option::LEVEL_DEV)
+ .set_default(true)
+ .add_see_also("rbd_mirror_memory_target")
+ .set_description("Automatically tune the ratio of caches while respecting min values."),
+
+ Option("rbd_mirror_memory_target", Option::TYPE_SIZE, Option::LEVEL_BASIC)
+ .set_default(4_G)
+ .add_see_also("rbd_mirror_memory_autotune")
+ .set_description("When tcmalloc and cache autotuning is enabled, try to keep this many bytes mapped in memory."),
+
+ Option("rbd_mirror_memory_base", Option::TYPE_SIZE, Option::LEVEL_DEV)
+ .set_default(768_M)
+ .add_see_also("rbd_mirror_memory_autotune")
+ .set_description("When tcmalloc and cache autotuning is enabled, estimate the minimum amount of memory in bytes the rbd-mirror daemon will need."),
+
+ Option("rbd_mirror_memory_expected_fragmentation", Option::TYPE_FLOAT, Option::LEVEL_DEV)
+ .set_default(0.15)
+ .set_min_max(0.0, 1.0)
+ .add_see_also("rbd_mirror_memory_autotune")
+ .set_description("When tcmalloc and cache autotuning is enabled, estimate the percent of memory fragmentation."),
+
+ Option("rbd_mirror_memory_cache_min", Option::TYPE_SIZE, Option::LEVEL_DEV)
+ .set_default(128_M)
+ .add_see_also("rbd_mirror_memory_autotune")
+ .set_description("When tcmalloc and cache autotuning is enabled, set the minimum amount of memory used for cache."),
+
+ Option("rbd_mirror_memory_cache_resize_interval", Option::TYPE_FLOAT, Option::LEVEL_DEV)
+ .set_default(5)
+ .add_see_also("rbd_mirror_memory_autotune")
+ .set_description("When tcmalloc and cache autotuning is enabled, wait this many seconds between resizing caches."),
+
+ Option("rbd_mirror_memory_cache_autotune_interval", Option::TYPE_FLOAT, Option::LEVEL_DEV)
+ .set_default(30)
+ .add_see_also("rbd_mirror_memory_autotune")
+ .set_description("The number of seconds to wait between rebalances when cache autotune is enabled."),
});
}
"global image id",
"local mirror uuid",
"local image id", {},
- &remote_mirror_uuid,
+ nullptr, &remote_mirror_uuid,
&remote_image_id,
&remote_journaler,
&client_state, &client_meta,
"global image id",
"local mirror uuid",
"local image id", {},
- &remote_mirror_uuid,
+ nullptr, &remote_mirror_uuid,
&remote_image_id,
&remote_journaler,
&client_state, &client_meta,
m_remote_io_ctx,
"global image id",
"local mirror uuid",
- "", {},
+ "", {}, nullptr,
&remote_mirror_uuid,
&remote_image_id,
&remote_journaler,
m_remote_io_ctx,
"global image id",
"local mirror uuid",
- "", {},
+ "", {}, nullptr,
&remote_mirror_uuid,
&remote_image_id,
&remote_journaler,
"global image id",
"local mirror uuid",
"local image id", {},
- &remote_mirror_uuid,
+ nullptr, &remote_mirror_uuid,
&remote_image_id,
&remote_journaler,
&client_state, &client_meta,
"global image id",
"local mirror uuid",
"local image id", {},
- &remote_mirror_uuid,
+ nullptr, &remote_mirror_uuid,
&remote_image_id,
&remote_journaler,
&client_state, &client_meta,
template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
void create_replayer() {
m_replayer = new ImageReplayerT(
- m_threads.get(), m_instance_watcher,
+ m_threads.get(), m_instance_watcher, nullptr,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
m_local_mirror_uuid, m_local_ioctx.get_id(), m_global_image_id);
m_replayer->add_peer("peer uuid", m_remote_ioctx);
const std::string &local_mirror_uuid,
const std::string &local_image_id,
const journal::Settings &settings,
+ journal::CacheManagerHandler *cache_manager_handler,
std::string *remote_mirror_uuid,
std::string *remote_image_id,
::journal::MockJournalerProxy **remote_journaler,
void create_image_replayer(MockThreads &mock_threads) {
m_image_replayer = new MockImageReplayer(
- &mock_threads, &m_instance_watcher,
+ &mock_threads, &m_instance_watcher, nullptr,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id(), "global image id");
m_image_replayer->add_peer("peer_uuid", m_remote_io_ctx);
static ImageReplayer *create(
Threads<librbd::MockTestImageCtx> *threads,
InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
+ journal::CacheManagerHandler *cache_manager_handler,
RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
const std::string &global_image_id) {
ceph_assert(s_instance != nullptr);
MockInstanceWatcher mock_instance_watcher;
MockImageReplayer mock_image_replayer;
MockInstanceReplayer instance_replayer(
- &mock_threads, &mock_service_daemon,
+ &mock_threads, &mock_service_daemon, nullptr,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id());
std::string global_image_id("global_image_id");
MockInstanceWatcher mock_instance_watcher;
MockImageReplayer mock_image_replayer;
MockInstanceReplayer instance_replayer(
- &mock_threads, &mock_service_daemon,
+ &mock_threads, &mock_service_daemon, nullptr,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id());
std::string global_image_id("global_image_id");
MockInstanceWatcher mock_instance_watcher;
MockImageReplayer mock_image_replayer;
MockInstanceReplayer instance_replayer(
- &mock_threads, &mock_service_daemon,
+ &mock_threads, &mock_service_daemon, nullptr,
rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
"local_mirror_uuid", m_local_io_ctx.get_id());
std::string global_image_id("global_image_id");
static InstanceReplayer* create(Threads<librbd::MockTestImageCtx> *threads,
ServiceDaemon<librbd::MockTestImageCtx> *service_daemon,
+ journal::CacheManagerHandler *cache_manager_handler,
RadosRef rados, const std::string& uuid,
int64_t pool_id) {
ceph_assert(s_instance != nullptr);
expect_leader_watcher_init(*mock_leader_watcher, 0);
MockThreads mock_threads(m_threads);
- MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon,
+ MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
m_local_io_ctx.get_id(), peer_spec, {});
pool_replayer.init();
service_daemon/Types.cc)
add_library(rbd_mirror_internal STATIC
- ${rbd_mirror_internal})
+ ${rbd_mirror_internal}
+ $<TARGET_OBJECTS:common_prioritycache_obj>)
add_executable(rbd-mirror
main.cc)
cls_lock_client
cls_journal_client
global
+ heap_profiler
${ALLOC_LIBS})
install(TARGETS rbd-mirror DESTINATION bin)
}
void handle_complete(int r) override {
  std::stringstream ss;
+  // -ENOMEM is surfaced by the journal when the autotuned cache cannot
+  // satisfy replay's memory needs; give it a dedicated message instead of
+  // the generic cpp_strerror() text.
-  if (r < 0) {
+  if (r == -ENOMEM) {
+    ss << "not enough memory in autotune cache";
+  } else if (r < 0) {
    ss << "replay completed with error: " << cpp_strerror(r);
  }
  replayer->handle_replay_complete(r, ss.str());
}
template <typename I>
-ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
- InstanceWatcher<I> *instance_watcher,
- RadosRef local,
- const std::string &local_mirror_uuid,
- int64_t local_pool_id,
- const std::string &global_image_id) :
+ImageReplayer<I>::ImageReplayer(
+ Threads<I> *threads, InstanceWatcher<I> *instance_watcher,
+ journal::CacheManagerHandler *cache_manager_handler, RadosRef local,
+ const std::string &local_mirror_uuid, int64_t local_pool_id,
+ const std::string &global_image_id) :
m_threads(threads),
m_instance_watcher(instance_watcher),
+ m_cache_manager_handler(cache_manager_handler),
m_local(local),
m_local_mirror_uuid(local_mirror_uuid),
m_local_pool_id(local_pool_id),
ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
auto req = PrepareRemoteImageRequest<I>::create(
m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
- m_local_image_id, journal_settings, &m_remote_image.mirror_uuid,
- &m_remote_image.image_id, &m_remote_journaler, &m_client_state,
- &m_client_meta, ctx);
+ m_local_image_id, journal_settings, m_cache_manager_handler,
+ &m_remote_image.mirror_uuid, &m_remote_image.image_id, &m_remote_journaler,
+ &m_client_state, &m_client_meta, ctx);
req->send();
}
namespace journal {
+struct CacheManagerHandler;
+
class Journaler;
class ReplayHandler;
public:
static ImageReplayer *create(
Threads<ImageCtxT> *threads, InstanceWatcher<ImageCtxT> *instance_watcher,
- RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
+ journal::CacheManagerHandler *cache_manager_handler, RadosRef local,
+ const std::string &local_mirror_uuid, int64_t local_pool_id,
const std::string &global_image_id) {
- return new ImageReplayer(threads, instance_watcher, local,
- local_mirror_uuid, local_pool_id, global_image_id);
+ return new ImageReplayer(threads, instance_watcher, cache_manager_handler,
+ local, local_mirror_uuid, local_pool_id,
+ global_image_id);
}
void destroy() {
delete this;
ImageReplayer(Threads<ImageCtxT> *threads,
InstanceWatcher<ImageCtxT> *instance_watcher,
+ journal::CacheManagerHandler *cache_manager_handler,
RadosRef local, const std::string &local_mirror_uuid,
int64_t local_pool_id, const std::string &global_image_id);
virtual ~ImageReplayer();
Threads<ImageCtxT> *m_threads;
InstanceWatcher<ImageCtxT> *m_instance_watcher;
+ journal::CacheManagerHandler *m_cache_manager_handler;
Peers m_peers;
RemoteImage m_remote_image;
+// cache_manager_handler is borrowed (not owned) and is forwarded to every
+// ImageReplayer created by this instance so their journals can register
+// caches for memory autotuning.
template <typename I>
InstanceReplayer<I>::InstanceReplayer(
    Threads<I> *threads, ServiceDaemon<I>* service_daemon,
-    RadosRef local_rados, const std::string &local_mirror_uuid,
-    int64_t local_pool_id)
+    journal::CacheManagerHandler *cache_manager_handler, RadosRef local_rados,
+    const std::string &local_mirror_uuid, int64_t local_pool_id)
  : m_threads(threads), m_service_daemon(service_daemon),
-    m_local_rados(local_rados), m_local_mirror_uuid(local_mirror_uuid),
-    m_local_pool_id(local_pool_id),
+    m_cache_manager_handler(cache_manager_handler), m_local_rados(local_rados),
+    m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
    m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
}
auto it = m_image_replayers.find(global_image_id);
if (it == m_image_replayers.end()) {
auto image_replayer = ImageReplayer<I>::create(
- m_threads, instance_watcher, m_local_rados,
+ m_threads, instance_watcher, m_cache_manager_handler, m_local_rados,
m_local_mirror_uuid, m_local_pool_id, global_image_id);
dout(10) << global_image_id << ": creating replayer " << image_replayer
#include "common/Mutex.h"
#include "tools/rbd_mirror/Types.h"
+namespace journal { struct CacheManagerHandler; }
+
namespace librbd { class ImageCtx; }
namespace rbd {
public:
static InstanceReplayer* create(
Threads<ImageCtxT> *threads,
- ServiceDaemon<ImageCtxT>* service_daemon,
+ ServiceDaemon<ImageCtxT> *service_daemon,
+ journal::CacheManagerHandler *cache_manager_handler,
RadosRef local_rados, const std::string &local_mirror_uuid,
int64_t local_pool_id) {
- return new InstanceReplayer(threads, service_daemon, local_rados,
- local_mirror_uuid, local_pool_id);
+ return new InstanceReplayer(threads, service_daemon, cache_manager_handler,
+ local_rados, local_mirror_uuid, local_pool_id);
}
void destroy() {
delete this;
}
InstanceReplayer(Threads<ImageCtxT> *threads,
- ServiceDaemon<ImageCtxT>* service_daemon,
+ ServiceDaemon<ImageCtxT> *service_daemon,
+ journal::CacheManagerHandler *cache_manager_handler,
RadosRef local_rados, const std::string &local_mirror_uuid,
int64_t local_pool_id);
~InstanceReplayer();
*/
Threads<ImageCtxT> *m_threads;
- ServiceDaemon<ImageCtxT>* m_service_daemon;
+ ServiceDaemon<ImageCtxT> *m_service_daemon;
+ journal::CacheManagerHandler *m_cache_manager_handler;
RadosRef m_local_rados;
std::string m_local_mirror_uuid;
int64_t m_local_pool_id;
#include <boost/range/adaptor/map.hpp>
#include "common/Formatter.h"
+#include "common/PriorityCache.h"
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
+#include "journal/Types.h"
#include "librbd/ImageCtx.h"
+#include "perfglue/heap_profiler.h"
#include "Mirror.h"
#include "ServiceDaemon.h"
#include "Threads.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
-#undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
- << __func__ << ": "
using std::list;
using std::map;
Mirror *mirror;
};
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
+ << m_name << " " << __func__ << ": "
+
+// Adapter that exposes one registered journal cache to the generic
+// PriorityCache::Manager balancing algorithm.  The cache is modelled as two
+// slices: a "base" slice of up to min_size bytes and an "extra" slice for
+// everything above it, each advertised at its own priority.
+struct PriCache : public PriorityCache::PriCache {
+  std::string m_name;
+  int64_t m_base_cache_max_size;   // registration min_size
+  int64_t m_extra_cache_max_size;  // registration max_size - min_size
+
+  // Both slices start at the lowest priority; prioritize() ages only the
+  // base slice toward PRI0, one level per rebalance round.
+  PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10;
+  PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10;
+  int64_t m_base_cache_bytes = 0;
+  int64_t m_extra_cache_bytes = 0;
+  int64_t m_committed_bytes = 0;
+  double m_cache_ratio = 0;
+
+  PriCache(const std::string &name, uint64_t min_size, uint64_t max_size)
+    : m_name(name), m_base_cache_max_size(min_size),
+      m_extra_cache_max_size(max_size - min_size) {
+    ceph_assert(max_size >= min_size);
+  }
+
+  // Promote the base slice one priority level per call (stops at PRI0) so
+  // longer-lived caches gradually outrank newly registered ones.
+  void prioritize() {
+    if (m_base_cache_pri == PriorityCache::Priority::PRI0) {
+      return;
+    }
+    auto pri = static_cast<uint8_t>(m_base_cache_pri);
+    m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri);
+
+    dout(30) << m_base_cache_pri << dendl;
+  }
+
+  // How many bytes this cache wants at priority 'pri'.  While both slices
+  // still share the same priority, one call covers both.
+  int64_t request_cache_bytes(PriorityCache::Priority pri,
+                              uint64_t total_cache) const override {
+    int64_t cache_bytes = 0;
+
+    if (pri == m_base_cache_pri) {
+      cache_bytes += m_base_cache_max_size;
+    }
+    if (pri == m_extra_cache_pri) {
+      cache_bytes += m_extra_cache_max_size;
+    }
+
+    dout(30) << cache_bytes << dendl;
+
+    return cache_bytes;
+  }
+
+  // Bytes currently assigned to this cache at priority 'pri'.
+  int64_t get_cache_bytes(PriorityCache::Priority pri) const override {
+    int64_t cache_bytes = 0;
+
+    if (pri == m_base_cache_pri) {
+      cache_bytes += m_base_cache_bytes;
+    }
+    if (pri == m_extra_cache_pri) {
+      cache_bytes += m_extra_cache_bytes;
+    }
+
+    dout(30) << "pri=" << pri << " " << cache_bytes << dendl;
+
+    return cache_bytes;
+  }
+
+  // Total bytes assigned across both slices.
+  int64_t get_cache_bytes() const override {
+    auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes;
+
+    dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "="
+             << cache_bytes << dendl;
+
+    return cache_bytes;
+  }
+
+  // Overwrite the allotment at 'pri'.  The base slice is capped at its max;
+  // the remainder only lands in the extra slice when it shares 'pri' (true
+  // while both priorities are equal), otherwise it is discarded.
+  void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
+    ceph_assert(bytes >= 0);
+    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri ||
+                bytes == 0);
+
+    dout(30) << "pri=" << pri << " " << bytes << dendl;
+
+    if (pri == m_base_cache_pri) {
+      m_base_cache_bytes = std::min(m_base_cache_max_size, bytes);
+      bytes -= std::min(m_base_cache_bytes, bytes);
+    }
+
+    if (pri == m_extra_cache_pri) {
+      m_extra_cache_bytes = bytes;
+    }
+  }
+
+  // Add to the allotment at 'pri': the base slice absorbs up to its max and
+  // the rest spills into the extra slice when it shares 'pri'.
+  void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
+    ceph_assert(bytes >= 0);
+    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri);
+
+    dout(30) << "pri=" << pri << " " << bytes << dendl;
+
+    if (pri == m_base_cache_pri) {
+      ceph_assert(m_base_cache_bytes <= m_base_cache_max_size);
+
+      auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes);
+      m_base_cache_bytes += chunk;
+      bytes -= chunk;
+    }
+
+    if (pri == m_extra_cache_pri) {
+      m_extra_cache_bytes += bytes;
+    }
+  }
+
+  // Commit the current total, rounded up to a 4 KiB page boundary.
+  int64_t commit_cache_size(uint64_t total_cache) override {
+    m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096);
+
+    dout(30) << m_committed_bytes << dendl;
+
+    return m_committed_bytes;
+  }
+
+  int64_t get_committed_size() const override {
+    dout(30) << m_committed_bytes << dendl;
+
+    return m_committed_bytes;
+  }
+
+  double get_cache_ratio() const override {
+    dout(30) << m_cache_ratio << dendl;
+
+    return m_cache_ratio;
+  }
+
+  void set_cache_ratio(double ratio) override {
+    // NOTE(review): this logs the previous ratio before overwriting it —
+    // confirm whether the incoming 'ratio' should be logged instead.
+    dout(30) << m_cache_ratio << dendl;
+
+    m_cache_ratio = ratio;
+  }
+
+  std::string get_cache_name() const override {
+    return m_name;
+  }
+};
+
} // anonymous namespace
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
+ << __func__ << ": "
+
class MirrorAdminSocketHook : public AdminSocketHook {
public:
MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
Commands commands;
};
+// Implementation of the journal's CacheManagerHandler interface backed by
+// the common PriorityCache::Manager.  When "rbd_mirror_memory_autotune" is
+// disabled the manager is never created and every registered cache is
+// simply granted its requested maximum up front.
+class CacheManagerHandler : public journal::CacheManagerHandler {
+public:
+  CacheManagerHandler(CephContext *cct)
+    : m_cct(cct), m_lock("rbd::mirror::CacheManagerHandler") {
+
+    if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) {
+      return;
+    }
+
+    uint64_t base = m_cct->_conf.get_val<Option::size_t>(
+        "rbd_mirror_memory_base");
+    double fragmentation = m_cct->_conf.get_val<double>(
+        "rbd_mirror_memory_expected_fragmentation");
+    uint64_t target = m_cct->_conf.get_val<Option::size_t>(
+        "rbd_mirror_memory_target");
+    uint64_t min = m_cct->_conf.get_val<Option::size_t>(
+        "rbd_mirror_memory_cache_min");
+    uint64_t max = min;
+
+    // When setting the maximum amount of memory to use for cache, first
+    // assume some base amount of memory for the daemon and then fudge in
+    // some overhead for fragmentation that scales with cache usage.
+    uint64_t ltarget = (1.0 - fragmentation) * target;
+    if (ltarget > base + min) {
+      max = ltarget - base;
+    }
+
+    m_next_balance = ceph_clock_now();
+    m_next_resize = ceph_clock_now();
+
+    m_cache_manager = std::make_unique<PriorityCache::Manager>(
+        m_cct, min, max, target, false);
+  }
+
+  ~CacheManagerHandler() {
+    Mutex::Locker locker(m_lock);
+
+    // every journal must have unregistered its cache by now
+    ceph_assert(m_caches.empty());
+  }
+
+  // Register a journal cache for balancing.  With autotuning disabled the
+  // cache is immediately granted max_size and never tracked.  Registration
+  // forces a rebalance on the next run_cache_manager() call.
+  void register_cache(const std::string &cache_name,
+                      uint64_t min_size, uint64_t max_size,
+                      journal::CacheRebalanceHandler* handler) override {
+    if (!m_cache_manager) {
+      handler->handle_cache_rebalanced(max_size);
+      return;
+    }
+
+    dout(20) << cache_name << " min_size=" << min_size << " max_size="
+             << max_size << " handler=" << handler << dendl;
+
+    Mutex::Locker locker(m_lock);
+
+    auto p = m_caches.insert(
+        {cache_name, {cache_name, min_size, max_size, handler}});
+    ceph_assert(p.second == true);
+
+    m_cache_manager->insert(cache_name, p.first->second.pri_cache, false);
+    m_next_balance = ceph_clock_now();
+  }
+
+  // Drop a previously registered cache; forces a rebalance on the next
+  // run_cache_manager() call.
+  void unregister_cache(const std::string &cache_name) override {
+    if (!m_cache_manager) {
+      return;
+    }
+
+    dout(20) << cache_name << dendl;
+
+    Mutex::Locker locker(m_lock);
+
+    auto it = m_caches.find(cache_name);
+    ceph_assert(it != m_caches.end());
+
+    m_cache_manager->erase(cache_name);
+    m_caches.erase(it);
+    m_next_balance = ceph_clock_now();
+  }
+
+  // Invoked periodically to rebalance cache shares between registered
+  // journals and, when running under tcmalloc, resize the mapped memory
+  // toward the configured target.
+  void run_cache_manager() {
+    if (!m_cache_manager) {
+      return;
+    }
+
+    Mutex::Locker locker(m_lock);
+
+    // Before we trim, check and see if it's time to rebalance/resize.
+    auto autotune_interval = m_cct->_conf.get_val<double>(
+        "rbd_mirror_memory_cache_autotune_interval");
+    auto resize_interval = m_cct->_conf.get_val<double>(
+        "rbd_mirror_memory_cache_resize_interval");
+
+    utime_t now = ceph_clock_now();
+
+    if (autotune_interval > 0 && m_next_balance <= now) {
+      dout(20) << "balance" << dendl;
+      m_cache_manager->balance();
+
+      // push the new allotments to the journal caches and age the slice
+      // priorities for the next round
+      for (auto &it : m_caches) {
+        auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get());
+        auto new_cache_bytes = pri_cache->get_cache_bytes();
+        it.second.handler->handle_cache_rebalanced(new_cache_bytes);
+        pri_cache->prioritize();
+      }
+
+      // schedule from a fresh timestamp so a slow balance() does not make
+      // the next round come early
+      m_next_balance = ceph_clock_now();
+      m_next_balance += autotune_interval;
+    }
+
+    // NOTE(review): uses '<' here but '<=' for the balance deadline above —
+    // confirm whether the asymmetry is intentional.
+    if (resize_interval > 0 && m_next_resize < now) {
+      if (ceph_using_tcmalloc()) {
+        dout(20) << "tune memory" << dendl;
+        m_cache_manager->tune_memory();
+      }
+
+      m_next_resize = ceph_clock_now();
+      m_next_resize += resize_interval;
+    }
+  }
+
+private:
+  // Pairs the PriorityCache adapter with the journal-side handler that is
+  // notified of each new allotment.
+  struct Cache {
+    std::shared_ptr<PriorityCache::PriCache> pri_cache;
+    journal::CacheRebalanceHandler *handler;
+
+    Cache(const std::string name, uint64_t min_size, uint64_t max_size,
+          journal::CacheRebalanceHandler *handler)
+      : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) {
+    }
+  };
+
+  CephContext *m_cct;
+
+  mutable Mutex m_lock;
+  std::unique_ptr<PriorityCache::Manager> m_cache_manager;  // null => autotune off
+  std::map<std::string, Cache> m_caches;
+
+  utime_t m_next_balance;
+  utime_t m_next_resize;
+};
+
Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
m_cct(cct),
m_args(args),
m_lock("rbd::mirror::Mirror"),
m_local(new librados::Rados()),
+ m_cache_manager_handler(new CacheManagerHandler(cct)),
m_asok_hook(new MirrorAdminSocketHook(cct, this))
{
m_threads =
void Mirror::run()
{
dout(20) << "enter" << dendl;
+
+ utime_t next_refresh_pools = ceph_clock_now();
+
while (!m_stopping) {
- m_local_cluster_watcher->refresh_pools();
+ utime_t now = ceph_clock_now();
+ bool refresh_pools = next_refresh_pools <= now;
+ if (refresh_pools) {
+ m_local_cluster_watcher->refresh_pools();
+ next_refresh_pools = ceph_clock_now();
+ next_refresh_pools += m_cct->_conf.get_val<uint64_t>(
+ "rbd_mirror_pool_replayers_refresh_interval");
+ }
Mutex::Locker l(m_lock);
if (!m_manual_stop) {
- update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
+ if (refresh_pools) {
+ update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
+ }
+ m_cache_manager_handler->run_cache_manager();
}
- m_cond.WaitInterval(
- m_lock,
- utime_t(m_cct->_conf.get_val<uint64_t>("rbd_mirror_pool_replayers_refresh_interval"), 0));
+ m_cond.WaitInterval(m_lock, {1, 0});
}
// stop all pool replayers in parallel
}
} else {
dout(20) << "starting pool replayer for " << peer << dendl;
- unique_ptr<PoolReplayer<>> pool_replayer(new PoolReplayer<>(
- m_threads, m_service_daemon.get(), kv.first, peer, m_args));
+ unique_ptr<PoolReplayer<>> pool_replayer(
+ new PoolReplayer<>(m_threads, m_service_daemon.get(),
+ m_cache_manager_handler.get(), kv.first, peer,
+ m_args));
// TODO: make async
pool_replayer->init();
#include "common/ceph_context.h"
#include "common/Mutex.h"
#include "include/rados/librados.hpp"
+#include "include/utime.h"
#include "ClusterWatcher.h"
#include "PoolReplayer.h"
#include "tools/rbd_mirror/Types.h"
#include <memory>
#include <atomic>
+namespace journal { class CacheManagerHandler; }
+
namespace librbd { struct ImageCtx; }
namespace rbd {
template <typename> struct ServiceDaemon;
template <typename> struct Threads;
+class CacheManagerHandler;
class MirrorAdminSocketHook;
/**
void update_pool_replayers(const PoolPeers &pool_peers);
+ void create_cache_manager();
+ void run_cache_manager(utime_t *next_run_interval);
+
CephContext *m_cct;
std::vector<const char*> m_args;
Threads<librbd::ImageCtx> *m_threads = nullptr;
// monitor local cluster for config changes in peers
std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
+ std::unique_ptr<CacheManagerHandler> m_cache_manager_handler;
std::map<PoolPeer, std::unique_ptr<PoolReplayer<>>> m_pool_replayers;
std::atomic<bool> m_stopping = { false };
bool m_manual_stop = false;
} // anonymous namespace
template <typename I>
-PoolReplayer<I>::PoolReplayer(Threads<I> *threads,
- ServiceDaemon<I>* service_daemon,
- int64_t local_pool_id, const PeerSpec &peer,
- const std::vector<const char*> &args) :
+PoolReplayer<I>::PoolReplayer(
+ Threads<I> *threads, ServiceDaemon<I> *service_daemon,
+ journal::CacheManagerHandler *cache_manager_handler, int64_t local_pool_id,
+ const PeerSpec &peer, const std::vector<const char*> &args) :
m_threads(threads),
m_service_daemon(service_daemon),
+ m_cache_manager_handler(cache_manager_handler),
m_local_pool_id(local_pool_id),
m_peer(peer),
m_args(args),
dout(10) << "connected to " << m_peer << dendl;
m_instance_replayer.reset(InstanceReplayer<I>::create(
- m_threads, m_service_daemon, m_local_rados, local_mirror_uuid,
- m_local_pool_id));
+ m_threads, m_service_daemon, m_cache_manager_handler, m_local_rados,
+ local_mirror_uuid, m_local_pool_id));
m_instance_replayer->init();
m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
class AdminSocketHook;
+namespace journal { struct CacheManagerHandler; }
+
namespace librbd { class ImageCtx; }
namespace rbd {
class PoolReplayer {
public:
PoolReplayer(Threads<ImageCtxT> *threads,
- ServiceDaemon<ImageCtxT>* service_daemon,
+ ServiceDaemon<ImageCtxT> *service_daemon,
+ journal::CacheManagerHandler *cache_manager_handler,
int64_t local_pool_id, const PeerSpec &peer,
const std::vector<const char*> &args);
~PoolReplayer();
void handle_instances_removed(const InstanceIds &instance_ids);
Threads<ImageCtxT> *m_threads;
- ServiceDaemon<ImageCtxT>* m_service_daemon;
+ ServiceDaemon<ImageCtxT> *m_service_daemon;
+ journal::CacheManagerHandler *m_cache_manager_handler;
int64_t m_local_pool_id = -1;
PeerSpec m_peer;
std::vector<const char*> m_args;
*m_remote_journaler = new Journaler(m_threads->work_queue, m_threads->timer,
&m_threads->timer_lock, m_remote_io_ctx,
*m_remote_image_id, m_local_mirror_uuid,
- m_journal_settings, nullptr);
+ m_journal_settings,
+ m_cache_manager_handler);
Context *ctx = create_async_context_callback(
m_threads->work_queue, create_context_callback<
namespace journal { class Journaler; }
namespace journal { class Settings; }
+namespace journal { struct CacheManagerHandler; }
namespace librbd { struct ImageCtx; }
namespace librbd { namespace journal { struct MirrorPeerClientMeta; } }
const std::string &local_mirror_uuid,
const std::string &local_image_id,
const journal::Settings &settings,
+ journal::CacheManagerHandler *cache_manager_handler,
std::string *remote_mirror_uuid,
std::string *remote_image_id,
Journaler **remote_journaler,
return new PrepareRemoteImageRequest(threads, remote_io_ctx,
global_image_id, local_mirror_uuid,
local_image_id, settings,
- remote_mirror_uuid, remote_image_id,
- remote_journaler, client_state,
- client_meta, on_finish);
+ cache_manager_handler,
+ remote_mirror_uuid,
+ remote_image_id, remote_journaler,
+ client_state, client_meta, on_finish);
}
PrepareRemoteImageRequest(Threads<ImageCtxT> *threads,
- librados::IoCtx &remote_io_ctx,
- const std::string &global_image_id,
- const std::string &local_mirror_uuid,
- const std::string &local_image_id,
- const journal::Settings &journal_settings,
- std::string *remote_mirror_uuid,
- std::string *remote_image_id,
- Journaler **remote_journaler,
- cls::journal::ClientState *client_state,
- MirrorPeerClientMeta *client_meta,
- Context *on_finish)
+ librados::IoCtx &remote_io_ctx,
+ const std::string &global_image_id,
+ const std::string &local_mirror_uuid,
+ const std::string &local_image_id,
+ const journal::Settings &journal_settings,
+ journal::CacheManagerHandler *cache_manager_handler,
+ std::string *remote_mirror_uuid,
+ std::string *remote_image_id,
+ Journaler **remote_journaler,
+ cls::journal::ClientState *client_state,
+ MirrorPeerClientMeta *client_meta,
+ Context *on_finish)
: m_threads(threads), m_remote_io_ctx(remote_io_ctx),
m_global_image_id(global_image_id),
m_local_mirror_uuid(local_mirror_uuid), m_local_image_id(local_image_id),
m_journal_settings(journal_settings),
+ m_cache_manager_handler(cache_manager_handler),
m_remote_mirror_uuid(remote_mirror_uuid),
m_remote_image_id(remote_image_id),
m_remote_journaler(remote_journaler), m_client_state(client_state),
std::string m_local_mirror_uuid;
std::string m_local_image_id;
journal::Settings m_journal_settings;
+ journal::CacheManagerHandler *m_cache_manager_handler;
std::string *m_remote_mirror_uuid;
std::string *m_remote_image_id;
Journaler **m_remote_journaler;