]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: Constrain max number of in-flight read requests 3410/head
authorJason Dillaman <dillaman@redhat.com>
Mon, 27 Oct 2014 18:47:19 +0000 (14:47 -0400)
committerJason Dillaman <dillaman@redhat.com>
Mon, 19 Jan 2015 17:47:28 +0000 (12:47 -0500)
Constrain the number of in-flight RADOS read requests to the
cache size.  This reduces the chance of the cache memory
ballooning during certain scenarios like copy-up which can
invoke many concurrent read requests.

Fixes: #9854
Backport: giant, firefly, dumpling
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 068d68850d09dfcaccc5a3ce85c80b2f6d808ea9)

src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/test/librbd/test_librbd.cc

index e1499b4148541329c439b309b2fa04ddd438698d..95abee1c8bc73c27b96f69eab2e5e907fab6c3e2 100644 (file)
@@ -796,6 +796,8 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
   ldout(cct, 20) << "finishing waiters " << ls << dendl;
 
   finish_contexts(cct, ls, err);
+  retry_waiting_reads();
+
   --reads_outstanding;
   read_cond.Signal();
 }
@@ -1105,18 +1107,35 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
       // TODO: make read path not call _readx for every completion
       hits.insert(errors.begin(), errors.end());
     }
-    
+
     if (!missing.empty() || !rx.empty()) {
       // read missing
       for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
            bh_it != missing.end();
            ++bh_it) {
-        bh_read(bh_it->second);
-        if (success && onfinish) {
-          ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second 
-                   << " off " << bh_it->first << dendl;
-         bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
-        }
+       uint64_t rx_bytes = static_cast<uint64_t>(
+         stat_rx + bh_it->second->length());
+       if (!waitfor_read.empty() || rx_bytes > max_size) {
+         // cache is full with concurrent reads -- wait for rx's to complete
+         // to constrain memory growth (especially during copy-ups)
+         if (success) {
+           ldout(cct, 10) << "readx missed, waiting on cache to complete "
+                          << waitfor_read.size() << " blocked reads, "
+                          << (MAX(rx_bytes, max_size) - max_size)
+                          << " read bytes" << dendl;
+           waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
+         }
+
+         bh_remove(o, bh_it->second);
+         delete bh_it->second;
+       } else {
+         bh_read(bh_it->second);
+         if (success && onfinish) {
+           ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
+                          << " off " << bh_it->first << dendl;
+           bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
+         }
+       }
         bytes_not_in_cache += bh_it->second->length();
        success = false;
       }
@@ -1230,7 +1249,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
   // no misses... success!  do the read.
   assert(!hit_ls.empty());
   ldout(cct, 10) << "readx has all buffers" << dendl;
-  
+
   // ok, assemble into result buffer.
   uint64_t pos = 0;
   if (rd->bl && !error) {
@@ -1263,6 +1282,18 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
   return ret;
 }
 
+void ObjectCacher::retry_waiting_reads()
+{
+  list<Context *> ls;
+  ls.swap(waitfor_read);
+
+  while (!ls.empty() && waitfor_read.empty()) {
+    Context *ctx = ls.front();
+    ls.pop_front();
+    ctx->complete(0);
+  }
+  waitfor_read.splice(waitfor_read.end(), ls);
+}
 
 int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
                         Context *onfreespace)
index d2aebe984cdf4865590eabf8143e7c7487c45a94..b48f8ace6627df8516d00f0d2bc327e7fea788aa 100644 (file)
@@ -341,6 +341,8 @@ class ObjectCacher {
 
   vector<ceph::unordered_map<sobject_t, Object*> > objects; // indexed by pool_id
 
+  list<Context*> waitfor_read;
+
   ceph_tid_t last_read_tid;
 
   set<BufferHead*>    dirty_bh;
@@ -457,6 +459,7 @@ class ObjectCacher {
 
   int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
             bool external_call);
+  void retry_waiting_reads();
 
  public:
   void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
index 7f354187277ade26ab78aafa9b7dd64149baa00b..8997e8f121474ec9c23a17c90bdb57243c65637b 100644 (file)
@@ -21,6 +21,7 @@
 #include "global/global_context.h"
 #include "global/global_init.h"
 #include "common/ceph_argparse.h"
+#include "common/config.h"
 
 #include "gtest/gtest.h"
 
@@ -40,6 +41,8 @@
 #include "include/interval_set.h"
 #include "include/stringify.h"
 
+#include <boost/scope_exit.hpp>
+
 using namespace std;
 
 static int get_features(bool *old_format, uint64_t *features)
@@ -67,6 +70,8 @@ static int create_image_full(rados_ioctx_t ioctx, const char *name,
 {
   if (old_format) {
     return rbd_create(ioctx, name, size, order);
+  } else if ((features & RBD_FEATURE_STRIPINGV2) != 0) {
+    return rbd_create3(ioctx, name, size, features, order, 65536, 16);
   } else {
     return rbd_create2(ioctx, name, size, features, order);
   }
@@ -1859,6 +1864,52 @@ TEST(LibRBD, ZeroLengthRead)
   ASSERT_EQ(0, destroy_one_pool(pool_name, &cluster));
 }
 
+TEST(LibRBD, LargeCacheRead)
+{
+  if (!g_conf->rbd_cache) {
+    std::cout << "SKIPPING due to disabled cache" << std::endl;
+    return;
+  }
+
+  rados_t cluster;
+  rados_ioctx_t ioctx;
+  string pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool(pool_name, &cluster));
+  rados_ioctx_create(cluster, pool_name.c_str(), &ioctx);
+
+  uint64_t orig_cache_size = g_conf->rbd_cache_size;
+  g_conf->set_val("rbd_cache_size", "16777216");
+  BOOST_SCOPE_EXIT( (orig_cache_size) ) {
+    g_conf->set_val("rbd_cache_size", stringify(orig_cache_size).c_str());
+  } BOOST_SCOPE_EXIT_END;
+  ASSERT_EQ(16777216, g_conf->rbd_cache_size);
+
+  rbd_image_t image;
+  int order = 0;
+  const char *name = "testimg";
+  uint64_t size = g_conf->rbd_cache_size + 1;
+
+  ASSERT_EQ(0, create_image(ioctx, name, size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
+
+  std::string buffer(1 << order, '1');
+  for (size_t offs = 0; offs < size; offs += buffer.size()) {
+    size_t len = std::min<uint64_t>(buffer.size(), size - offs);
+    ASSERT_EQ(static_cast<ssize_t>(len),
+             rbd_write(image, offs, len, buffer.c_str()));
+  }
+
+  ASSERT_EQ(0, rbd_invalidate_cache(image));
+
+  buffer.resize(size);
+  ASSERT_EQ(static_cast<ssize_t>(size-1024), rbd_read(image, 1024, size, &buffer[0]));
+
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
+  ASSERT_EQ(0, destroy_one_pool(pool_name, &cluster));
+}
+
 int main(int argc, char **argv)
 {
   ::testing::InitGoogleTest(&argc, argv);