]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rados_bencher: abstract away rados specific operations
authorYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 20 Apr 2012 21:55:42 +0000 (14:55 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 4 May 2012 22:53:26 +0000 (15:53 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
src/Makefile.am
src/common/obj_bencher.cc
src/common/obj_bencher.h
src/rados.cc

index a28ddb10c3e3dacb974a21b0f2f6dbd5c3022356..33b2e8d2115bd874decb940b9ab851baa8e8e6e9 100644 (file)
@@ -1221,6 +1221,7 @@ noinst_HEADERS = \
        common/environment.h\
        common/likely.h\
        common/lockdep.h\
+       common/obj_bencher.h\
        common/snap_types.h\
         common/Clock.h\
         common/Cond.h\
@@ -1557,7 +1558,6 @@ noinst_HEADERS = \
         osd/ReplicatedPG.h\
         osd/Watch.h\
         osd/osd_types.h\
-       osdc/rados_bencher.h\
         osdc/Blinker.h\
         osdc/Filer.h\
         osdc/Journaler.h\
index db0afaead27f8db8e8f62022039f22a48e3695b5..0cb59d21d952772c9275c701816aebdc9aa3930c 100644 (file)
@@ -14,7 +14,6 @@
  * try and bench on a pool you don't have permission to access
  * it will just loop forever.
  */
-#include "include/rados/librados.hpp"
 #include "common/Cond.h"
 #include "obj_bencher.h"
 
@@ -46,8 +45,8 @@ static void sanitize_object_contents (bench_data *data, int length) {
   }
 }
 
-void *RadosBencher::status_printer(void *_bencher) {
-  RadosBencher *bencher = (RadosBencher *)_bencher;
+void *ObjBencher::status_printer(void *_bencher) {
+  ObjBencher *bencher = (ObjBencher *)_bencher;
   bench_data& data = bencher->data;
   Cond cond;
   int i = 0;
@@ -113,7 +112,7 @@ void *RadosBencher::status_printer(void *_bencher) {
   return NULL;
 }
 
-int RadosBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) {
+int ObjBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) {
   int object_size = op_size;
   int num_objects = 0;
   char* contentsChars = new char[op_size];
@@ -123,7 +122,7 @@ int RadosBencher::aio_bench(int operation, int secondsToRun, int concurrentios,
   //get data from previous write run, if available
   if (operation != OP_WRITE) {
     bufferlist object_data;
-    r = io_ctx.read(BENCH_DATA, object_data, sizeof(int)*3, 0);
+    r = sync_read(BENCH_DATA, object_data, sizeof(int)*3);
     if (r <= 0) {
       delete[] contentsChars;
       if (r == -2)
@@ -185,12 +184,11 @@ void _aio_cb(void *cb, void *arg) {
   lc->lock->Unlock();
 }
 
-int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
+int ObjBencher::write_bench(int secondsToRun, int concurrentios) {
   cout << "Maintaining " << concurrentios << " concurrent writes of "
        << data.object_size << " bytes for at least "
        << secondsToRun << " seconds." << std::endl;
 
-  librados::AioCompletion* completions[concurrentios];
   char* name[concurrentios];
   bufferlist* contents[concurrentios];
   double total_latency = 0;
@@ -202,6 +200,8 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
   utime_t runtime;
   utime_t timePassed;
 
+  r = completions_init(concurrentios);
+
   //set up writes so I can start them together
   for (int i = 0; i<concurrentios; ++i) {
     name[i] = new char[128];
@@ -213,15 +213,16 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
 
   pthread_t print_thread;
 
-  pthread_create(&print_thread, NULL, RadosBencher::status_printer, (void *)this);
+  pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
   lock.Lock();
   data.start_time = ceph_clock_now(g_ceph_context);
   lock.Unlock();
   for (int i = 0; i<concurrentios; ++i) {
     start_times[i] = ceph_clock_now(g_ceph_context);
-    completions[i] = rados.aio_create_completion((void *) &lc, 0,
-                                                &_aio_cb);
-    r = io_ctx.aio_write(name[i], completions[i], *contents[i], data.object_size, 0);
+    r = create_completion(i, _aio_cb, (void *)&lc);
+    if (r < 0)
+      goto ERR;
+    r = aio_write(name[i], i, *contents[i], data.object_size);
     if (r < 0) { //naughty, doesn't clean up heap
       goto ERR;
     }
@@ -244,7 +245,7 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
     lock.Lock();
     while (1) {
       for (slot = 0; slot < concurrentios; ++slot) {
-       if (completions[slot]->is_safe()) {
+       if (completion_is_done(slot)) {
          break;
        }
       }
@@ -260,9 +261,9 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
     generate_object_name(newName, 128, data.started);
     snprintf(data.object_contents, data.object_size, "I'm the %dth object!", data.started);
     newContents->append(data.object_contents, data.object_size);
-    completions[slot]->wait_for_safe();
+    completion_wait(slot);
     lock.Lock();
-    r = completions[slot]->get_return_value();
+    r = completion_ret(slot);
     if (r != 0) {
       lock.Unlock();
       goto ERR;
@@ -275,15 +276,16 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
     data.avg_latency = total_latency / data.finished;
     --data.in_flight;
     lock.Unlock();
-    completions[slot]->release();
-    completions[slot] = 0;
+    release_completion(slot);
     timePassed = ceph_clock_now(g_ceph_context) - data.start_time;
 
-    //write new stuff to rados, then delete old stuff
+    //write new stuff to backend, then delete old stuff
     //and save locations of new stuff for later deletion
     start_times[slot] = ceph_clock_now(g_ceph_context);
-    completions[slot] = rados.aio_create_completion((void *) &lc, 0, &_aio_cb);
-    r = io_ctx.aio_write(newName, completions[slot], *newContents, data.object_size, 0);
+    r = create_completion(slot, _aio_cb, &lc);
+    if (r < 0)
+      goto ERR;
+    r = aio_write(newName, slot, *newContents, data.object_size);
     if (r < 0) {//naughty; doesn't clean up heap space.
       goto ERR;
     }
@@ -299,9 +301,9 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
 
   while (data.finished < data.started) {
     slot = data.finished % concurrentios;
-    completions[slot]->wait_for_safe();
+    completion_wait(slot);
     lock.Lock();
-    r = completions[slot]->get_return_value();
+    r = completion_ret(slot);
     if (r != 0) {
       lock.Unlock();
       goto ERR;
@@ -314,8 +316,7 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
     data.avg_latency = total_latency / data.finished;
     --data.in_flight;
     lock.Unlock();
-    completions[slot]->release();
-    completions[slot] = 0;
+    release_completion(slot);
     delete[] name[slot];
     delete contents[slot];
   }
@@ -345,7 +346,10 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
   ::encode(data.object_size, b_write);
   ::encode(data.finished, b_write);
   ::encode(getpid(), b_write);
-  io_ctx.write(BENCH_DATA, b_write, sizeof(int)*3, 0);
+  sync_write(BENCH_DATA, b_write, sizeof(int)*3);
+
+  completions_done();
+
   return 0;
 
  ERR:
@@ -356,11 +360,10 @@ int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
   return -5;
 }
 
-int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) {
+int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) {
   data.finished = 0;
 
   lock_cond lc(&lock);
-  librados::AioCompletion* completions[concurrentios];
   char* name[concurrentios];
   bufferlist* contents[concurrentios];
   int index[concurrentios];
@@ -375,6 +378,10 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur
   sanitize_object_contents(&data, 128); //clean it up once; subsequent
   //changes will be safe because string length monotonically increases
 
+  r = completions_init(concurrentios);
+  if (r < 0)
+    return r;
+
   //set up initial reads
   for (int i = 0; i < concurrentios; ++i) {
     name[i] = new char[128];
@@ -393,8 +400,8 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur
   for (int i = 0; i < concurrentios; ++i) {
     index[i] = i;
     start_times[i] = ceph_clock_now(g_ceph_context);
-    completions[i] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0);
-    r = io_ctx.aio_read(name[i], completions[i], contents[i], data.object_size, 0);
+    create_completion(i, _aio_cb, (void *)&lc);
+    r = aio_read(name[i], i, contents[i], data.object_size);
     if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
       cerr << "r = " << r << std::endl;
       goto ERR;
@@ -415,7 +422,7 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur
     lock.Lock();
     while (1) {
       for (slot = 0; slot < concurrentios; ++slot) {
-       if (completions[slot]->is_complete()) {
+       if (completion_is_done(slot)) {
          break;
        }
       }
@@ -429,9 +436,9 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur
     generate_object_name(newName, 128, data.started, pid);
     int current_index = index[slot];
     index[slot] = data.started;
-    completions[slot]->wait_for_complete();
+    completion_wait(slot);
     lock.Lock();
-    r = completions[slot]->get_return_value();
+    r = completion_ret(slot);
     if (r != 0) {
       cerr << "read got " << r << std::endl;
       lock.Unlock();
@@ -445,15 +452,14 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur
     data.avg_latency = total_latency / data.finished;
     --data.in_flight;
     lock.Unlock();
-    completions[slot]->release();
-    completions[slot] = 0;
+    release_completion(slot);
     cur_contents = contents[slot];
 
     //start new read and check data if requested
     start_times[slot] = ceph_clock_now(g_ceph_context);
     contents[slot] = new bufferlist();
-    completions[slot] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0);
-    r = io_ctx.aio_read(newName, completions[slot], contents[slot], data.object_size, 0);
+    create_completion(slot, _aio_cb, (void *)&lc);
+    r = aio_read(newName, slot, contents[slot], data.object_size);
     if (r < 0) {
       goto ERR;
     }
@@ -474,9 +480,9 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur
   //wait for final reads to complete
   while (data.finished < data.started) {
     slot = data.finished % concurrentios;
-    completions[slot]->wait_for_complete();
+    completion_wait(slot);
     lock.Lock();
-    r = completions[slot]->get_return_value();
+    r = completion_ret(slot);
     if (r != 0) {
       cerr << "read got " << r << std::endl;
       lock.Unlock();
@@ -489,8 +495,7 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur
     ++data.finished;
     data.avg_latency = total_latency / data.finished;
     --data.in_flight;
-    completions[slot]-> release();
-    completions[slot] = 0;
+    release_completion(slot);
     snprintf(data.object_contents, data.object_size, "I'm the %dth object!", index[slot]);
     lock.Unlock();
     if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
@@ -522,6 +527,8 @@ int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concur
        << "Max latency:           " << data.max_latency << std::endl
        << "Min latency:           " << data.min_latency << std::endl;
 
+  completions_done();
+
   return 0;
 
  ERR:
index cd685b8509448e3fb8162be297e7d04543d39de7..0a2ea583b383cc46ba85284eca4dfebbeddafddb 100644 (file)
@@ -9,16 +9,11 @@
  * License version 2.1, as published by the Free Software
  * Foundation.  See file COPYING.
  *
- * Series of functions to test your rados installation. Notice
- * that this code is not terribly robust -- for instance, if you
- * try and bench on a pool you don't have permission to access
- * it will just loop forever.
  */
 
-#ifndef CEPH_RADOS_BENCHER_H
-#define CEPH_RADOS_BENCHER_H
+#ifndef CEPH_OBJ_BENCHER_H
+#define CEPH_OBJ_BENCHER_H
 
-#include "include/rados/librados.hpp"
 #include "common/config.h"
 #include "common/Cond.h"
 
@@ -42,10 +37,9 @@ const int OP_WRITE     = 1;
 const int OP_SEQ_READ  = 2;
 const int OP_RAND_READ = 3;
 
-class RadosBencher {
+class ObjBencher {
+protected:
   Mutex lock;
-  librados::Rados& rados;
-  librados::IoCtx& io_ctx;
 
   static void *status_printer(void *bencher);
 
@@ -53,8 +47,24 @@ class RadosBencher {
 
   int write_bench(int secondsToRun, int concurrentios);
   int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid);
+
+  virtual int completions_init(int concurrentios) = 0;
+  virtual void completions_done() = 0;
+
+  virtual int create_completion(int i, void (*cb)(void *, void*), void *arg) = 0;
+  virtual void release_completion(int slot) = 0;
+
+  virtual bool completion_is_done(int slot) = 0;
+  virtual int completion_wait(int slot) = 0;
+  virtual int completion_ret(int slot) = 0;
+
+  virtual int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) = 0;
+  virtual int aio_write(const std::string& oid, int slot, const bufferlist& bl, size_t len) = 0;
+  virtual int sync_read(const std::string& oid, bufferlist& bl, size_t len) = 0;
+  virtual int sync_write(const std::string& oid, bufferlist& bl, size_t len) = 0;
 public:
-  RadosBencher(librados::Rados& _r, librados::IoCtx& _i) : lock("RadosBencher::lock"), rados(_r), io_ctx(_i) {}
+  ObjBencher() : lock("ObjBencher::lock") {}
+  virtual ~ObjBencher() {}
   int aio_bench(int operation, int secondsToRun, int concurrentios, int op_size);
 };
 
index b88b4c6213ab7131bee13da8d63ca9dbdf2f309a..3e985b96fa4ac55b09adf3572762fadf7f9c7011 100644 (file)
@@ -25,6 +25,7 @@ using namespace librados;
 #include "common/Cond.h"
 #include "common/debug.h"
 #include "common/Formatter.h"
+#include "common/obj_bencher.h"
 #include "mds/inode_backtrace.h"
 #include "auth/Crypto.h"
 #include <iostream>
@@ -577,6 +578,64 @@ void LoadGen::cleanup()
   }
 }
 
+
+class RadosBencher : public ObjBencher {
+  librados::AioCompletion **completions;
+  librados::Rados& rados;
+  librados::IoCtx& io_ctx;
+protected:
+  int completions_init(int concurrentios) {
+    completions = new librados::AioCompletion *[concurrentios];
+    return 0;
+  }
+  void completions_done() {
+    delete[] completions;
+    completions = NULL;
+  }
+  int create_completion(int slot, void (*cb)(void *, void*), void *arg) {
+    completions[slot] = rados.aio_create_completion((void *) arg, 0, cb);
+
+    if (!completions[slot])
+      return -EINVAL;
+
+    return 0;
+  }
+  void release_completion(int slot) {
+    completions[slot]->release();
+    completions[slot] = 0;
+  }
+
+  int aio_read(const std::string& oid, int slot, bufferlist *pbl, size_t len) {
+    return io_ctx.aio_read(oid, completions[slot], pbl, len, 0);
+  }
+
+  int aio_write(const std::string& oid, int slot, const bufferlist& bl, size_t len) {
+    return io_ctx.aio_write(oid, completions[slot], bl, len, 0);
+  }
+
+  int sync_read(const std::string& oid, bufferlist& bl, size_t len) {
+    return io_ctx.read(oid, bl, len, 0);
+  }
+  int sync_write(const std::string& oid, bufferlist& bl, size_t len) {
+    return io_ctx.write(oid, bl, len, 0);
+  }
+
+  bool completion_is_done(int slot) {
+    return completions[slot]->is_safe();
+  }
+
+  int completion_wait(int slot) {
+    return completions[slot]->wait_for_safe();
+  }
+  int completion_ret(int slot) {
+    return completions[slot]->get_return_value();
+  }
+
+public:
+  RadosBencher(librados::Rados& _r, librados::IoCtx& _i) : completions(NULL), rados(_r), io_ctx(_i) {}
+  ~RadosBencher() { }
+};
+
 /**********************************************
 
 **********************************************/