git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
tools: add throttle mechanism to immutable object cache 36551/head
author Yin Congmin <congmin.yin@intel.com>
Fri, 31 Jul 2020 06:15:24 +0000 (02:15 -0400)
committer Yin Congmin <congmin.yin@intel.com>
Thu, 3 Dec 2020 11:14:44 +0000 (19:14 +0800)
Signed-off-by: Yin Congmin <congmin.yin@intel.com>
18 files changed:
qa/tasks/mgr/dashboard/test_cluster_configuration.py
src/common/options.cc
src/common/options.h
src/librbd/cache/ParentCacheObjectDispatch.cc
src/mgr/PyUtil.cc
src/test/immutable_object_cache/MockCacheDaemon.h
src/test/immutable_object_cache/test_DomainSocket.cc
src/test/immutable_object_cache/test_message.cc
src/test/immutable_object_cache/test_multi_session.cc
src/test/immutable_object_cache/test_object_store.cc
src/test/librbd/cache/test_mock_ParentCacheObjectDispatch.cc
src/tools/immutable_object_cache/CacheClient.cc
src/tools/immutable_object_cache/CacheClient.h
src/tools/immutable_object_cache/CacheController.cc
src/tools/immutable_object_cache/ObjectCacheStore.cc
src/tools/immutable_object_cache/ObjectCacheStore.h
src/tools/immutable_object_cache/Types.cc
src/tools/immutable_object_cache/Types.h

index dc96bced02aeaaed6463ccfe3a951d26385fc80e..9c8245d238d1f26a2f142ba9f09d1c4fcbfe79d3 100644 (file)
@@ -369,7 +369,7 @@ class ClusterConfigurationTest(DashboardTestCase):
         self.assertIn('type', data)
         self.assertIn('desc', data)
         self.assertIn(data['type'], ['str', 'bool', 'float', 'int', 'size', 'uint', 'addr',
-                                     'addrvec', 'uuid', 'secs'])
+                                     'addrvec', 'uuid', 'secs', 'millisecs'])
 
         if 'value' in data:
             self.assertIn('source', data)
index 806da4e1215a5f3749a45c77132caef494a18849..9127a061958fbc940480cd26eedc9f030ec462e1 100644 (file)
@@ -49,6 +49,9 @@ public:
   void operator()(const std::chrono::seconds v) const {
     out << v.count();
   }
+  void operator()(const std::chrono::milliseconds v) const {  // overload for std::chrono::milliseconds values
+    out << v.count();  // writes the bare tick count; no unit suffix is appended
+  }
 };
 }
 
@@ -203,6 +206,13 @@ int Option::parse_value(
       *error_message = e.what();
       return -EINVAL;
     }
+  } else if (type == Option::TYPE_MILLISECS) {
+    try {
+      *out = boost::lexical_cast<uint64_t>(val);
+    } catch (const boost::bad_lexical_cast& e) {
+      *error_message = e.what();
+      return -EINVAL;
+    }
   } else {
     ceph_abort();
   }
@@ -7878,6 +7888,37 @@ static std::vector<Option> get_immutable_object_cache_options() {
     Option("immutable_object_cache_watermark", Option::TYPE_FLOAT, Option::LEVEL_ADVANCED)
     .set_default(0.1)
     .set_description("immutable object cache water mark"),
+
+    Option("immutable_object_cache_qos_schedule_tick_min", Option::TYPE_MILLISECS, Option::LEVEL_ADVANCED)
+    .set_default(50)
+    .set_min(1)
+    .set_description("minimum schedule tick for immutable object cache"),
+
+    Option("immutable_object_cache_qos_iops_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(0)
+    .set_description("the desired immutable object cache IO operations limit per second"),
+
+    Option("immutable_object_cache_qos_iops_burst", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(0)
+    .set_description("the desired burst limit of immutable object cache IO operations"),
+
+    Option("immutable_object_cache_qos_iops_burst_seconds", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+    .set_default(1)
+    .set_min(1)
+    .set_description("the desired burst duration in seconds of immutable object cache IO operations"),
+
+    Option("immutable_object_cache_qos_bps_limit", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(0)
+    .set_description("the desired immutable object cache IO bytes limit per second"),
+
+    Option("immutable_object_cache_qos_bps_burst", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(0)
+    .set_description("the desired burst limit of immutable object cache IO bytes"),
+
+    Option("immutable_object_cache_qos_bps_burst_seconds", Option::TYPE_SECS, Option::LEVEL_ADVANCED)
+    .set_default(1)
+    .set_min(1)
+    .set_description("the desired burst duration in seconds of immutable object cache IO bytes"),
   });
 }
 
index 75812c234503c9d5aa75aef1adea6803bab397ea..15a0bf504658c88ec01c3c398ce64d4673229d2c 100644 (file)
@@ -23,6 +23,7 @@ struct Option {
     TYPE_UUID = 7,
     TYPE_SIZE = 8,
     TYPE_SECS = 9,
+    TYPE_MILLISECS = 10,
   };
 
   static const char *type_to_c_type_str(type_t t) {
@@ -37,6 +38,7 @@ struct Option {
     case TYPE_UUID: return "uuid_d";
     case TYPE_SIZE: return "size_t";
     case TYPE_SECS: return "secs";
+    case TYPE_MILLISECS: return "millisecs";
     default: return "unknown";
     }
   }
@@ -52,6 +54,7 @@ struct Option {
     case TYPE_UUID: return "uuid";
     case TYPE_SIZE: return "size";
     case TYPE_SECS: return "secs";
+    case TYPE_MILLISECS: return "millisecs";
     default: return "unknown";
     }
   }
@@ -86,6 +89,9 @@ struct Option {
     if (s == "secs") {
       return TYPE_SECS;
     }
+    if (s == "millisecs") {
+      return TYPE_MILLISECS;
+    }
     return -1;
   }
 
@@ -140,6 +146,7 @@ struct Option {
     entity_addr_t,
     entity_addrvec_t,
     std::chrono::seconds,
+    std::chrono::milliseconds,
     size_t,
     uuid_d>;
   const std::string name;
@@ -215,6 +222,8 @@ struct Option {
       value = size_t{0}; break;
     case TYPE_SECS:
       value = std::chrono::seconds{0}; break;
+    case TYPE_MILLISECS:
+      value = std::chrono::milliseconds{0}; break;
     default:
       ceph_abort();
     }
@@ -265,6 +274,8 @@ struct Option {
       v = size_t{static_cast<std::size_t>(new_value)}; break;
     case TYPE_SECS:
       v = std::chrono::seconds{new_value}; break;
+    case TYPE_MILLISECS:
+      v = std::chrono::milliseconds{new_value}; break;
     default:
       std::cerr << "Bad type in set_value: " << name << ": "
                 << typeid(T).name() << std::endl;
@@ -377,10 +388,11 @@ struct Option {
   {
     return
       (has_flag(FLAG_RUNTIME)
-       || (!has_flag(FLAG_MGR)
-          && (type == TYPE_BOOL || type == TYPE_INT
-              || type == TYPE_UINT || type == TYPE_FLOAT
-              || type == TYPE_SIZE || type == TYPE_SECS)))
+        || (!has_flag(FLAG_MGR)
+          && (type == TYPE_BOOL || type == TYPE_INT
+            || type == TYPE_UINT || type == TYPE_FLOAT
+            || type == TYPE_SIZE || type == TYPE_SECS
+            || type == TYPE_MILLISECS)))
       && !has_flag(FLAG_STARTUP)
       && !has_flag(FLAG_CLUSTER_CREATE)
       && !has_flag(FLAG_CREATE);
index 843bc845fcd58e26624d7b470dad39ecc03c65a7..762b18101f6974e1ea47db9a792c0331cb6c0016 100644 (file)
@@ -102,6 +102,7 @@ bool ParentCacheObjectDispatch<I>::read(
   m_cache_client->lookup_object(m_image_ctx->data_ctx.get_namespace(),
                                 m_image_ctx->data_ctx.get_id(),
                                 io_context->read_snap().value_or(CEPH_NOSNAP),
+                                m_image_ctx->layout.object_size,
                                 oid, std::move(ctx));
   return true;
 }
index bfecdb35423091577c060263fa6a94ad1e79b5e9..a8efc2f28005b80596ce830212c82f2f8e7c271d 100644 (file)
@@ -15,6 +15,7 @@ PyObject *get_python_typed_option_value(
   case Option::TYPE_SIZE:
     return PyLong_FromString((char *)value.c_str(), nullptr, 0);
   case Option::TYPE_SECS:
+  case Option::TYPE_MILLISECS:
   case Option::TYPE_FLOAT:
     {
       PyObject *s = PyUnicode_FromString(value.c_str());
index 3773e87ea9a2fcc10f4f7d86eeffc8c02a070f02..02e86acb2a709c833543f3622d1ac90e36d9a376 100644 (file)
@@ -24,8 +24,8 @@ class MockCacheClient {
   MOCK_METHOD0(stop, void());
   MOCK_METHOD0(connect, int());
   MOCK_METHOD1(connect, void(Context*));
-  MOCK_METHOD5(lookup_object, void(std::string, uint64_t, uint64_t, std::string,
-                                   CacheGenContextURef));
+  MOCK_METHOD6(lookup_object, void(std::string, uint64_t, uint64_t, uint64_t,
+                                  std::string, CacheGenContextURef));
   MOCK_METHOD1(register_client, int(Context*));
 };
 
index 3a538a3191cb83f51f190b2d35146b90e0472405..31d1b9adc2014419ce216344232ed68b8f9699a9 100644 (file)
@@ -122,7 +122,7 @@ public:
         usleep(1);
       }
 
-      m_cache_client->lookup_object("pool_nspace", 1, 2, "object_name", std::move(ctx));
+      m_cache_client->lookup_object("pool_nspace", 1, 2, 3, "object_name", std::move(ctx));
       m_send_request_index++;
     }
     m_wait_event.wait();
@@ -135,7 +135,7 @@ public:
        hit = ack->type == RBDSC_READ_REPLY;
        m_wait_event.signal();
     });
-    m_cache_client->lookup_object(pool_nspace, 1, 2, object_id, std::move(ctx));
+    m_cache_client->lookup_object(pool_nspace, 1, 2, 3, object_id, std::move(ctx));
     m_wait_event.wait();
     return hit;
   }
index b03fa35313d28f0ed58176dc45c04065e9ccccd8..bbd6ad165b91ad85cb4c09176b928a343f1bced2 100644 (file)
@@ -16,10 +16,11 @@ TEST(test_for_message, test_1)
   uint64_t read_len = 333333UL;
   uint64_t pool_id = 444444UL;
   uint64_t snap_id = 555555UL;
+  uint64_t object_size = 666666UL;
 
   // ObjectRequest --> bufferlist
   ObjectCacheRequest* req = new ObjectCacheReadData(type, seq, read_offset, read_len,
-                                    pool_id, snap_id, oid_name, pool_nspace);
+                                    pool_id, snap_id, object_size, oid_name, pool_nspace);
   req->encode();
   auto payload_bl = req->get_payload_bufferlist();
 
@@ -40,8 +41,9 @@ TEST(test_for_message, test_1)
   ASSERT_EQ(((ObjectCacheReadData*)req_decode)->read_len, 333333UL);
   ASSERT_EQ(((ObjectCacheReadData*)req_decode)->pool_id, 444444UL);
   ASSERT_EQ(((ObjectCacheReadData*)req_decode)->snap_id, 555555UL);
-  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->pool_namespace, pool_nspace);
   ASSERT_EQ(((ObjectCacheReadData*)req_decode)->oid, oid_name);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->pool_namespace, pool_nspace);
+  ASSERT_EQ(((ObjectCacheReadData*)req_decode)->object_size, 666666UL);
 
   delete req;
   delete req_decode;
index e3a73bc373ca15e04fece6aecb49f5c5049fbd6b..c0c629ab036435bf1f7645427e4ca2ce07248069 100644 (file)
@@ -126,7 +126,7 @@ public:
       });
       m_send_request_index++;
       // here just for concurrently testing register + lookup, so fix object id.
-      m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, "1234", std::move(ctx));
+      m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, 3, "1234", std::move(ctx));
     }
 
     if (is_last) {
index 736928fe0e7dca794df70a4de7425564d0e15202..12e6e5aaa4a6d35ad37f56cfa8615d54253aca65 100644 (file)
@@ -57,7 +57,8 @@ public:
     m_object_cache_store = new ObjectCacheStore(m_ceph_context);
   }
 
-  void init_object_cache_store(std::string pool_name, std::string vol_name, uint64_t vol_size, bool reset) {
+  void init_object_cache_store(std::string pool_name, std::string vol_name,
+                              uint64_t vol_size, bool reset) {
     ASSERT_EQ(0, m_object_cache_store->init(reset));
     ASSERT_EQ(0, m_object_cache_store->init_cache());
   }
@@ -66,10 +67,11 @@ public:
     ASSERT_EQ(0, m_object_cache_store->shutdown());
   }
 
-  void lookup_object_cache_store(std::string pool_name, std::string vol_name, std::string obj_name, int& ret) {
+  void lookup_object_cache_store(std::string pool_name, std::string vol_name,
+                                std::string obj_name, int& ret) {
     std::string cache_path;
-    ret = m_object_cache_store->lookup_object(pool_name, 1, 2, obj_name, true,
-                                              cache_path);
+    ret = m_object_cache_store->lookup_object(pool_name, 1, 2, 3,
+                                            obj_name, true, cache_path);
   }
 
   void TearDown() override {
index 6609f1f02e4a3bf5b06767501679b403817b7273..2b26282512728927a2a6476db0b949bbd1f23cb8 100644 (file)
@@ -105,8 +105,8 @@ public :
   void expect_cache_lookup_object(MockParentImageCache& mparent_image_cache,
                                   const std::string &cache_path) {
     EXPECT_CALL(*(mparent_image_cache.get_cache_client()),
-                lookup_object(_, _, _, _, _))
-      .WillOnce(WithArg<4>(Invoke([cache_path](CacheGenContextURef on_finish) {
+                lookup_object(_, _, _, _, _, _))
+      .WillOnce(WithArg<5>(Invoke([cache_path](CacheGenContextURef on_finish) {
         auto ack = new ObjectCacheReadReplyData(RBDSC_READ_REPLY, 0, cache_path);
         on_finish.release()->complete(ack);
       })));
index d378992719c7bd8f8e4eec04c4cad8e1722a787b..e4ed6cb0ed0d481a2dddfcdc8ce1d45ed3c17610 100644 (file)
@@ -113,12 +113,13 @@ namespace immutable_obj_cache {
   }
 
   void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
-                                  uint64_t snap_id, std::string oid,
+                                  uint64_t snap_id, uint64_t object_size,
+                                  std::string oid,
                                   CacheGenContextURef&& on_finish) {
     ldout(m_cct, 20) << dendl;
     ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
-                                    ++m_sequence_id, 0, 0,
-                                    pool_id, snap_id, oid, pool_nspace);
+                                    ++m_sequence_id, 0, 0, pool_id,
+                                    snap_id, object_size, oid, pool_nspace);
     req->process_msg = std::move(on_finish);
     req->encode();
 
index e1e9a65927d438f8bba17067c142f36c23d56652..b2f749631258969f0b9b73c5e106828f0488bfc2 100644 (file)
@@ -32,7 +32,7 @@ class CacheClient {
   int connect();
   void connect(Context* on_finish);
   void lookup_object(std::string pool_nspace, uint64_t pool_id,
-                     uint64_t snap_id, std::string oid,
+                     uint64_t snap_id, uint64_t object_size, std::string oid,
                      CacheGenContextURef&& on_finish);
   int register_client(Context* on_finish);
 
index 1fade8f84b4b718b4fb5023e19e5290cbf33d006..ae16368392e9be0284dacb01a96263489e7f28c6 100644 (file)
@@ -25,7 +25,6 @@ CacheController::~CacheController() {
 
 int CacheController::init() {
   ldout(m_cct, 20) << dendl;
-
   m_object_cache_store = new ObjectCacheStore(m_cct);
   // TODO(dehao): make this configurable
   int r = m_object_cache_store->init(true);
@@ -118,8 +117,8 @@ void CacheController::handle_request(CacheSession* session,
       bool return_dne_path = session->client_version().empty();
       int ret = m_object_cache_store->lookup_object(
         req_read_data->pool_namespace, req_read_data->pool_id,
-        req_read_data->snap_id, req_read_data->oid, return_dne_path,
-        cache_path);
+        req_read_data->snap_id, req_read_data->object_size,
+        req_read_data->oid, return_dne_path, cache_path);
       ObjectCacheRequest* reply = nullptr;
       if (ret != OBJ_CACHE_PROMOTED && ret != OBJ_CACHE_DNE) {
         reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
index a0b2b27ce1629af4f2ce5852a106b8b870a2b4ba..d3faf515bea18f6338e27b175803942254a37b17 100644 (file)
@@ -16,6 +16,30 @@ namespace efs = std::experimental::filesystem;
 namespace ceph {
 namespace immutable_obj_cache {
 
+namespace {
+
+class SafeTimerSingleton : public SafeTimer {  // SafeTimer bundled with the mutex it is constructed on
+public:
+  ceph::mutex lock = ceph::make_mutex  // public so callers can pass &lock alongside the timer
+    ("ceph::immutable_object_cache::SafeTimerSingleton::lock");
+
+  explicit SafeTimerSingleton(CephContext *cct)
+      : SafeTimer(cct, lock, true) {  // NOTE(review): `lock` is a member, so the base sees it before it is initialized; presumably only a reference is stored -- confirm
+    init();  // SafeTimer::init() is called as part of construction
+  }
+  ~SafeTimerSingleton() {
+    std::lock_guard locker{lock};  // shutdown() is invoked with `lock` held
+    shutdown();
+  }
+};
+
+}  // anonymous namespace
+
+enum ThrottleTargetCode {  // bit flags: OR-ed into m_qos_enabled_flag and used as keys of m_throttles
+  ROC_QOS_IOPS_THROTTLE = 1,  // throttle charged with object_num in take_token_from_throttle()
+  ROC_QOS_BPS_THROTTLE = 2  // throttle charged with object_size in take_token_from_throttle()
+};
+
 ObjectCacheStore::ObjectCacheStore(CephContext *cct)
       : m_cct(cct), m_rados(new librados::Rados()) {
 
@@ -35,12 +59,44 @@ ObjectCacheStore::ObjectCacheStore(CephContext *cct)
   uint64_t max_inflight_ops =
     m_cct->_conf.get_val<uint64_t>("immutable_object_cache_max_inflight_ops");
 
+  uint64_t limit = 0;
+  if ((limit = m_cct->_conf.get_val<uint64_t>
+                   ("immutable_object_cache_qos_iops_limit")) != 0) {
+    apply_qos_tick_and_limit(ROC_QOS_IOPS_THROTTLE,
+                  m_cct->_conf.get_val<std::chrono::milliseconds>
+                   ("immutable_object_cache_qos_schedule_tick_min"),
+                  limit,
+                  m_cct->_conf.get_val<uint64_t>
+                   ("immutable_object_cache_qos_iops_burst"),
+                  m_cct->_conf.get_val<std::chrono::seconds>
+                   ("immutable_object_cache_qos_iops_burst_seconds"));
+  }
+  if ((limit = m_cct->_conf.get_val<uint64_t>
+                   ("immutable_object_cache_qos_bps_limit")) != 0) {
+    apply_qos_tick_and_limit(ROC_QOS_BPS_THROTTLE,
+                  m_cct->_conf.get_val<std::chrono::milliseconds>
+                   ("immutable_object_cache_qos_schedule_tick_min"),
+                  limit,
+                  m_cct->_conf.get_val<uint64_t>
+                   ("immutable_object_cache_qos_bps_burst"),
+                  m_cct->_conf.get_val<std::chrono::seconds>
+                   ("immutable_object_cache_qos_bps_burst_seconds"));
+  }
+
   m_policy = new SimplePolicy(m_cct, cache_max_size, max_inflight_ops,
                               cache_watermark);
 }
 
 ObjectCacheStore::~ObjectCacheStore() {
   delete m_policy;
+  if (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE) {
+    ceph_assert(m_throttles[ROC_QOS_IOPS_THROTTLE] != nullptr);
+    delete m_throttles[ROC_QOS_IOPS_THROTTLE];
+  }
+  if (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE) {
+    ceph_assert(m_throttles[ROC_QOS_BPS_THROTTLE] != nullptr);
+    delete m_throttles[ROC_QOS_BPS_THROTTLE];
+  }
 }
 
 int ObjectCacheStore::init(bool reset) {
@@ -92,9 +148,8 @@ int ObjectCacheStore::init_cache() {
   return 0;
 }
 
-int ObjectCacheStore::do_promote(std::string pool_nspace,
-                                  uint64_t pool_id, uint64_t snap_id,
-                                  std::string object_name) {
+int ObjectCacheStore::do_promote(std::string pool_nspace, uint64_t pool_id,
+                                 uint64_t snap_id, std::string object_name) {
   ldout(m_cct, 20) << "to promote object: " << object_name
                    << " from pool id: " << pool_id
                    << " namespace: " << pool_nspace
@@ -178,8 +233,8 @@ int ObjectCacheStore::handle_promote_callback(int ret, bufferlist* read_buf,
   return ret;
 }
 
-int ObjectCacheStore::lookup_object(std::string pool_nspace,
-                                    uint64_t pool_id, uint64_t snap_id,
+int ObjectCacheStore::lookup_object(std::string pool_nspace, uint64_t pool_id,
+                                    uint64_t snap_id, uint64_t object_size,
                                     std::string object_name,
                                     bool return_dne_path,
                                     std::string& target_cache_file_path) {
@@ -193,9 +248,13 @@ int ObjectCacheStore::lookup_object(std::string pool_nspace,
 
   switch (ret) {
     case OBJ_CACHE_NONE: {
-      pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
-      if (pret < 0) {
-        lderr(m_cct) << "fail to start promote" << dendl;
+      if (take_token_from_throttle(object_size, 1)) {
+        pret = do_promote(pool_nspace, pool_id, snap_id, object_name);
+        if (pret < 0) {
+          lderr(m_cct) << "fail to start promote" << dendl;
+        }
+      } else {
+        m_policy->update_status(cache_file_name, OBJ_CACHE_NONE);
       }
       return ret;
     }
@@ -302,5 +361,95 @@ std::string ObjectCacheStore::get_cache_file_path(std::string cache_file_name,
   return m_cache_root_dir + cache_file_dir + cache_file_name;
 }
 
+// Callback handed to TokenBucketThrottle::get() by take_token_from_throttle().
+// Credits `tokens` to the local budget selected by `type`
+// (ROC_QOS_IOPS_THROTTLE -> m_iops_tokens, ROC_QOS_BPS_THROTTLE ->
+// m_bps_tokens; anything else is logged and ignored) and then clears
+// m_io_throttled so that promotions can be attempted again.
+void ObjectCacheStore::handle_throttle_ready(uint64_t tokens, uint64_t type) {
+  std::lock_guard lock(m_throttle_lock);
+  if (type & ROC_QOS_IOPS_THROTTLE){
+    m_iops_tokens += tokens;
+  } else if (type & ROC_QOS_BPS_THROTTLE){
+    m_bps_tokens += tokens;
+  } else {
+    lderr(m_cct) << "unknow throttle type." << dendl;
+  }
+  // Clear the gate last and under m_throttle_lock (it used to be cleared
+  // before the lock was taken): whoever sees m_io_throttled == false and then
+  // acquires the lock is guaranteed to find the tokens already credited,
+  // instead of finding an empty budget and queueing on the throttle again.
+  m_io_throttled = false;
+}
+
+bool ObjectCacheStore::take_token_from_throttle(uint64_t object_size,
+                                                uint64_t object_num) {
+  if (m_io_throttled == true) {
+    return false;
+  }
+
+  int flag = 0;
+  bool wait = false;
+  if (!wait && (m_qos_enabled_flag & ROC_QOS_IOPS_THROTTLE)) {
+    std::lock_guard lock(m_throttle_lock);
+    if (object_num > m_iops_tokens) {
+      wait = m_throttles[ROC_QOS_IOPS_THROTTLE]->get(object_num, this,
+          &ObjectCacheStore::handle_throttle_ready, object_num,
+          ROC_QOS_IOPS_THROTTLE);
+    } else {
+      m_iops_tokens -= object_num;
+      flag = 1;
+    }
+  }
+  if (!wait && (m_qos_enabled_flag & ROC_QOS_BPS_THROTTLE)) {
+    std::lock_guard lock(m_throttle_lock);
+    if (object_size > m_bps_tokens) {
+      wait = m_throttles[ROC_QOS_BPS_THROTTLE]->get(object_size, this,
+          &ObjectCacheStore::handle_throttle_ready, object_size,
+          ROC_QOS_BPS_THROTTLE);
+    } else {
+      m_bps_tokens -= object_size;
+    }
+  }
+
+  if (wait) {
+    m_io_throttled = true;
+    // when passing iops throttle, but limit in bps throttle, recovery
+    if (flag == 1) {
+      std::lock_guard lock(m_throttle_lock);
+      m_iops_tokens += object_num;
+    }
+  }
+
+  return !wait;
+}
+
+static const std::map<uint64_t, std::string> THROTTLE_FLAGS = {  // throttle flag -> name passed to its TokenBucketThrottle
+  { ROC_QOS_IOPS_THROTTLE, "roc_qos_iops_throttle" },
+  { ROC_QOS_BPS_THROTTLE, "roc_qos_bps_throttle" }
+};
+
+void ObjectCacheStore::apply_qos_tick_and_limit(  // create, configure and register the throttle for `flag`
+    const uint64_t flag,  // must be a key of THROTTLE_FLAGS (asserted below)
+    std::chrono::milliseconds min_tick,  // forwarded to set_schedule_tick_min() as a raw count
+    uint64_t limit,
+    uint64_t burst,
+    std::chrono::seconds burst_seconds) {  // forwarded to set_limit() as a raw count
+  SafeTimerSingleton* safe_timer_singleton = nullptr;
+  TokenBucketThrottle* throttle = nullptr;
+  safe_timer_singleton =  // looked up or created on m_cct under a fixed name; presumably shared by both throttles -- confirm
+    &m_cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
+      "tools::immutable_object_cache", false, m_cct);
+  SafeTimer* timer = safe_timer_singleton;
+  ceph::mutex* timer_lock = &safe_timer_singleton->lock;
+  m_qos_enabled_flag |= flag;  // ~ObjectCacheStore() and take_token_from_throttle() test this bit
+  auto throttle_flags_it = THROTTLE_FLAGS.find(flag);
+  ceph_assert(throttle_flags_it != THROTTLE_FLAGS.end());  // unknown flags are a programming error
+  throttle = new TokenBucketThrottle(m_cct, throttle_flags_it->second,  // raw pointer stored in m_throttles below
+    0, 0, timer, timer_lock);
+  throttle->set_schedule_tick_min(min_tick.count());
+  int ret = throttle->set_limit(limit, burst, burst_seconds.count());
+  if (ret < 0) {  // the message below assumes the only failure is burst < limit -- TODO confirm
+    lderr(m_cct) << throttle->get_name() << ": invalid qos parameter: "
+                 << "burst(" << burst << ") is less than "
+                 << "limit(" << limit << ")" << dendl;
+    throttle->set_limit(limit, 0, 1);  // retry with burst 0 and burst_seconds 1
+  }
+
+  ceph_assert(m_throttles.find(flag) == m_throttles.end());  // each flag may be applied only once
+  m_throttles.insert({flag, throttle});
+}
+
 }  // namespace immutable_obj_cache
 }  // namespace ceph
index 270a93be452203d919b9b78e9387d3724d18078c..607921320aef4541f4f6c9565ab08ee501e15f11 100644 (file)
@@ -6,6 +6,8 @@
 
 #include "common/ceph_context.h"
 #include "common/ceph_mutex.h"
+#include "common/Throttle.h"
+#include "common/Cond.h"
 #include "include/rados/librados.hpp"
 
 #include "SimplePolicy.h"
@@ -30,11 +32,16 @@ class ObjectCacheStore {
   int init_cache();
   int lookup_object(std::string pool_nspace,
                     uint64_t pool_id, uint64_t snap_id,
+                    uint64_t object_size,
                     std::string object_name,
                     bool return_dne_path,
                     std::string& target_cache_file_path);
-
  private:
+  enum ThrottleTypeCode {
+    THROTTLE_CODE_BYTE,
+    THROTTLE_CODE_OBJECT
+  };
+
   std::string get_cache_file_name(std::string pool_nspace, uint64_t pool_id,
                                   uint64_t snap_id, std::string oid);
   std::string get_cache_file_path(std::string cache_file_name,
@@ -48,6 +55,13 @@ class ObjectCacheStore {
   int handle_promote_callback(int, bufferlist*, std::string);
   int do_evict(std::string cache_file);
 
+  bool take_token_from_throttle(uint64_t object_size, uint64_t object_num);
+  void handle_throttle_ready(uint64_t tokens, uint64_t type);
+  void apply_qos_tick_and_limit(const uint64_t flag,
+                                std::chrono::milliseconds min_tick,
+                                uint64_t limit, uint64_t burst,
+                                std::chrono::seconds burst_seconds);
+
   CephContext *m_cct;
   RadosRef m_rados;
   std::map<uint64_t, librados::IoCtx> m_ioctx_map;
@@ -55,6 +69,14 @@ class ObjectCacheStore {
     ceph::make_mutex("ceph::cache::ObjectCacheStore::m_ioctx_map_lock");
   Policy* m_policy;
   std::string m_cache_root_dir;
+  // throttle mechanism
+  uint64_t m_qos_enabled_flag{0};
+  std::map<uint64_t, TokenBucketThrottle*> m_throttles;
+  bool m_io_throttled{false};
+  ceph::mutex m_throttle_lock =
+    ceph::make_mutex("ceph::cache::ObjectCacheStore::m_throttle_lock");;
+  uint64_t m_iops_tokens{0};
+  uint64_t m_bps_tokens{0};
 };
 
 }  // namespace immutable_obj_cache
index e65e94bee1ed321c3493b1ef0d7a7525b233f345..860017d6aae0a7adffd32e36244a99294edcb922 100644 (file)
@@ -17,7 +17,7 @@ ObjectCacheRequest::ObjectCacheRequest(uint16_t t, uint64_t s)
 ObjectCacheRequest::~ObjectCacheRequest() {}
 
 void ObjectCacheRequest::encode() {
-  ENCODE_START(1, 1, payload);
+  ENCODE_START(2, 1, payload);
   ceph::encode(type, payload);
   ceph::encode(seq, payload);
   if (!payload_empty()) {
@@ -28,11 +28,11 @@ void ObjectCacheRequest::encode() {
 
 void ObjectCacheRequest::decode(bufferlist& bl) {
   auto i = bl.cbegin();
-  DECODE_START(1, i);
+  DECODE_START(2, i);
   ceph::decode(type, i);
   ceph::decode(seq, i);
   if (!payload_empty()) {
-    decode_payload(i);
+    decode_payload(i, struct_v);
   }
   DECODE_FINISH(i);
 }
@@ -52,7 +52,8 @@ void ObjectCacheRegData::encode_payload() {
   ceph::encode(version, payload);
 }
 
-void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i) {
+void ObjectCacheRegData::decode_payload(bufferlist::const_iterator i,
+                                        __u8 encode_version) {
   if (i.end()) {
     return;
   }
@@ -67,17 +68,19 @@ ObjectCacheRegReplyData::~ObjectCacheRegReplyData() {}
 
 void ObjectCacheRegReplyData::encode_payload() {}
 
-void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl) {}
+void ObjectCacheRegReplyData::decode_payload(bufferlist::const_iterator bl,
+                                            __u8 encode_version) {}
 
 ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s,
                                          uint64_t read_offset,
                                          uint64_t read_len,
                                          uint64_t pool_id, uint64_t snap_id,
+                                         uint64_t object_size,
                                          std::string oid,
                                          std::string pool_namespace)
   : ObjectCacheRequest(t, s), read_offset(read_offset),
     read_len(read_len), pool_id(pool_id), snap_id(snap_id),
-    oid(oid), pool_namespace(pool_namespace)
+    object_size(object_size), oid(oid), pool_namespace(pool_namespace)
 {}
 
 ObjectCacheReadData::ObjectCacheReadData(uint16_t t, uint64_t s)
@@ -92,15 +95,20 @@ void ObjectCacheReadData::encode_payload() {
   ceph::encode(snap_id, payload);
   ceph::encode(oid, payload);
   ceph::encode(pool_namespace, payload);
+  ceph::encode(object_size, payload);
 }
 
-void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i) {
+void ObjectCacheReadData::decode_payload(bufferlist::const_iterator i,
+                                        __u8 encode_version) {
   ceph::decode(read_offset, i);
   ceph::decode(read_len, i);
   ceph::decode(pool_id, i);
   ceph::decode(snap_id, i);
   ceph::decode(oid, i);
   ceph::decode(pool_namespace, i);
+  if (encode_version >= 2) {
+    ceph::decode(object_size, i);
+  }
 }
 
 ObjectCacheReadReplyData::ObjectCacheReadReplyData(uint16_t t, uint64_t s,
@@ -115,7 +123,8 @@ void ObjectCacheReadReplyData::encode_payload() {
   ceph::encode(cache_path, payload);
 }
 
-void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i) {
+void ObjectCacheReadReplyData::decode_payload(bufferlist::const_iterator i,
+                                              __u8 encode_version) {
   ceph::decode(cache_path, i);
 }
 
@@ -127,7 +136,8 @@ ObjectCacheReadRadosData::~ObjectCacheReadRadosData() {}
 
 void ObjectCacheReadRadosData::encode_payload() {}
 
-void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i) {}
+void ObjectCacheReadRadosData::decode_payload(bufferlist::const_iterator i,
+                                              __u8 encode_version) {}
 
 ObjectCacheRequest* decode_object_cache_request(bufferlist payload_buffer) {
   ObjectCacheRequest* req = nullptr;
index 5fab1ec4897ba9aeed34c8aa48f116b7f23c11a9..05394d843070c4cc9dbfdba4373f5bf57b5e1494 100644 (file)
@@ -50,7 +50,8 @@ class ObjectCacheRequest {
   bufferlist get_payload_bufferlist() { return payload; }
 
   virtual void encode_payload() = 0;
-  virtual void decode_payload(bufferlist::const_iterator bl_it) = 0;
+  virtual void decode_payload(bufferlist::const_iterator bl_it,
+                              __u8 encode_version) = 0;
   virtual uint16_t get_request_type() = 0;
   virtual bool payload_empty() = 0;
 };
@@ -63,7 +64,8 @@ class ObjectCacheRegData : public ObjectCacheRequest {
   ObjectCacheRegData(uint16_t t, uint64_t s);
   ~ObjectCacheRegData() override;
   void encode_payload() override;
-  void decode_payload(bufferlist::const_iterator bl) override;
+  void decode_payload(bufferlist::const_iterator bl,
+                      __u8 encode_version) override;
   uint16_t get_request_type() override { return RBDSC_REGISTER; }
   bool payload_empty() override { return false; }
 };
@@ -74,7 +76,8 @@ class ObjectCacheRegReplyData : public ObjectCacheRequest {
   ObjectCacheRegReplyData(uint16_t t, uint64_t s);
   ~ObjectCacheRegReplyData() override;
   void encode_payload() override;
-  void decode_payload(bufferlist::const_iterator iter) override;
+  void decode_payload(bufferlist::const_iterator iter,
+                      __u8 encode_version) override;
   uint16_t get_request_type() override { return RBDSC_REGISTER_REPLY; }
   bool payload_empty() override { return true; }
 };
@@ -85,16 +88,18 @@ class ObjectCacheReadData : public ObjectCacheRequest {
   uint64_t read_len;
   uint64_t pool_id;
   uint64_t snap_id;
+  uint64_t object_size = 0;
   std::string oid;
   std::string pool_namespace;
   ObjectCacheReadData(uint16_t t, uint64_t s, uint64_t read_offset,
                       uint64_t read_len, uint64_t pool_id,
-                      uint64_t snap_id, std::string oid,
-                      std::string pool_namespace);
+                      uint64_t snap_id, uint64_t object_size,
+                      std::string oid, std::string pool_namespace);
   ObjectCacheReadData(uint16_t t, uint64_t s);
   ~ObjectCacheReadData() override;
   void encode_payload() override;
-  void decode_payload(bufferlist::const_iterator bl) override;
+  void decode_payload(bufferlist::const_iterator bl,
+                      __u8 encode_version) override;
   uint16_t get_request_type() override { return RBDSC_READ; }
   bool payload_empty() override { return false; }
 };
@@ -106,7 +111,8 @@ class ObjectCacheReadReplyData : public ObjectCacheRequest {
   ObjectCacheReadReplyData(uint16_t t, uint64_t s);
   ~ObjectCacheReadReplyData() override;
   void encode_payload() override;
-  void decode_payload(bufferlist::const_iterator bl) override;
+  void decode_payload(bufferlist::const_iterator bl,
+                      __u8 encode_version) override;
   uint16_t get_request_type() override { return RBDSC_READ_REPLY; }
   bool payload_empty() override { return false; }
 };
@@ -117,7 +123,8 @@ class ObjectCacheReadRadosData : public ObjectCacheRequest {
   ObjectCacheReadRadosData(uint16_t t, uint64_t s);
   ~ObjectCacheReadRadosData() override;
   void encode_payload() override;
-  void decode_payload(bufferlist::const_iterator bl) override;
+  void decode_payload(bufferlist::const_iterator bl,
+                      __u8 encode_version) override;
   uint16_t get_request_type() override { return RBDSC_READ_RADOS; }
   bool payload_empty() override { return true; }
 };