ldout(cct, 20) << "finishing waiters " << ls << dendl;
finish_contexts(cct, ls, err);
+ retry_waiting_reads();
+
--reads_outstanding;
read_cond.Signal();
}
// TODO: make read path not call _readx for every completion
hits.insert(errors.begin(), errors.end());
}
-
+
if (!missing.empty() || !rx.empty()) {
// read missing
for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
bh_it != missing.end();
++bh_it) {
- bh_read(bh_it->second);
- if (success && onfinish) {
- ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
- << " off " << bh_it->first << dendl;
- bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
- }
+ uint64_t rx_bytes = static_cast<uint64_t>(
+ stat_rx + bh_it->second->length());
+ if (!waitfor_read.empty() || rx_bytes > max_size) {
+ // cache is full with concurrent reads -- wait for rx's to complete
+ // to constrain memory growth (especially during copy-ups)
+ if (success) {
+ ldout(cct, 10) << "readx missed, waiting on cache to complete "
+ << waitfor_read.size() << " blocked reads, "
+ << (MAX(rx_bytes, max_size) - max_size)
+ << " read bytes" << dendl;
+ waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
+ }
+
+ bh_remove(o, bh_it->second);
+ delete bh_it->second;
+ } else {
+ bh_read(bh_it->second);
+ if (success && onfinish) {
+ ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
+ << " off " << bh_it->first << dendl;
+ bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
+ }
+ }
bytes_not_in_cache += bh_it->second->length();
success = false;
}
// no misses... success! do the read.
assert(!hit_ls.empty());
ldout(cct, 10) << "readx has all buffers" << dendl;
-
+
// ok, assemble into result buffer.
uint64_t pos = 0;
if (rd->bl && !error) {
return ret;
}
+void ObjectCacher::retry_waiting_reads()
+{
+ list<Context *> ls;
+ ls.swap(waitfor_read);
+
+ while (!ls.empty() && waitfor_read.empty()) {
+ Context *ctx = ls.front();
+ ls.pop_front();
+ ctx->complete(0);
+ }
+ waitfor_read.splice(waitfor_read.end(), ls);
+}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
Context *onfreespace)
vector<ceph::unordered_map<sobject_t, Object*> > objects; // indexed by pool_id
+ list<Context*> waitfor_read;
+
ceph_tid_t last_read_tid;
set<BufferHead*> dirty_or_tx_bh;
int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bool external_call);
+ void retry_waiting_reads();
public:
void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
#include "global/global_context.h"
#include "global/global_init.h"
#include "common/ceph_argparse.h"
+#include "common/config.h"
#include "gtest/gtest.h"
#include "include/interval_set.h"
#include "include/stringify.h"
+#include <boost/scope_exit.hpp>
+
using namespace std;
static int get_features(bool *old_format, uint64_t *features)
{
if (old_format) {
return rbd_create(ioctx, name, size, order);
+ } else if ((features & RBD_FEATURE_STRIPINGV2) != 0) {
+ return rbd_create3(ioctx, name, size, features, order, 65536, 16);
} else {
return rbd_create2(ioctx, name, size, features, order);
}
rados_ioctx_destroy(ioctx);
}
+TEST_F(TestLibRBD, LargeCacheRead)
+{
+ if (!g_conf->rbd_cache) {
+ std::cout << "SKIPPING due to disabled cache" << std::endl;
+ return;
+ }
+
+ rados_ioctx_t ioctx;
+ rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+ uint64_t orig_cache_size = g_conf->rbd_cache_size;
+ g_conf->set_val("rbd_cache_size", "16777216");
+ BOOST_SCOPE_EXIT( (orig_cache_size) ) {
+ g_conf->set_val("rbd_cache_size", stringify(orig_cache_size).c_str());
+ } BOOST_SCOPE_EXIT_END;
+ ASSERT_EQ(16777216, g_conf->rbd_cache_size);
+
+ rbd_image_t image;
+ int order = 0;
+ const char *name = "testimg";
+ uint64_t size = g_conf->rbd_cache_size + 1;
+
+ ASSERT_EQ(0, create_image(ioctx, name, size, &order));
+ ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));
+
+ std::string buffer(1 << order, '1');
+ for (size_t offs = 0; offs < size; offs += buffer.size()) {
+ size_t len = std::min<uint64_t>(buffer.size(), size - offs);
+ ASSERT_EQ(static_cast<ssize_t>(len),
+ rbd_write(image, offs, len, buffer.c_str()));
+ }
+
+ ASSERT_EQ(0, rbd_invalidate_cache(image));
+
+ buffer.resize(size);
+ ASSERT_EQ(static_cast<ssize_t>(size-1024), rbd_read(image, 1024, size, &buffer[0]));
+
+ ASSERT_EQ(0, rbd_close(image));
+
+ rados_ioctx_destroy(ioctx);
+}
+
int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);