* try and bench on a pool you don't have permission to access
* it will just loop forever.
*/
-#include "include/rados/librados.hpp"
#include "common/Cond.h"
#include "obj_bencher.h"
}
}
-void *RadosBencher::status_printer(void *_bencher) {
- RadosBencher *bencher = (RadosBencher *)_bencher;
+void *ObjBencher::status_printer(void *_bencher) {
+ ObjBencher *bencher = (ObjBencher *)_bencher;
bench_data& data = bencher->data;
Cond cond;
int i = 0;
return NULL;
}
-int RadosBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) {
+int ObjBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) {
int object_size = op_size;
int num_objects = 0;
char* contentsChars = new char[op_size];
//get data from previous write run, if available
if (operation != OP_WRITE) {
bufferlist object_data;
- r = io_ctx.read(BENCH_DATA, object_data, sizeof(int)*3, 0);
+ r = sync_read(BENCH_DATA, object_data, sizeof(int)*3);
if (r <= 0) {
delete[] contentsChars;
if (r == -2)
lc->lock->Unlock();
}
-int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
+int ObjBencher::write_bench(int secondsToRun, int concurrentios) {
cout << "Maintaining " << concurrentios << " concurrent writes of "
<< data.object_size << " bytes for at least "
<< secondsToRun << " seconds." << std::endl;
- librados::AioCompletion* completions[concurrentios];
char* name[concurrentios];
bufferlist* contents[concurrentios];
double total_latency = 0;
utime_t runtime;
utime_t timePassed;
+ r = completions_init(concurrentios);
+
//set up writes so I can start them together
for (int i = 0; i<concurrentios; ++i) {
name[i] = new char[128];
pthread_t print_thread;
- pthread_create(&print_thread, NULL, RadosBencher::status_printer, (void *)this);
+ pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
lock.Lock();
data.start_time = ceph_clock_now(g_ceph_context);
lock.Unlock();
for (int i = 0; i<concurrentios; ++i) {
start_times[i] = ceph_clock_now(g_ceph_context);
- completions[i] = rados.aio_create_completion((void *) &lc, 0,
- &_aio_cb);
- r = io_ctx.aio_write(name[i], completions[i], *contents[i], data.object_size, 0);
+ r = create_completion(i, _aio_cb, (void *)&lc);
+ if (r < 0)
+ goto ERR;
+ r = aio_write(name[i], i, *contents[i], data.object_size);
if (r < 0) { //naughty, doesn't clean up heap
goto ERR;
}
lock.Lock();
while (1) {
for (slot = 0; slot < concurrentios; ++slot) {
- if (completions[slot]->is_safe()) {
+ if (completion_is_done(slot)) {
break;
}
}
generate_object_name(newName, 128, data.started);
snprintf(data.object_contents, data.object_size, "I'm the %dth object!", data.started);
newContents->append(data.object_contents, data.object_size);
- completions[slot]->wait_for_safe();
+ completion_wait(slot);
lock.Lock();
- r = completions[slot]->get_return_value();
+ r = completion_ret(slot);
if (r != 0) {
lock.Unlock();
goto ERR;
data.avg_latency = total_latency / data.finished;
--data.in_flight;
lock.Unlock();
- completions[slot]->release();
- completions[slot] = 0;
+ release_completion(slot);
timePassed = ceph_clock_now(g_ceph_context) - data.start_time;
- //write new stuff to rados, then delete old stuff
+ //write new stuff to backend, then delete old stuff
//and save locations of new stuff for later deletion
start_times[slot] = ceph_clock_now(g_ceph_context);
- completions[slot] = rados.aio_create_completion((void *) &lc, 0, &_aio_cb);
- r = io_ctx.aio_write(newName, completions[slot], *newContents, data.object_size, 0);
+ r = create_completion(slot, _aio_cb, &lc);
+ if (r < 0)
+ goto ERR;
+ r = aio_write(newName, slot, *newContents, data.object_size);
if (r < 0) {//naughty; doesn't clean up heap space.
goto ERR;
}
while (data.finished < data.started) {
slot = data.finished % concurrentios;
- completions[slot]->wait_for_safe();
+ completion_wait(slot);
lock.Lock();
- r = completions[slot]->get_return_value();
+ r = completion_ret(slot);
if (r != 0) {
lock.Unlock();
goto ERR;
data.avg_latency = total_latency / data.finished;
--data.in_flight;
lock.Unlock();
- completions[slot]->release();
- completions[slot] = 0;
+ release_completion(slot);
delete[] name[slot];
delete contents[slot];
}
::encode(data.object_size, b_write);
::encode(data.finished, b_write);
::encode(getpid(), b_write);
- io_ctx.write(BENCH_DATA, b_write, sizeof(int)*3, 0);
+ sync_write(BENCH_DATA, b_write, sizeof(int)*3);
+
+ completions_done();
+
return 0;
ERR:
return -5;
}
-int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) {
+int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) {
data.finished = 0;
lock_cond lc(&lock);
- librados::AioCompletion* completions[concurrentios];
char* name[concurrentios];
bufferlist* contents[concurrentios];
int index[concurrentios];
sanitize_object_contents(&data, 128); //clean it up once; subsequent
//changes will be safe because string length monotonically increases
+ r = completions_init(concurrentios);
+ if (r < 0)
+ return r;
+
//set up initial reads
for (int i = 0; i < concurrentios; ++i) {
name[i] = new char[128];
for (int i = 0; i < concurrentios; ++i) {
index[i] = i;
start_times[i] = ceph_clock_now(g_ceph_context);
- completions[i] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0);
- r = io_ctx.aio_read(name[i], completions[i], contents[i], data.object_size, 0);
+ create_completion(i, _aio_cb, (void *)&lc);
+ r = aio_read(name[i], i, contents[i], data.object_size);
if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
cerr << "r = " << r << std::endl;
goto ERR;
lock.Lock();
while (1) {
for (slot = 0; slot < concurrentios; ++slot) {
- if (completions[slot]->is_complete()) {
+ if (completion_is_done(slot)) {
break;
}
}
generate_object_name(newName, 128, data.started, pid);
int current_index = index[slot];
index[slot] = data.started;
- completions[slot]->wait_for_complete();
+ completion_wait(slot);
lock.Lock();
- r = completions[slot]->get_return_value();
+ r = completion_ret(slot);
if (r != 0) {
cerr << "read got " << r << std::endl;
lock.Unlock();
data.avg_latency = total_latency / data.finished;
--data.in_flight;
lock.Unlock();
- completions[slot]->release();
- completions[slot] = 0;
+ release_completion(slot);
cur_contents = contents[slot];
//start new read and check data if requested
start_times[slot] = ceph_clock_now(g_ceph_context);
contents[slot] = new bufferlist();
- completions[slot] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0);
- r = io_ctx.aio_read(newName, completions[slot], contents[slot], data.object_size, 0);
+ create_completion(slot, _aio_cb, (void *)&lc);
+ r = aio_read(newName, slot, contents[slot], data.object_size);
if (r < 0) {
goto ERR;
}
//wait for final reads to complete
while (data.finished < data.started) {
slot = data.finished % concurrentios;
- completions[slot]->wait_for_complete();
+ completion_wait(slot);
lock.Lock();
- r = completions[slot]->get_return_value();
+ r = completion_ret(slot);
if (r != 0) {
cerr << "read got " << r << std::endl;
lock.Unlock();
++data.finished;
data.avg_latency = total_latency / data.finished;
--data.in_flight;
- completions[slot]-> release();
- completions[slot] = 0;
+ release_completion(slot);
snprintf(data.object_contents, data.object_size, "I'm the %dth object!", index[slot]);
lock.Unlock();
if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
<< "Max latency: " << data.max_latency << std::endl
<< "Min latency: " << data.min_latency << std::endl;
+ completions_done();
+
return 0;
ERR:
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
- * Series of functions to test your rados installation. Notice
- * that this code is not terribly robust -- for instance, if you
- * try and bench on a pool you don't have permission to access
- * it will just loop forever.
*/
-#ifndef CEPH_RADOS_BENCHER_H
-#define CEPH_RADOS_BENCHER_H
+#ifndef CEPH_OBJ_BENCHER_H
+#define CEPH_OBJ_BENCHER_H
-#include "include/rados/librados.hpp"
#include "common/config.h"
#include "common/Cond.h"
const int OP_SEQ_READ = 2;
const int OP_RAND_READ = 3;
-class RadosBencher {
+class ObjBencher {
+protected:
Mutex lock;
- librados::Rados& rados;
- librados::IoCtx& io_ctx;
static void *status_printer(void *bencher);
int write_bench(int secondsToRun, int concurrentios);
int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid);
+
+ virtual int completions_init(int concurrentios) = 0;
+ virtual void completions_done() = 0;
+
+ virtual int create_completion(int i, void (*cb)(void *, void*), void *arg) = 0;
+ virtual void release_completion(int slot) = 0;
+
+ virtual bool completion_is_done(int slot) = 0;
+ virtual int completion_wait(int slot) = 0;
+ virtual int completion_ret(int slot) = 0;
+
+ virtual int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) = 0;
+ virtual int aio_write(const std::string& oid, int slot, const bufferlist& bl, size_t len) = 0;
+ virtual int sync_read(const std::string& oid, bufferlist& bl, size_t len) = 0;
+ virtual int sync_write(const std::string& oid, bufferlist& bl, size_t len) = 0;
public:
- RadosBencher(librados::Rados& _r, librados::IoCtx& _i) : lock("RadosBencher::lock"), rados(_r), io_ctx(_i) {}
+ ObjBencher() : lock("ObjBencher::lock") {}
+ virtual ~ObjBencher() {}
int aio_bench(int operation, int secondsToRun, int concurrentios, int op_size);
};
#include "common/Cond.h"
#include "common/debug.h"
#include "common/Formatter.h"
+#include "common/obj_bencher.h"
#include "mds/inode_backtrace.h"
#include "auth/Crypto.h"
#include <iostream>
}
}
+
+class RadosBencher : public ObjBencher {
+ librados::AioCompletion **completions;
+ librados::Rados& rados;
+ librados::IoCtx& io_ctx;
+protected:
+ int completions_init(int concurrentios) {
+ completions = new librados::AioCompletion *[concurrentios];
+ return 0;
+ }
+ void completions_done() {
+ delete[] completions;
+ completions = NULL;
+ }
+ int create_completion(int slot, void (*cb)(void *, void*), void *arg) {
+ completions[slot] = rados.aio_create_completion((void *) arg, 0, cb);
+
+ if (!completions[slot])
+ return -EINVAL;
+
+ return 0;
+ }
+ void release_completion(int slot) {
+ completions[slot]->release();
+ completions[slot] = 0;
+ }
+
+ int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) {
+ return io_ctx.aio_read(oid, completions[slot], pbl, len, 0);
+ }
+
+ int aio_write(const std::string& oid, int slot, const bufferlist& bl, size_t len) {
+ return io_ctx.aio_write(oid, completions[slot], bl, len, 0);
+ }
+
+ int sync_read(const std::string& oid, bufferlist& bl, size_t len) {
+ return io_ctx.read(oid, bl, len, 0);
+ }
+ int sync_write(const std::string& oid, bufferlist& bl, size_t len) {
+ return io_ctx.write(oid, bl, len, 0);
+ }
+
+ bool completion_is_done(int slot) {
+ return completions[slot]->is_safe();
+ }
+
+ int completion_wait(int slot) {
+ return completions[slot]->wait_for_safe();
+ }
+ int completion_ret(int slot) {
+ return completions[slot]->get_return_value();
+ }
+
+public:
+ RadosBencher(librados::Rados& _r, librados::IoCtx& _i) : completions(NULL), rados(_r), io_ctx(_i) {}
+ ~RadosBencher() { }
+};
+
/**********************************************
**********************************************/