From cd784eab22a329cd3eb3798feb9d975a4506c57e Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Mon, 27 Oct 2014 14:47:19 -0400 Subject: [PATCH] osdc: Constrain max number of in-flight read requests Constrain the number of in-flight RADOS read requests to the cache size. This reduces the chance of the cache memory ballooning during certain scenarios like copy-up which can invoke many concurrent read requests. Fixes: #9854 Backport: giant, firefly, dumpling Signed-off-by: Jason Dillaman --- src/osdc/ObjectCacher.cc | 47 ++++++++++++++++++++++++++++------ src/osdc/ObjectCacher.h | 3 +++ src/test/librbd/test_librbd.cc | 47 ++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 8 deletions(-) diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 8455fb6a5240d..01cf2baba0835 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -795,6 +795,8 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, ldout(cct, 20) << "finishing waiters " << ls << dendl; finish_contexts(cct, ls, err); + retry_waiting_reads(); + --reads_outstanding; read_cond.Signal(); } @@ -1104,18 +1106,35 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, // TODO: make read path not call _readx for every completion hits.insert(errors.begin(), errors.end()); } - + if (!missing.empty() || !rx.empty()) { // read missing for (map::iterator bh_it = missing.begin(); bh_it != missing.end(); ++bh_it) { - bh_read(bh_it->second); - if (success && onfinish) { - ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second - << " off " << bh_it->first << dendl; - bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) ); - } + uint64_t rx_bytes = static_cast( + stat_rx + bh_it->second->length()); + if (!waitfor_read.empty() || rx_bytes > max_size) { + // cache is full with concurrent reads -- wait for rx's to complete + // to constrain memory growth (especially during copy-ups) + if (success) { + ldout(cct, 10) << "readx missed, waiting on cache to complete " + << waitfor_read.size() << " blocked reads, " + << (MAX(rx_bytes, max_size) - max_size) + << " read bytes" << dendl; + waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish)); + } + + bh_remove(o, bh_it->second); + delete bh_it->second; + } else { + bh_read(bh_it->second); + if (success && onfinish) { + ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second + << " off " << bh_it->first << dendl; + bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) ); + } + } bytes_not_in_cache += bh_it->second->length(); success = false; } @@ -1229,7 +1248,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, // no misses... success! do the read. assert(!hit_ls.empty()); ldout(cct, 10) << "readx has all buffers" << dendl; - + // ok, assemble into result buffer. uint64_t pos = 0; if (rd->bl && !error) { @@ -1262,6 +1281,18 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, return ret; } +void ObjectCacher::retry_waiting_reads() +{ + list ls; + ls.swap(waitfor_read); + + while (!ls.empty() && waitfor_read.empty()) { + Context *ctx = ls.front(); + ls.pop_front(); + ctx->complete(0); + } + waitfor_read.splice(waitfor_read.end(), ls); +} int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock, Context *onfreespace) diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index ca16138fa2d00..bed12342d07b6 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -341,6 +341,8 @@ class ObjectCacher { vector > objects; // indexed by pool_id + list waitfor_read; + ceph_tid_t last_read_tid; set dirty_or_tx_bh; @@ -457,6 +459,7 @@ class ObjectCacher { int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish, bool external_call); + void retry_waiting_reads(); public: void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid, diff --git a/src/test/librbd/test_librbd.cc b/src/test/librbd/test_librbd.cc index e0f26ff5c25e2..e4e3a1c69cdbe 100644 --- a/src/test/librbd/test_librbd.cc +++ b/src/test/librbd/test_librbd.cc @@ -21,6 +21,7 @@ #include "global/global_context.h" #include "global/global_init.h" #include "common/ceph_argparse.h" +#include "common/config.h" #include "gtest/gtest.h" @@ -42,6 +43,8 @@ #include "include/interval_set.h" #include "include/stringify.h" +#include + using namespace std; static int get_features(bool *old_format, uint64_t *features) @@ -69,6 +72,8 @@ static int create_image_full(rados_ioctx_t ioctx, const char *name, { if (old_format) { return rbd_create(ioctx, name, size, order); + } else if ((features & RBD_FEATURE_STRIPINGV2) != 0) { + return rbd_create3(ioctx, name, size, features, order, 65536, 16); } else { return rbd_create2(ioctx, name, size, features, order); } @@ -1971,6 +1976,48 @@ TEST_F(TestLibRBD, ZeroLengthRead) rados_ioctx_destroy(ioctx); } +TEST_F(TestLibRBD, LargeCacheRead) +{ + if (!g_conf->rbd_cache) { + std::cout << "SKIPPING due to disabled cache" << std::endl; + return; + } + + rados_ioctx_t ioctx; + rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx); + + uint64_t orig_cache_size = g_conf->rbd_cache_size; + g_conf->set_val("rbd_cache_size", "16777216"); + BOOST_SCOPE_EXIT( (orig_cache_size) ) { + g_conf->set_val("rbd_cache_size", stringify(orig_cache_size).c_str()); + } BOOST_SCOPE_EXIT_END; + ASSERT_EQ(16777216, g_conf->rbd_cache_size); + + rbd_image_t image; + int order = 0; + const char *name = "testimg"; + uint64_t size = g_conf->rbd_cache_size + 1; + + ASSERT_EQ(0, create_image(ioctx, name, size, &order)); + ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL)); + + std::string buffer(1 << order, '1'); + for (size_t offs = 0; offs < size; offs += buffer.size()) { + size_t len = std::min(buffer.size(), size - offs); + ASSERT_EQ(static_cast(len), + rbd_write(image, offs, len, buffer.c_str())); + } + + ASSERT_EQ(0, rbd_invalidate_cache(image)); + + buffer.resize(size); + ASSERT_EQ(static_cast(size-1024), rbd_read(image, 1024, size, &buffer[0])); + + ASSERT_EQ(0, rbd_close(image)); + + rados_ioctx_destroy(ioctx); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); -- 2.39.5