From f9d9fb6a68c86f3a5413483973bc7f4134567fd0 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 20 Apr 2012 14:55:42 -0700 Subject: [PATCH] rados_bencher: abstract away rados specific operations Signed-off-by: Yehuda Sadeh --- src/Makefile.am | 2 +- src/common/obj_bencher.cc | 85 +++++++++++++++++++++------------------ src/common/obj_bencher.h | 32 ++++++++++----- src/rados.cc | 59 +++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 51 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index a28ddb10c3e3d..33b2e8d2115bd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1221,6 +1221,7 @@ noinst_HEADERS = \ common/environment.h\ common/likely.h\ common/lockdep.h\ + common/obj_bencher.h\ common/snap_types.h\ common/Clock.h\ common/Cond.h\ @@ -1557,7 +1558,6 @@ noinst_HEADERS = \ osd/ReplicatedPG.h\ osd/Watch.h\ osd/osd_types.h\ - osdc/rados_bencher.h\ osdc/Blinker.h\ osdc/Filer.h\ osdc/Journaler.h\ diff --git a/src/common/obj_bencher.cc b/src/common/obj_bencher.cc index db0afaead27f8..0cb59d21d9527 100644 --- a/src/common/obj_bencher.cc +++ b/src/common/obj_bencher.cc @@ -14,7 +14,6 @@ * try and bench on a pool you don't have permission to access * it will just loop forever. */ -#include "include/rados/librados.hpp" #include "common/Cond.h" #include "obj_bencher.h" @@ -46,8 +45,8 @@ static void sanitize_object_contents (bench_data *data, int length) { } } -void *RadosBencher::status_printer(void *_bencher) { - RadosBencher *bencher = (RadosBencher *)_bencher; +void *ObjBencher::status_printer(void *_bencher) { + ObjBencher *bencher = (ObjBencher *)_bencher; bench_data& data = bencher->data; Cond cond; int i = 0; @@ -113,7 +112,7 @@ void *RadosBencher::status_printer(void *_bencher) { return NULL; } -int RadosBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) { +int ObjBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) { int object_size = op_size; int num_objects = 0; char* contentsChars = new char[op_size]; @@ -123,7 +122,7 @@ int RadosBencher::aio_bench(int operation, int secondsToRun, int concurrentios, //get data from previous write run, if available if (operation != OP_WRITE) { bufferlist object_data; - r = io_ctx.read(BENCH_DATA, object_data, sizeof(int)*3, 0); + r = sync_read(BENCH_DATA, object_data, sizeof(int)*3); if (r <= 0) { delete[] contentsChars; if (r == -2) @@ -185,12 +184,11 @@ void _aio_cb(void *cb, void *arg) { lc->lock->Unlock(); } -int RadosBencher::write_bench(int secondsToRun, int concurrentios) { +int ObjBencher::write_bench(int secondsToRun, int concurrentios) { cout << "Maintaining " << concurrentios << " concurrent writes of " << data.object_size << " bytes for at least " << secondsToRun << " seconds." << std::endl; - librados::AioCompletion* completions[concurrentios]; char* name[concurrentios]; bufferlist* contents[concurrentios]; double total_latency = 0; @@ -202,6 +200,8 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) { utime_t runtime; utime_t timePassed; + r = completions_init(concurrentios); + //set up writes so I can start them together for (int i = 0; iis_safe()) { + if (completion_is_done(slot)) { break; } } @@ -260,9 +261,9 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) { generate_object_name(newName, 128, data.started); snprintf(data.object_contents, data.object_size, "I'm the %dth object!", data.started); newContents->append(data.object_contents, data.object_size); - completions[slot]->wait_for_safe(); + completion_wait(slot); lock.Lock(); - r = completions[slot]->get_return_value(); + r = completion_ret(slot); if (r != 0) { lock.Unlock(); goto ERR; @@ -275,15 +276,16 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) { data.avg_latency = total_latency / data.finished; --data.in_flight; lock.Unlock(); - completions[slot]->release(); - completions[slot] = 0; + release_completion(slot); timePassed = ceph_clock_now(g_ceph_context) - data.start_time; - //write new stuff to rados, then delete old stuff + //write new stuff to backend, then delete old stuff //and save locations of new stuff for later deletion start_times[slot] = ceph_clock_now(g_ceph_context); - completions[slot] = rados.aio_create_completion((void *) &lc, 0, &_aio_cb); - r = io_ctx.aio_write(newName, completions[slot], *newContents, data.object_size, 0); + r = create_completion(slot, _aio_cb, &lc); + if (r < 0) + goto ERR; + r = aio_write(newName, slot, *newContents, data.object_size); if (r < 0) {//naughty; doesn't clean up heap space. goto ERR; } @@ -299,9 +301,9 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) { while (data.finished < data.started) { slot = data.finished % concurrentios; - completions[slot]->wait_for_safe(); + completion_wait(slot); lock.Lock(); - r = completions[slot]->get_return_value(); + r = completion_ret(slot); if (r != 0) { lock.Unlock(); goto ERR; @@ -314,8 +316,7 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) { data.avg_latency = total_latency / data.finished; --data.in_flight; lock.Unlock(); - completions[slot]->release(); - completions[slot] = 0; + release_completion(slot); delete[] name[slot]; delete contents[slot]; } @@ -345,7 +346,10 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) { ::encode(data.object_size, b_write); ::encode(data.finished, b_write); ::encode(getpid(), b_write); - io_ctx.write(BENCH_DATA, b_write, sizeof(int)*3, 0); + sync_write(BENCH_DATA, b_write, sizeof(int)*3); + + completions_done(); + return 0; ERR: @@ -356,11 +360,10 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) { return -5; } -int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) { +int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) { data.finished = 0; lock_cond lc(&lock); - librados::AioCompletion* completions[concurrentios]; char* name[concurrentios]; bufferlist* contents[concurrentios]; int index[concurrentios]; @@ -375,6 +378,10 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur sanitize_object_contents(&data, 128); //clean it up once; subsequent //changes will be safe because string length monotonically increases + r = completions_init(concurrentios); + if (r < 0) + return r; + //set up initial reads for (int i = 0; i < concurrentios; ++i) { name[i] = new char[128]; @@ -393,8 +400,8 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur for (int i = 0; i < concurrentios; ++i) { index[i] = i; start_times[i] = ceph_clock_now(g_ceph_context); - completions[i] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0); - r = io_ctx.aio_read(name[i], completions[i], contents[i], data.object_size, 0); + create_completion(i, _aio_cb, (void *)&lc); + r = aio_read(name[i], i, contents[i], data.object_size); if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread! cerr << "r = " << r << std::endl; goto ERR; @@ -415,7 +422,7 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur lock.Lock(); while (1) { for (slot = 0; slot < concurrentios; ++slot) { - if (completions[slot]->is_complete()) { + if (completion_is_done(slot)) { break; } } @@ -429,9 +436,9 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur generate_object_name(newName, 128, data.started, pid); int current_index = index[slot]; index[slot] = data.started; - completions[slot]->wait_for_complete(); + completion_wait(slot); lock.Lock(); - r = completions[slot]->get_return_value(); + r = completion_ret(slot); if (r != 0) { cerr << "read got " << r << std::endl; lock.Unlock(); @@ -445,15 +452,14 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur data.avg_latency = total_latency / data.finished; --data.in_flight; lock.Unlock(); - completions[slot]->release(); - completions[slot] = 0; + release_completion(slot); cur_contents = contents[slot]; //start new read and check data if requested start_times[slot] = ceph_clock_now(g_ceph_context); contents[slot] = new bufferlist(); - completions[slot] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0); - r = io_ctx.aio_read(newName, completions[slot], contents[slot], data.object_size, 0); + create_completion(slot, _aio_cb, (void *)&lc); + r = aio_read(newName, slot, contents[slot], data.object_size); if (r < 0) { goto ERR; } @@ -474,9 +480,9 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur //wait for final reads to complete while (data.finished < data.started) { slot = data.finished % concurrentios; - completions[slot]->wait_for_complete(); + completion_wait(slot); lock.Lock(); - r = completions[slot]->get_return_value(); + r = completion_ret(slot); if (r != 0) { cerr << "read got " << r << std::endl; lock.Unlock(); @@ -489,8 +495,7 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur ++data.finished; data.avg_latency = total_latency / data.finished; --data.in_flight; - completions[slot]-> release(); - completions[slot] = 0; + release_completion(slot); snprintf(data.object_contents, data.object_size, "I'm the %dth object!", index[slot]); lock.Unlock(); if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) { @@ -522,6 +527,8 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur << "Max latency: " << data.max_latency << std::endl << "Min latency: " << data.min_latency << std::endl; + completions_done(); + return 0; ERR: diff --git a/src/common/obj_bencher.h b/src/common/obj_bencher.h index cd685b8509448..0a2ea583b383c 100644 --- a/src/common/obj_bencher.h +++ b/src/common/obj_bencher.h @@ -9,16 +9,11 @@ * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * - * Series of functions to test your rados installation. Notice - * that this code is not terribly robust -- for instance, if you - * try and bench on a pool you don't have permission to access - * it will just loop forever. */ -#ifndef CEPH_RADOS_BENCHER_H -#define CEPH_RADOS_BENCHER_H +#ifndef CEPH_OBJ_BENCHER_H +#define CEPH_OBJ_BENCHER_H -#include "include/rados/librados.hpp" #include "common/config.h" #include "common/Cond.h" @@ -42,10 +37,9 @@ const int OP_WRITE = 1; const int OP_SEQ_READ = 2; const int OP_RAND_READ = 3; -class RadosBencher { +class ObjBencher { +protected: Mutex lock; - librados::Rados& rados; - librados::IoCtx& io_ctx; static void *status_printer(void *bencher); @@ -53,8 +47,24 @@ class RadosBencher { int write_bench(int secondsToRun, int concurrentios); int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid); + + virtual int completions_init(int concurrentios) = 0; + virtual void completions_done() = 0; + + virtual int create_completion(int i, void (*cb)(void *, void*), void *arg) = 0; + virtual void release_completion(int slot) = 0; + + virtual bool completion_is_done(int slot) = 0; + virtual int completion_wait(int slot) = 0; + virtual int completion_ret(int slot) = 0; + + virtual int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) = 0; + virtual int aio_write(const std::string& oid, int slot, const bufferlist& bl, size_t len) = 0; + virtual int sync_read(const std::string& oid, bufferlist& bl, size_t len) = 0; + virtual int sync_write(const std::string& oid, bufferlist& bl, size_t len) = 0; public: - RadosBencher(librados::Rados& _r, librados::IoCtx& _i) : lock("RadosBencher::lock"), rados(_r), io_ctx(_i) {} + ObjBencher() : lock("ObjBencher::lock") {} + virtual ~ObjBencher() {} int aio_bench(int operation, int secondsToRun, int concurrentios, int op_size); }; diff --git a/src/rados.cc b/src/rados.cc index b88b4c6213ab7..3e985b96fa4ac 100644 --- a/src/rados.cc +++ b/src/rados.cc @@ -25,6 +25,7 @@ using namespace librados; #include "common/Cond.h" #include "common/debug.h" #include "common/Formatter.h" +#include "common/obj_bencher.h" #include "mds/inode_backtrace.h" #include "auth/Crypto.h" #include @@ -577,6 +578,64 @@ void LoadGen::cleanup() } } + +class RadosBencher : public ObjBencher { + librados::AioCompletion **completions; + librados::Rados& rados; + librados::IoCtx& io_ctx; +protected: + int completions_init(int concurrentios) { + completions = new librados::AioCompletion *[concurrentios]; + return 0; + } + void completions_done() { + delete[] completions; + completions = NULL; + } + int create_completion(int slot, void (*cb)(void *, void*), void *arg) { + completions[slot] = rados.aio_create_completion((void *) arg, 0, cb); + + if (!completions[slot]) + return -EINVAL; + + return 0; + } + void release_completion(int slot) { + completions[slot]->release(); + completions[slot] = 0; + } + + int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) { + return io_ctx.aio_read(oid, completions[slot], pbl, len, 0); + } + + int aio_write(const std::string& oid, int slot, const bufferlist& bl, size_t len) { + return io_ctx.aio_write(oid, completions[slot], bl, len, 0); + } + + int sync_read(const std::string& oid, bufferlist& bl, size_t len) { + return io_ctx.read(oid, bl, len, 0); + } + int sync_write(const std::string& oid, bufferlist& bl, size_t len) { + return io_ctx.write(oid, bl, len, 0); + } + + bool completion_is_done(int slot) { + return completions[slot]->is_safe(); + } + + int completion_wait(int slot) { + return completions[slot]->wait_for_safe(); + } + int completion_ret(int slot) { + return completions[slot]->get_return_value(); + } + +public: + RadosBencher(librados::Rados& _r, librados::IoCtx& _i) : completions(NULL), rados(_r), io_ctx(_i) {} + ~RadosBencher() { } +}; + /********************************************** **********************************************/ -- 2.39.5