git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-mirror: auto-tune journal fetch params based on memory target
author Mykola Golub <mgolub@suse.com>
Fri, 17 May 2019 15:01:04 +0000 (16:01 +0100)
committer Mykola Golub <mgolub@suse.com>
Tue, 11 Jun 2019 07:00:27 +0000 (08:00 +0100)
Signed-off-by: Mykola Golub <mgolub@suse.com>
17 files changed:
src/common/options.cc
src/test/rbd_mirror/image_replayer/test_mock_PrepareRemoteImageRequest.cc
src/test/rbd_mirror/test_ImageReplayer.cc
src/test/rbd_mirror/test_mock_ImageReplayer.cc
src/test/rbd_mirror/test_mock_InstanceReplayer.cc
src/test/rbd_mirror/test_mock_PoolReplayer.cc
src/tools/rbd_mirror/CMakeLists.txt
src/tools/rbd_mirror/ImageReplayer.cc
src/tools/rbd_mirror/ImageReplayer.h
src/tools/rbd_mirror/InstanceReplayer.cc
src/tools/rbd_mirror/InstanceReplayer.h
src/tools/rbd_mirror/Mirror.cc
src/tools/rbd_mirror/Mirror.h
src/tools/rbd_mirror/PoolReplayer.cc
src/tools/rbd_mirror/PoolReplayer.h
src/tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.cc
src/tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h

index b1d6b2b7c737e7022585366588174e50fbd48d52..b5b26f9915ec18424be46f85058841c7bf769b02 100644 (file)
@@ -7484,6 +7484,42 @@ static std::vector<Option> get_rbd_mirror_options() {
                           "mgr_stats_threshold.")
     .set_min_max((int64_t)PerfCountersBuilder::PRIO_DEBUGONLY,
                  (int64_t)PerfCountersBuilder::PRIO_CRITICAL + 1),
+
+    Option("rbd_mirror_memory_autotune", Option::TYPE_BOOL, Option::LEVEL_DEV)
+    .set_default(true)
+    .add_see_also("rbd_mirror_memory_target")
+    .set_description("Automatically tune the ratio of caches while respecting min values."),
+
+    Option("rbd_mirror_memory_target", Option::TYPE_SIZE, Option::LEVEL_BASIC)
+    .set_default(4_G)
+    .add_see_also("rbd_mirror_memory_autotune")
+    .set_description("When tcmalloc and cache autotuning is enabled, try to keep this many bytes mapped in memory."),
+
+    Option("rbd_mirror_memory_base", Option::TYPE_SIZE, Option::LEVEL_DEV)
+    .set_default(768_M)
+    .add_see_also("rbd_mirror_memory_autotune")
+    .set_description("When tcmalloc and cache autotuning is enabled, estimate the minimum amount of memory in bytes the rbd-mirror daemon will need."),
+
+    Option("rbd_mirror_memory_expected_fragmentation", Option::TYPE_FLOAT, Option::LEVEL_DEV)
+    .set_default(0.15)
+    .set_min_max(0.0, 1.0)
+    .add_see_also("rbd_mirror_memory_autotune")
+    .set_description("When tcmalloc and cache autotuning is enabled, estimate the percent of memory fragmentation."),
+
+    Option("rbd_mirror_memory_cache_min", Option::TYPE_SIZE, Option::LEVEL_DEV)
+    .set_default(128_M)
+    .add_see_also("rbd_mirror_memory_autotune")
+    .set_description("When tcmalloc and cache autotuning is enabled, set the minimum amount of memory used for cache."),
+
+    Option("rbd_mirror_memory_cache_resize_interval", Option::TYPE_FLOAT, Option::LEVEL_DEV)
+    .set_default(5)
+    .add_see_also("rbd_mirror_memory_autotune")
+    .set_description("When tcmalloc and cache autotuning is enabled, wait this many seconds between resizing caches."),
+
+    Option("rbd_mirror_memory_cache_autotune_interval", Option::TYPE_FLOAT, Option::LEVEL_DEV)
+    .set_default(30)
+    .add_see_also("rbd_mirror_memory_autotune")
+    .set_description("The number of seconds to wait between rebalances when cache autotune is enabled."),
   });
 }
 
index 1b957ed177f1b4058814a4cc2c8fe29f57147ee1..4e74df82e10d6d764bb13603bfda3930c8ad9a68 100644 (file)
@@ -180,7 +180,7 @@ TEST_F(TestMockImageReplayerPrepareRemoteImageRequest, Success) {
                                                    "global image id",
                                                    "local mirror uuid",
                                                    "local image id", {},
-                                                   &remote_mirror_uuid,
+                                                   nullptr, &remote_mirror_uuid,
                                                    &remote_image_id,
                                                    &remote_journaler,
                                                    &client_state, &client_meta,
@@ -228,7 +228,7 @@ TEST_F(TestMockImageReplayerPrepareRemoteImageRequest, SuccessNotRegistered) {
                                                    "global image id",
                                                    "local mirror uuid",
                                                    "local image id", {},
-                                                   &remote_mirror_uuid,
+                                                   nullptr, &remote_mirror_uuid,
                                                    &remote_image_id,
                                                    &remote_journaler,
                                                    &client_state, &client_meta,
@@ -260,7 +260,7 @@ TEST_F(TestMockImageReplayerPrepareRemoteImageRequest, MirrorUuidError) {
                                                    m_remote_io_ctx,
                                                    "global image id",
                                                    "local mirror uuid",
-                                                   "", {},
+                                                   "", {}, nullptr,
                                                    &remote_mirror_uuid,
                                                    &remote_image_id,
                                                    &remote_journaler,
@@ -292,7 +292,7 @@ TEST_F(TestMockImageReplayerPrepareRemoteImageRequest, MirrorImageIdError) {
                                                    m_remote_io_ctx,
                                                    "global image id",
                                                    "local mirror uuid",
-                                                   "", {},
+                                                   "", {}, nullptr,
                                                    &remote_mirror_uuid,
                                                    &remote_image_id,
                                                    &remote_journaler,
@@ -332,7 +332,7 @@ TEST_F(TestMockImageReplayerPrepareRemoteImageRequest, GetClientError) {
                                                    "global image id",
                                                    "local mirror uuid",
                                                    "local image id", {},
-                                                   &remote_mirror_uuid,
+                                                   nullptr, &remote_mirror_uuid,
                                                    &remote_image_id,
                                                    &remote_journaler,
                                                    &client_state, &client_meta,
@@ -378,7 +378,7 @@ TEST_F(TestMockImageReplayerPrepareRemoteImageRequest, RegisterClientError) {
                                                    "global image id",
                                                    "local mirror uuid",
                                                    "local image id", {},
-                                                   &remote_mirror_uuid,
+                                                   nullptr, &remote_mirror_uuid,
                                                    &remote_image_id,
                                                    &remote_journaler,
                                                    &client_state, &client_meta,
index 7d3caaa2472d59a12ee326888e140b9688fb4422..4911d5e69701d40782711be23e9f6bb0041b0917 100644 (file)
@@ -145,7 +145,7 @@ public:
   template <typename ImageReplayerT = rbd::mirror::ImageReplayer<> >
   void create_replayer() {
     m_replayer = new ImageReplayerT(
-        m_threads.get(), m_instance_watcher,
+        m_threads.get(), m_instance_watcher, nullptr,
         rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
         m_local_mirror_uuid, m_local_ioctx.get_id(), m_global_image_id);
     m_replayer->add_peer("peer uuid", m_remote_ioctx);
index 1bd330fd8a4529bf1dd28478089c86103dd5d642..e25b88712a582c0dc2977a205648da262ccec1eb 100644 (file)
@@ -166,6 +166,7 @@ struct PrepareRemoteImageRequest<librbd::MockTestImageCtx> {
                                            const std::string &local_mirror_uuid,
                                            const std::string &local_image_id,
                                            const journal::Settings &settings,
+                                           journal::CacheManagerHandler *cache_manager_handler,
                                            std::string *remote_mirror_uuid,
                                            std::string *remote_image_id,
                                            ::journal::MockJournalerProxy **remote_journaler,
@@ -602,7 +603,7 @@ public:
 
   void create_image_replayer(MockThreads &mock_threads) {
     m_image_replayer = new MockImageReplayer(
-      &mock_threads, &m_instance_watcher,
+      &mock_threads, &m_instance_watcher, nullptr,
       rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
       "local_mirror_uuid", m_local_io_ctx.get_id(), "global image id");
     m_image_replayer->add_peer("peer_uuid", m_remote_io_ctx);
index 79dce7ac29674b8549d73138f588260dc879f758..747644beb5d380f677f5190df6205a7de5ece0ae 100644 (file)
@@ -67,6 +67,7 @@ struct ImageReplayer<librbd::MockTestImageCtx> {
   static ImageReplayer *create(
     Threads<librbd::MockTestImageCtx> *threads,
     InstanceWatcher<librbd::MockTestImageCtx> *instance_watcher,
+    journal::CacheManagerHandler *cache_manager_handler,
     RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
     const std::string &global_image_id) {
     ceph_assert(s_instance != nullptr);
@@ -170,7 +171,7 @@ TEST_F(TestMockInstanceReplayer, AcquireReleaseImage) {
   MockInstanceWatcher mock_instance_watcher;
   MockImageReplayer mock_image_replayer;
   MockInstanceReplayer instance_replayer(
-    &mock_threads, &mock_service_daemon,
+    &mock_threads, &mock_service_daemon, nullptr,
     rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
     "local_mirror_uuid", m_local_io_ctx.get_id());
   std::string global_image_id("global_image_id");
@@ -239,7 +240,7 @@ TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) {
   MockInstanceWatcher mock_instance_watcher;
   MockImageReplayer mock_image_replayer;
   MockInstanceReplayer instance_replayer(
-    &mock_threads, &mock_service_daemon,
+    &mock_threads, &mock_service_daemon, nullptr,
     rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
     "local_mirror_uuid", m_local_io_ctx.get_id());
   std::string global_image_id("global_image_id");
@@ -311,7 +312,7 @@ TEST_F(TestMockInstanceReplayer, Reacquire) {
   MockInstanceWatcher mock_instance_watcher;
   MockImageReplayer mock_image_replayer;
   MockInstanceReplayer instance_replayer(
-    &mock_threads, &mock_service_daemon,
+    &mock_threads, &mock_service_daemon, nullptr,
     rbd::mirror::RadosRef(new librados::Rados(m_local_io_ctx)),
     "local_mirror_uuid", m_local_io_ctx.get_id());
   std::string global_image_id("global_image_id");
index 4ceb9be3c4762ca2e6991e65b1ca7e9fb84acced..6879e036a496d571fe879488c5b328f1e8531373 100644 (file)
@@ -110,6 +110,7 @@ struct InstanceReplayer<librbd::MockTestImageCtx> {
 
   static InstanceReplayer* create(Threads<librbd::MockTestImageCtx> *threads,
                                   ServiceDaemon<librbd::MockTestImageCtx> *service_daemon,
+                                  journal::CacheManagerHandler *cache_manager_handler,
                                   RadosRef rados, const std::string& uuid,
                                   int64_t pool_id) {
     ceph_assert(s_instance != nullptr);
@@ -428,7 +429,7 @@ TEST_F(TestMockPoolReplayer, ConfigKeyOverride) {
   expect_leader_watcher_init(*mock_leader_watcher, 0);
 
   MockThreads mock_threads(m_threads);
-  MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon,
+  MockPoolReplayer pool_replayer(&mock_threads, &mock_service_daemon, nullptr,
                                  m_local_io_ctx.get_id(), peer_spec, {});
   pool_replayer.init();
 
index 6184e418d97bf169aef1069fbbac75cae5572c73..8c4156bcb713c3c88cfb81d376a1e7dfb31e8c5d 100644 (file)
@@ -48,7 +48,8 @@ set(rbd_mirror_internal
   service_daemon/Types.cc)
 
 add_library(rbd_mirror_internal STATIC
-  ${rbd_mirror_internal})
+  ${rbd_mirror_internal}
+  $<TARGET_OBJECTS:common_prioritycache_obj>)
 
 add_executable(rbd-mirror
   main.cc)
@@ -65,5 +66,6 @@ target_link_libraries(rbd-mirror
   cls_lock_client
   cls_journal_client
   global
+  heap_profiler
   ${ALLOC_LIBS})
 install(TARGETS rbd-mirror DESTINATION bin)
index 621b99a5ece96cec6626306e6054e0ddb3e5dde9..ccf8ef86f049d25c22902b9dccabd85d065d1d82 100644 (file)
@@ -70,7 +70,9 @@ struct ReplayHandler : public ::journal::ReplayHandler {
   }
   void handle_complete(int r) override {
     std::stringstream ss;
-    if (r < 0) {
+    if (r == -ENOMEM) {
+      ss << "not enough memory in autotune cache";
+    } else if (r < 0) {
       ss << "replay completed with error: " << cpp_strerror(r);
     }
     replayer->handle_replay_complete(r, ss.str());
@@ -255,14 +257,14 @@ void ImageReplayer<I>::RemoteJournalerListener::handle_update(
 }
 
 template <typename I>
-ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
-                                InstanceWatcher<I> *instance_watcher,
-                                RadosRef local,
-                                const std::string &local_mirror_uuid,
-                                int64_t local_pool_id,
-                                const std::string &global_image_id) :
+ImageReplayer<I>::ImageReplayer(
+    Threads<I> *threads, InstanceWatcher<I> *instance_watcher,
+    journal::CacheManagerHandler *cache_manager_handler, RadosRef local,
+    const std::string &local_mirror_uuid, int64_t local_pool_id,
+    const std::string &global_image_id) :
   m_threads(threads),
   m_instance_watcher(instance_watcher),
+  m_cache_manager_handler(cache_manager_handler),
   m_local(local),
   m_local_mirror_uuid(local_mirror_uuid),
   m_local_pool_id(local_pool_id),
@@ -446,9 +448,9 @@ void ImageReplayer<I>::prepare_remote_image() {
     ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
   auto req = PrepareRemoteImageRequest<I>::create(
     m_threads, m_remote_image.io_ctx, m_global_image_id, m_local_mirror_uuid,
-    m_local_image_id, journal_settings, &m_remote_image.mirror_uuid,
-    &m_remote_image.image_id, &m_remote_journaler, &m_client_state,
-    &m_client_meta, ctx);
+    m_local_image_id, journal_settings, m_cache_manager_handler,
+    &m_remote_image.mirror_uuid, &m_remote_image.image_id, &m_remote_journaler,
+    &m_client_state, &m_client_meta, ctx);
   req->send();
 }
 
index 9af3e9611cd99d108ca07975169178019797afb9..2881a9de86de348b2afb2b5816309dd6af76516b 100644 (file)
@@ -33,6 +33,8 @@ class PerfCounters;
 
 namespace journal {
 
+struct CacheManagerHandler;
+
 class Journaler;
 class ReplayHandler;
 
@@ -63,10 +65,12 @@ class ImageReplayer {
 public:
   static ImageReplayer *create(
     Threads<ImageCtxT> *threads, InstanceWatcher<ImageCtxT> *instance_watcher,
-    RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
+    journal::CacheManagerHandler *cache_manager_handler, RadosRef local,
+    const std::string &local_mirror_uuid, int64_t local_pool_id,
     const std::string &global_image_id) {
-    return new ImageReplayer(threads, instance_watcher, local,
-                             local_mirror_uuid, local_pool_id, global_image_id);
+    return new ImageReplayer(threads, instance_watcher, cache_manager_handler,
+                             local, local_mirror_uuid, local_pool_id,
+                             global_image_id);
   }
   void destroy() {
     delete this;
@@ -74,6 +78,7 @@ public:
 
   ImageReplayer(Threads<ImageCtxT> *threads,
                 InstanceWatcher<ImageCtxT> *instance_watcher,
+                journal::CacheManagerHandler *cache_manager_handler,
                 RadosRef local, const std::string &local_mirror_uuid,
                 int64_t local_pool_id, const std::string &global_image_id);
   virtual ~ImageReplayer();
@@ -267,6 +272,7 @@ private:
 
   Threads<ImageCtxT> *m_threads;
   InstanceWatcher<ImageCtxT> *m_instance_watcher;
+  journal::CacheManagerHandler *m_cache_manager_handler;
 
   Peers m_peers;
   RemoteImage m_remote_image;
index 6d6b03ea8dbc34f8e703083d90072dd561246f3a..ba40f8b51fcde0980d0713a3026cdfb3e018b131 100644 (file)
@@ -34,11 +34,11 @@ using librbd::util::create_context_callback;
 template <typename I>
 InstanceReplayer<I>::InstanceReplayer(
     Threads<I> *threads, ServiceDaemon<I>* service_daemon,
-    RadosRef local_rados, const std::string &local_mirror_uuid,
-    int64_t local_pool_id)
+    journal::CacheManagerHandler *cache_manager_handler, RadosRef local_rados,
+    const std::string &local_mirror_uuid, int64_t local_pool_id)
   : m_threads(threads), m_service_daemon(service_daemon),
-    m_local_rados(local_rados), m_local_mirror_uuid(local_mirror_uuid),
-    m_local_pool_id(local_pool_id),
+    m_cache_manager_handler(cache_manager_handler), m_local_rados(local_rados),
+    m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
     m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
 }
 
@@ -142,7 +142,7 @@ void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
   auto it = m_image_replayers.find(global_image_id);
   if (it == m_image_replayers.end()) {
     auto image_replayer = ImageReplayer<I>::create(
-        m_threads, instance_watcher, m_local_rados,
+        m_threads, instance_watcher, m_cache_manager_handler, m_local_rados,
         m_local_mirror_uuid, m_local_pool_id, global_image_id);
 
     dout(10) << global_image_id << ": creating replayer " << image_replayer
index 0321d78c5afb34faa872a8487569835446ddeee7..80d682c7b768a55d9e641b26309513d0613fba98 100644 (file)
@@ -12,6 +12,8 @@
 #include "common/Mutex.h"
 #include "tools/rbd_mirror/Types.h"
 
+namespace journal { struct CacheManagerHandler; }
+
 namespace librbd { class ImageCtx; }
 
 namespace rbd {
@@ -27,18 +29,20 @@ class InstanceReplayer {
 public:
   static InstanceReplayer* create(
       Threads<ImageCtxT> *threads,
-      ServiceDaemon<ImageCtxT>* service_daemon,
+      ServiceDaemon<ImageCtxT> *service_daemon,
+      journal::CacheManagerHandler *cache_manager_handler,
       RadosRef local_rados, const std::string &local_mirror_uuid,
       int64_t local_pool_id) {
-    return new InstanceReplayer(threads, service_daemon, local_rados,
-                                local_mirror_uuid, local_pool_id);
+    return new InstanceReplayer(threads, service_daemon, cache_manager_handler,
+                                local_rados, local_mirror_uuid, local_pool_id);
   }
   void destroy() {
     delete this;
   }
 
   InstanceReplayer(Threads<ImageCtxT> *threads,
-                   ServiceDaemon<ImageCtxT>* service_daemon,
+                   ServiceDaemon<ImageCtxT> *service_daemon,
+                   journal::CacheManagerHandler *cache_manager_handler,
                   RadosRef local_rados, const std::string &local_mirror_uuid,
                   int64_t local_pool_id);
   ~InstanceReplayer();
@@ -82,7 +86,8 @@ private:
    */
 
   Threads<ImageCtxT> *m_threads;
-  ServiceDaemon<ImageCtxT>* m_service_daemon;
+  ServiceDaemon<ImageCtxT> *m_service_daemon;
+  journal::CacheManagerHandler *m_cache_manager_handler;
   RadosRef m_local_rados;
   std::string m_local_mirror_uuid;
   int64_t m_local_pool_id;
index 144386cb47a7be706cc6aad2bb60bd7812efaef3..9fc6d0b3a321d0aa68fb1e48bd17f0e1d879f0c3 100644 (file)
@@ -4,19 +4,19 @@
 #include <boost/range/adaptor/map.hpp>
 
 #include "common/Formatter.h"
+#include "common/PriorityCache.h"
 #include "common/admin_socket.h"
 #include "common/debug.h"
 #include "common/errno.h"
+#include "journal/Types.h"
 #include "librbd/ImageCtx.h"
+#include "perfglue/heap_profiler.h"
 #include "Mirror.h"
 #include "ServiceDaemon.h"
 #include "Threads.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
-#undef dout_prefix
-#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
-                           << __func__ << ": "
 
 using std::list;
 using std::map;
@@ -118,8 +118,151 @@ private:
   Mirror *mirror;
 };
 
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
+                           << m_name << " " << __func__ << ": "
+
+struct PriCache : public PriorityCache::PriCache {
+  std::string m_name;
+  int64_t m_base_cache_max_size;
+  int64_t m_extra_cache_max_size;
+
+  PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10;
+  PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10;
+  int64_t m_base_cache_bytes = 0;
+  int64_t m_extra_cache_bytes = 0;
+  int64_t m_committed_bytes = 0;
+  double m_cache_ratio = 0;
+
+  PriCache(const std::string &name, uint64_t min_size, uint64_t max_size)
+    : m_name(name), m_base_cache_max_size(min_size),
+      m_extra_cache_max_size(max_size - min_size) {
+    ceph_assert(max_size >= min_size);
+  }
+
+  void prioritize() {
+    if (m_base_cache_pri == PriorityCache::Priority::PRI0) {
+      return;
+    }
+    auto pri = static_cast<uint8_t>(m_base_cache_pri);
+    m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri);
+
+    dout(30) << m_base_cache_pri << dendl;
+  }
+
+  int64_t request_cache_bytes(PriorityCache::Priority pri,
+                              uint64_t total_cache) const override {
+    int64_t cache_bytes = 0;
+
+    if (pri == m_base_cache_pri) {
+      cache_bytes += m_base_cache_max_size;
+    }
+    if (pri == m_extra_cache_pri) {
+      cache_bytes += m_extra_cache_max_size;
+    }
+
+    dout(30) << cache_bytes << dendl;
+
+    return cache_bytes;
+  }
+
+  int64_t get_cache_bytes(PriorityCache::Priority pri) const override {
+    int64_t cache_bytes = 0;
+
+    if (pri == m_base_cache_pri) {
+      cache_bytes += m_base_cache_bytes;
+    }
+    if (pri == m_extra_cache_pri) {
+      cache_bytes += m_extra_cache_bytes;
+    }
+
+    dout(30) << "pri=" << pri << " " << cache_bytes << dendl;
+
+    return cache_bytes;
+  }
+
+  int64_t get_cache_bytes() const override {
+    auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes;
+
+    dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "="
+             << cache_bytes << dendl;
+
+    return cache_bytes;
+  }
+
+  void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
+    ceph_assert(bytes >= 0);
+    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri ||
+                bytes == 0);
+
+    dout(30) << "pri=" << pri << " " << bytes << dendl;
+
+    if (pri == m_base_cache_pri) {
+      m_base_cache_bytes = std::min(m_base_cache_max_size, bytes);
+      bytes -= std::min(m_base_cache_bytes, bytes);
+    }
+
+    if (pri == m_extra_cache_pri) {
+      m_extra_cache_bytes = bytes;
+    }
+  }
+
+  void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
+    ceph_assert(bytes >= 0);
+    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri);
+
+    dout(30) << "pri=" << pri << " " << bytes << dendl;
+
+    if (pri == m_base_cache_pri) {
+      ceph_assert(m_base_cache_bytes <= m_base_cache_max_size);
+
+      auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes);
+      m_base_cache_bytes += chunk;
+      bytes -= chunk;
+    }
+
+    if (pri == m_extra_cache_pri) {
+      m_extra_cache_bytes += bytes;
+    }
+  }
+
+  int64_t commit_cache_size(uint64_t total_cache) override {
+    m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096);
+
+    dout(30) << m_committed_bytes << dendl;
+
+    return m_committed_bytes;
+  }
+
+  int64_t get_committed_size() const override {
+    dout(30) << m_committed_bytes << dendl;
+
+    return m_committed_bytes;
+  }
+
+  double get_cache_ratio() const override {
+    dout(30) << m_cache_ratio << dendl;
+
+    return m_cache_ratio;
+  }
+
+  void set_cache_ratio(double ratio) override {
+    dout(30) << m_cache_ratio << dendl;
+
+    m_cache_ratio = ratio;
+  }
+
+  std::string get_cache_name() const override {
+    return m_name;
+  }
+};
+
 } // anonymous namespace
 
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
+                           << __func__ << ": "
+
 class MirrorAdminSocketHook : public AdminSocketHook {
 public:
   MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
@@ -197,11 +340,152 @@ private:
   Commands commands;
 };
 
+class CacheManagerHandler : public journal::CacheManagerHandler {
+public:
+  CacheManagerHandler(CephContext *cct)
+    : m_cct(cct), m_lock("rbd::mirror::CacheManagerHandler") {
+
+    if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) {
+      return;
+    }
+
+    uint64_t base = m_cct->_conf.get_val<Option::size_t>(
+        "rbd_mirror_memory_base");
+    double fragmentation = m_cct->_conf.get_val<double>(
+        "rbd_mirror_memory_expected_fragmentation");
+    uint64_t target = m_cct->_conf.get_val<Option::size_t>(
+        "rbd_mirror_memory_target");
+    uint64_t min = m_cct->_conf.get_val<Option::size_t>(
+        "rbd_mirror_memory_cache_min");
+    uint64_t max = min;
+
+    // When setting the maximum amount of memory to use for cache, first
+    // assume some base amount of memory for the daemon and then fudge in
+    // some overhead for fragmentation that scales with cache usage.
+    uint64_t ltarget = (1.0 - fragmentation) * target;
+    if (ltarget > base + min) {
+      max = ltarget - base;
+    }
+
+    m_next_balance = ceph_clock_now();
+    m_next_resize = ceph_clock_now();
+
+    m_cache_manager = std::make_unique<PriorityCache::Manager>(
+      m_cct, min, max, target, false);
+  }
+
+  ~CacheManagerHandler() {
+    Mutex::Locker locker(m_lock);
+
+    ceph_assert(m_caches.empty());
+  }
+
+  void register_cache(const std::string &cache_name,
+                      uint64_t min_size, uint64_t max_size,
+                      journal::CacheRebalanceHandler* handler) override {
+    if (!m_cache_manager) {
+      handler->handle_cache_rebalanced(max_size);
+      return;
+    }
+
+    dout(20) << cache_name << " min_size=" << min_size << " max_size="
+             << max_size << " handler=" << handler << dendl;
+
+    Mutex::Locker locker(m_lock);
+
+    auto p = m_caches.insert(
+        {cache_name, {cache_name, min_size, max_size, handler}});
+    ceph_assert(p.second == true);
+
+    m_cache_manager->insert(cache_name, p.first->second.pri_cache, false);
+    m_next_balance = ceph_clock_now();
+  }
+
+  void unregister_cache(const std::string &cache_name) override {
+    if (!m_cache_manager) {
+      return;
+    }
+
+    dout(20) << cache_name << dendl;
+
+    Mutex::Locker locker(m_lock);
+
+    auto it = m_caches.find(cache_name);
+    ceph_assert(it != m_caches.end());
+
+    m_cache_manager->erase(cache_name);
+    m_caches.erase(it);
+    m_next_balance = ceph_clock_now();
+  }
+
+  void run_cache_manager() {
+    if (!m_cache_manager) {
+      return;
+    }
+
+    Mutex::Locker locker(m_lock);
+
+    // Before we trim, check and see if it's time to rebalance/resize.
+    auto autotune_interval = m_cct->_conf.get_val<double>(
+        "rbd_mirror_memory_cache_autotune_interval");
+    auto resize_interval = m_cct->_conf.get_val<double>(
+        "rbd_mirror_memory_cache_resize_interval");
+
+    utime_t now = ceph_clock_now();
+
+    if (autotune_interval > 0 && m_next_balance <= now) {
+      dout(20) << "balance" << dendl;
+      m_cache_manager->balance();
+
+      for (auto &it : m_caches) {
+        auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get());
+        auto new_cache_bytes = pri_cache->get_cache_bytes();
+        it.second.handler->handle_cache_rebalanced(new_cache_bytes);
+        pri_cache->prioritize();
+      }
+
+      m_next_balance = ceph_clock_now();
+      m_next_balance += autotune_interval;
+    }
+
+    if (resize_interval > 0 && m_next_resize < now) {
+      if (ceph_using_tcmalloc()) {
+        dout(20) << "tune memory" << dendl;
+        m_cache_manager->tune_memory();
+      }
+
+      m_next_resize = ceph_clock_now();
+      m_next_resize += resize_interval;
+    }
+  }
+
+private:
+  struct Cache {
+    std::shared_ptr<PriorityCache::PriCache> pri_cache;
+    journal::CacheRebalanceHandler *handler;
+
+    Cache(const std::string name, uint64_t min_size, uint64_t max_size,
+          journal::CacheRebalanceHandler *handler)
+      : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) {
+    }
+  };
+
+  CephContext *m_cct;
+
+  mutable Mutex m_lock;
+  std::unique_ptr<PriorityCache::Manager> m_cache_manager;
+  std::map<std::string, Cache> m_caches;
+
+  utime_t m_next_balance;
+  utime_t m_next_resize;
+};
+
 Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
   m_cct(cct),
   m_args(args),
   m_lock("rbd::mirror::Mirror"),
   m_local(new librados::Rados()),
+  m_cache_manager_handler(new CacheManagerHandler(cct)),
   m_asok_hook(new MirrorAdminSocketHook(cct, this))
 {
   m_threads =
@@ -252,15 +536,26 @@ int Mirror::init()
 void Mirror::run()
 {
   dout(20) << "enter" << dendl;
+
+  utime_t next_refresh_pools = ceph_clock_now();
+
   while (!m_stopping) {
-    m_local_cluster_watcher->refresh_pools();
+    utime_t now = ceph_clock_now();
+    bool refresh_pools = next_refresh_pools <= now;
+    if (refresh_pools) {
+      m_local_cluster_watcher->refresh_pools();
+      next_refresh_pools = ceph_clock_now();
+      next_refresh_pools += m_cct->_conf.get_val<uint64_t>(
+          "rbd_mirror_pool_replayers_refresh_interval");
+    }
     Mutex::Locker l(m_lock);
     if (!m_manual_stop) {
-      update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
+      if (refresh_pools) {
+        update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
+      }
+      m_cache_manager_handler->run_cache_manager();
     }
-    m_cond.WaitInterval(
-      m_lock,
-      utime_t(m_cct->_conf.get_val<uint64_t>("rbd_mirror_pool_replayers_refresh_interval"), 0));
+    m_cond.WaitInterval(m_lock, {1, 0});
   }
 
   // stop all pool replayers in parallel
@@ -411,8 +706,10 @@ void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
         }
       } else {
         dout(20) << "starting pool replayer for " << peer << dendl;
-        unique_ptr<PoolReplayer<>> pool_replayer(new PoolReplayer<>(
-         m_threads, m_service_daemon.get(), kv.first, peer, m_args));
+        unique_ptr<PoolReplayer<>> pool_replayer(
+            new PoolReplayer<>(m_threads, m_service_daemon.get(),
+                               m_cache_manager_handler.get(), kv.first, peer,
+                               m_args));
 
         // TODO: make async
         pool_replayer->init();
index 153c0bc52a337f4313b79183dea393a7cfcb894b..12076be2c77c1597c8a00ce7b44a5e916a71c909 100644 (file)
@@ -7,6 +7,7 @@
 #include "common/ceph_context.h"
 #include "common/Mutex.h"
 #include "include/rados/librados.hpp"
+#include "include/utime.h"
 #include "ClusterWatcher.h"
 #include "PoolReplayer.h"
 #include "tools/rbd_mirror/Types.h"
@@ -16,6 +17,8 @@
 #include <memory>
 #include <atomic>
 
+namespace journal { class CacheManagerHandler; }
+
 namespace librbd { struct ImageCtx; }
 
 namespace rbd {
@@ -23,6 +26,7 @@ namespace mirror {
 
 template <typename> struct ServiceDaemon;
 template <typename> struct Threads;
+class CacheManagerHandler;
 class MirrorAdminSocketHook;
 
 /**
@@ -55,6 +59,9 @@ private:
 
   void update_pool_replayers(const PoolPeers &pool_peers);
 
+  void create_cache_manager();
+  void run_cache_manager(utime_t *next_run_interval);
+
   CephContext *m_cct;
   std::vector<const char*> m_args;
   Threads<librbd::ImageCtx> *m_threads = nullptr;
@@ -65,6 +72,7 @@ private:
 
   // monitor local cluster for config changes in peers
   std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
+  std::unique_ptr<CacheManagerHandler> m_cache_manager_handler;
   std::map<PoolPeer, std::unique_ptr<PoolReplayer<>>> m_pool_replayers;
   std::atomic<bool> m_stopping = { false };
   bool m_manual_stop = false;
index 370d81f447065601c13d8af7cc6d787c00bcacc9..7f9f224a025af81d5ee57fd69f585621c950571f 100644 (file)
@@ -227,12 +227,13 @@ private:
 } // anonymous namespace
 
 template <typename I>
-PoolReplayer<I>::PoolReplayer(Threads<I> *threads,
-                              ServiceDaemon<I>* service_daemon,
-                             int64_t local_pool_id, const PeerSpec &peer,
-                             const std::vector<const char*> &args) :
+PoolReplayer<I>::PoolReplayer(
+    Threads<I> *threads, ServiceDaemon<I> *service_daemon,
+    journal::CacheManagerHandler *cache_manager_handler, int64_t local_pool_id,
+    const PeerSpec &peer, const std::vector<const char*> &args) :
   m_threads(threads),
   m_service_daemon(service_daemon),
+  m_cache_manager_handler(cache_manager_handler),
   m_local_pool_id(local_pool_id),
   m_peer(peer),
   m_args(args),
@@ -336,8 +337,8 @@ void PoolReplayer<I>::init()
   dout(10) << "connected to " << m_peer << dendl;
 
   m_instance_replayer.reset(InstanceReplayer<I>::create(
-    m_threads, m_service_daemon, m_local_rados, local_mirror_uuid,
-    m_local_pool_id));
+    m_threads, m_service_daemon, m_cache_manager_handler, m_local_rados,
+    local_mirror_uuid, m_local_pool_id));
   m_instance_replayer->init();
   m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
 
index ef6d8276452831ebde86d36f97f52923e6dae753..204d88781b37445fb9c57863aceb01e58cf1e7a9 100644 (file)
@@ -29,6 +29,8 @@
 
 class AdminSocketHook;
 
+namespace journal { struct CacheManagerHandler; }
+
 namespace librbd { class ImageCtx; }
 
 namespace rbd {
@@ -47,7 +49,8 @@ template <typename ImageCtxT = librbd::ImageCtx>
 class PoolReplayer {
 public:
   PoolReplayer(Threads<ImageCtxT> *threads,
-               ServiceDaemon<ImageCtxT>* service_daemon,
+               ServiceDaemon<ImageCtxT> *service_daemon,
+               journal::CacheManagerHandler *cache_manager_handler,
               int64_t local_pool_id, const PeerSpec &peer,
               const std::vector<const char*> &args);
   ~PoolReplayer();
@@ -211,7 +214,8 @@ private:
   void handle_instances_removed(const InstanceIds &instance_ids);
 
   Threads<ImageCtxT> *m_threads;
-  ServiceDaemon<ImageCtxT>* m_service_daemon;
+  ServiceDaemon<ImageCtxT> *m_service_daemon;
+  journal::CacheManagerHandler *m_cache_manager_handler;
   int64_t m_local_pool_id = -1;
   PeerSpec m_peer;
   std::vector<const char*> m_args;
index 331c5f9e1fbb532aa6cd39a0deff73e1f45bd4a0..e89ab6dfaf3ffd6a60ca2f21956aac7eaf46657e 100644 (file)
@@ -109,7 +109,8 @@ void PrepareRemoteImageRequest<I>::get_client() {
   *m_remote_journaler = new Journaler(m_threads->work_queue, m_threads->timer,
                                       &m_threads->timer_lock, m_remote_io_ctx,
                                       *m_remote_image_id, m_local_mirror_uuid,
-                                      m_journal_settings, nullptr);
+                                      m_journal_settings,
+                                      m_cache_manager_handler);
 
   Context *ctx = create_async_context_callback(
     m_threads->work_queue, create_context_callback<
index 100a066bcad1a7b7621b94924f1bd426703f2ebc..e6bd76cbfb9dfd67afe2cdc65db51d9c0d9eb3e1 100644 (file)
@@ -13,6 +13,7 @@
 
 namespace journal { class Journaler; }
 namespace journal { class Settings; }
+namespace journal { struct CacheManagerHandler; }
 namespace librbd { struct ImageCtx; }
 namespace librbd { namespace journal { struct MirrorPeerClientMeta; } }
 
@@ -39,6 +40,7 @@ public:
                                            const std::string &local_mirror_uuid,
                                            const std::string &local_image_id,
                                            const journal::Settings &settings,
+                                           journal::CacheManagerHandler *cache_manager_handler,
                                            std::string *remote_mirror_uuid,
                                            std::string *remote_image_id,
                                            Journaler **remote_journaler,
@@ -48,27 +50,30 @@ public:
     return new PrepareRemoteImageRequest(threads, remote_io_ctx,
                                          global_image_id, local_mirror_uuid,
                                          local_image_id, settings,
-                                         remote_mirror_uuid, remote_image_id,
-                                         remote_journaler, client_state,
-                                         client_meta, on_finish);
+                                         cache_manager_handler,
+                                         remote_mirror_uuid,
+                                         remote_image_id, remote_journaler,
+                                         client_state, client_meta, on_finish);
   }
 
   PrepareRemoteImageRequest(Threads<ImageCtxT> *threads,
-                           librados::IoCtx &remote_io_ctx,
-                           const std::string &global_image_id,
-                           const std::string &local_mirror_uuid,
-                           const std::string &local_image_id,
-                           const journal::Settings &journal_settings,
-                           std::string *remote_mirror_uuid,
-                           std::string *remote_image_id,
-                           Journaler **remote_journaler,
-                           cls::journal::ClientState *client_state,
-                           MirrorPeerClientMeta *client_meta,
-                           Context *on_finish)
+                            librados::IoCtx &remote_io_ctx,
+                            const std::string &global_image_id,
+                            const std::string &local_mirror_uuid,
+                            const std::string &local_image_id,
+                            const journal::Settings &journal_settings,
+                            journal::CacheManagerHandler *cache_manager_handler,
+                            std::string *remote_mirror_uuid,
+                            std::string *remote_image_id,
+                            Journaler **remote_journaler,
+                            cls::journal::ClientState *client_state,
+                            MirrorPeerClientMeta *client_meta,
+                            Context *on_finish)
     : m_threads(threads), m_remote_io_ctx(remote_io_ctx),
       m_global_image_id(global_image_id),
       m_local_mirror_uuid(local_mirror_uuid), m_local_image_id(local_image_id),
       m_journal_settings(journal_settings),
+      m_cache_manager_handler(cache_manager_handler),
       m_remote_mirror_uuid(remote_mirror_uuid),
       m_remote_image_id(remote_image_id),
       m_remote_journaler(remote_journaler), m_client_state(client_state),
@@ -107,6 +112,7 @@ private:
   std::string m_local_mirror_uuid;
   std::string m_local_image_id;
   journal::Settings m_journal_settings;
+  journal::CacheManagerHandler *m_cache_manager_handler;
   std::string *m_remote_mirror_uuid;
   std::string *m_remote_image_id;
   Journaler **m_remote_journaler;