]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: Constrain max number of in-flight read requests 2820/head
authorJason Dillaman <dillaman@redhat.com>
Mon, 27 Oct 2014 18:47:19 +0000 (14:47 -0400)
committerJason Dillaman <dillaman@redhat.com>
Tue, 11 Nov 2014 03:20:58 +0000 (22:20 -0500)
Constrain the number of in-flight RADOS read requests to the
cache size.  This reduces the chance of the cache memory
ballooning during certain scenarios like copy-up which can
invoke many concurrent read requests.

Fixes: #9854
Backport: giant, firefly, dumpling
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/test/librbd/test_librbd.cc

index 8455fb6a5240dd21e1a98ca2be25d7a64026972d..01cf2baba0835bcc5050c853cadd53b2466b1660 100644 (file)
@@ -795,6 +795,8 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
   ldout(cct, 20) << "finishing waiters " << ls << dendl;
 
   finish_contexts(cct, ls, err);
+  retry_waiting_reads();
+
   --reads_outstanding;
   read_cond.Signal();
 }
@@ -1104,18 +1106,35 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
       // TODO: make read path not call _readx for every completion
       hits.insert(errors.begin(), errors.end());
     }
-    
+
     if (!missing.empty() || !rx.empty()) {
       // read missing
       for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
            bh_it != missing.end();
            ++bh_it) {
-        bh_read(bh_it->second);
-        if (success && onfinish) {
-          ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second 
-                   << " off " << bh_it->first << dendl;
-         bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
-        }
+       uint64_t rx_bytes = static_cast<uint64_t>(
+         stat_rx + bh_it->second->length());
+       if (!waitfor_read.empty() || rx_bytes > max_size) {
+         // cache is full with concurrent reads -- wait for rx's to complete
+         // to constrain memory growth (especially during copy-ups)
+         if (success) {
+           ldout(cct, 10) << "readx missed, waiting on cache to complete "
+                          << waitfor_read.size() << " blocked reads, "
+                          << (MAX(rx_bytes, max_size) - max_size)
+                          << " read bytes" << dendl;
+           waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
+         }
+
+         bh_remove(o, bh_it->second);
+         delete bh_it->second;
+       } else {
+         bh_read(bh_it->second);
+         if (success && onfinish) {
+           ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
+                          << " off " << bh_it->first << dendl;
+           bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
+         }
+       }
         bytes_not_in_cache += bh_it->second->length();
        success = false;
       }
@@ -1229,7 +1248,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
   // no misses... success!  do the read.
   assert(!hit_ls.empty());
   ldout(cct, 10) << "readx has all buffers" << dendl;
-  
+
   // ok, assemble into result buffer.
   uint64_t pos = 0;
   if (rd->bl && !error) {
@@ -1262,6 +1281,18 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
   return ret;
 }
 
+void ObjectCacher::retry_waiting_reads()
+{
+  list<Context *> ls;
+  ls.swap(waitfor_read);
+
+  while (!ls.empty() && waitfor_read.empty()) {
+    Context *ctx = ls.front();
+    ls.pop_front();
+    ctx->complete(0);
+  }
+  waitfor_read.splice(waitfor_read.end(), ls);
+}
 
 int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
                         Context *onfreespace)
index ca16138fa2d00d764b88899a6bfd5ce010c1cfc1..bed12342d07b619457106c720d5ed22fb1fb31a1 100644 (file)
@@ -341,6 +341,8 @@ class ObjectCacher {
 
   vector<ceph::unordered_map<sobject_t, Object*> > objects; // indexed by pool_id
 
+  list<Context*> waitfor_read;
+
   ceph_tid_t last_read_tid;
 
   set<BufferHead*>    dirty_or_tx_bh;
@@ -457,6 +459,7 @@ class ObjectCacher {
 
   int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
             bool external_call);
+  void retry_waiting_reads();
 
  public:
   void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
index e0f26ff5c25e2eb4e9ce91b48496fff725132075..e4e3a1c69cdbefbf09ee04f8703608a4cc3b1f7e 100644 (file)
@@ -21,6 +21,7 @@
 #include "global/global_context.h"
 #include "global/global_init.h"
 #include "common/ceph_argparse.h"
+#include "common/config.h"
 
 #include "gtest/gtest.h"
 
@@ -42,6 +43,8 @@
 #include "include/interval_set.h"
 #include "include/stringify.h"
 
+#include <boost/scope_exit.hpp>
+
 using namespace std;
 
 static int get_features(bool *old_format, uint64_t *features)
@@ -69,6 +72,8 @@ static int create_image_full(rados_ioctx_t ioctx, const char *name,
 {
   if (old_format) {
     return rbd_create(ioctx, name, size, order);
+  } else if ((features & RBD_FEATURE_STRIPINGV2) != 0) {
+    return rbd_create3(ioctx, name, size, features, order, 65536, 16);
   } else {
     return rbd_create2(ioctx, name, size, features, order);
   }
@@ -1971,6 +1976,48 @@ TEST_F(TestLibRBD, ZeroLengthRead)
   rados_ioctx_destroy(ioctx);
 }
 
+TEST_F(TestLibRBD, LargeCacheRead)
+{
+  if (!g_conf->rbd_cache) {
+    std::cout << "SKIPPING due to disabled cache" << std::endl;
+    return;
+  }
+
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  uint64_t orig_cache_size = g_conf->rbd_cache_size;
+  g_conf->set_val("rbd_cache_size", "16777216");
+  BOOST_SCOPE_EXIT( (orig_cache_size) ) {
+    g_conf->set_val("rbd_cache_size", stringify(orig_cache_size).c_str());
+  } BOOST_SCOPE_EXIT_END;
+  ASSERT_EQ(16777216, g_conf->rbd_cache_size);
+
+  rbd_image_t image;
+  int order = 0;
+  const char *name = "testimg";
+  uint64_t size = g_conf->rbd_cache_size + 1;
+
+  ASSERT_EQ(0, create_image(ioctx, name, size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
+
+  std::string buffer(1 << order, '1');
+  for (size_t offs = 0; offs < size; offs += buffer.size()) {
+    size_t len = std::min<uint64_t>(buffer.size(), size - offs);
+    ASSERT_EQ(static_cast<ssize_t>(len),
+             rbd_write(image, offs, len, buffer.c_str()));
+  }
+
+  ASSERT_EQ(0, rbd_invalidate_cache(image));
+
+  buffer.resize(size);
+  ASSERT_EQ(static_cast<ssize_t>(size-1024), rbd_read(image, 1024, size, &buffer[0]));
+
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
+}
+
 int main(int argc, char **argv)
 {
   ::testing::InitGoogleTest(&argc, argv);