]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rados_bencher: restructure code, create RadosBencher class
authorYehuda Sadeh <yehuda@hq.newdream.net>
Mon, 26 Mar 2012 21:53:37 +0000 (14:53 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 4 May 2012 22:53:26 +0000 (15:53 -0700)
Preparing for different benchmark backend.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
src/Makefile.am
src/osdc/rados_bencher.cc [new file with mode: 0644]
src/osdc/rados_bencher.h
src/rados.cc

index 5fc16629eb74989424a38a152326c8594b2bce37..7cf36644e73744a1babfd90682d59432e568f120 100644 (file)
@@ -285,7 +285,8 @@ librados_SOURCES = \
        librados/librados.cc \
        librados/RadosClient.cc \
        librados/IoCtxImpl.cc \
-       osdc/Objecter.cc
+       osdc/Objecter.cc \
+       osdc/rados_bencher.cc
 librados_la_SOURCES = ${librados_SOURCES}
 librados_la_CFLAGS = ${CRYPTO_CFLAGS} ${AM_CFLAGS}
 librados_la_CXXFLAGS = ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
diff --git a/src/osdc/rados_bencher.cc b/src/osdc/rados_bencher.cc
new file mode 100644 (file)
index 0000000..7800cc9
--- /dev/null
@@ -0,0 +1,536 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2009 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ * Series of functions to test your rados installation. Notice
+ * that this code is not terribly robust -- for instance, if you
+ * try and bench on a pool you don't have permission to access
+ * it will just loop forever.
+ */
+#include "include/rados/librados.hpp"
+#include "common/config.h"
+#include "global/global_init.h"
+#include "common/Cond.h"
+#include "rados_bencher.h"
+#include <iostream>
+#include <fstream>
+
+#include <stdlib.h>
+#include <time.h>
+#include <sstream>
+
+
+const char *BENCH_DATA = "benchmark_write_data";
+
+static void generate_object_name(char *s, size_t size, int objnum, int pid = 0)
+{
+  char hostname[30];
+  gethostname(hostname, sizeof(hostname)-1);
+  hostname[sizeof(hostname)-1] = 0;
+  if (pid) {
+    snprintf(s, size, "%s_%d_object%d", hostname, pid, objnum);
+  } else {
+    snprintf(s, size, "%s_%d_object%d", hostname, getpid(), objnum);
+  }
+}
+
+static void sanitize_object_contents (bench_data *data, int length) {
+  for (int i = 0; i < length; ++i) {
+    data->object_contents[i] = i % sizeof(char);
+  }
+}
+
+void *RadosBencher::status_printer(void *_bencher) {
+  RadosBencher *bencher = (RadosBencher *)_bencher;
+  bench_data& data = bencher->data;
+  Cond cond;
+  int i = 0;
+  int previous_writes = 0;
+  int cycleSinceChange = 0;
+  double avg_bandwidth;
+  double bandwidth;
+  utime_t ONE_SECOND;
+  ONE_SECOND.set_from_double(1.0);
+  bencher->lock.Lock();
+  while(!data.done) {
+    if (i % 20 == 0) {
+      if (i > 0)
+       cout << "min lat: " << data.min_latency
+            << " max lat: " << data.max_latency
+            << " avg lat: " << data.avg_latency << std::endl;
+      //I'm naughty and don't reset the fill
+      cout << setfill(' ')
+          << setw(5) << "sec"
+          << setw(8) << "Cur ops"
+          << setw(10) << "started"
+          << setw(10) << "finished"
+          << setw(10) << "avg MB/s"
+          << setw(10) << "cur MB/s"
+          << setw(10) << "last lat"
+          << setw(10) << "avg lat" << std::endl;
+    }
+    bandwidth = (double)(data.finished - previous_writes)
+      * (data.trans_size)
+      / (1024*1024)
+      / cycleSinceChange;
+    avg_bandwidth = (double) (data.trans_size) * (data.finished)
+      / (double)(ceph_clock_now(g_ceph_context) - data.start_time) / (1024*1024);
+    if (previous_writes != data.finished) {
+      previous_writes = data.finished;
+      cycleSinceChange = 0;
+      cout << setfill(' ')
+          << setw(5) << i
+          << setw(8) << data.in_flight
+          << setw(10) << data.started
+          << setw(10) << data.finished
+          << setw(10) << avg_bandwidth
+          << setw(10) << bandwidth
+          << setw(10) << (double)data.cur_latency
+          << setw(10) << data.avg_latency << std::endl;
+    }
+    else {
+      cout << setfill(' ')
+          << setw(5) << i
+          << setw(8) << data.in_flight
+          << setw(10) << data.started
+          << setw(10) << data.finished
+          << setw(10) << avg_bandwidth
+          << setw(10) << '0'
+          << setw(10) << '-'
+          << setw(10) << data.avg_latency << std::endl;
+    }
+    ++i;
+    ++cycleSinceChange;
+    cond.WaitInterval(g_ceph_context, bencher->lock, ONE_SECOND);
+  }
+  bencher->lock.Unlock();
+  return NULL;
+}
+
+int RadosBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) {
+  int object_size = op_size;
+  int num_objects = 0;
+  char* contentsChars = new char[op_size];
+  int r = 0;
+  int prevPid = 0;
+
+  //get data from previous write run, if available
+  if (operation != OP_WRITE) {
+    bufferlist object_data;
+    r = io_ctx.read(BENCH_DATA, object_data, sizeof(int)*3, 0);
+    if (r <= 0) {
+      delete[] contentsChars;
+      if (r == -2)
+       cerr << "Must write data before running a read benchmark!" << std::endl;
+      return r;
+    }
+    bufferlist::iterator p = object_data.begin();
+    ::decode(object_size, p);
+    ::decode(num_objects, p);
+    ::decode(prevPid, p);
+  } else {
+    object_size = op_size;
+  }
+
+  lock.Lock();
+  data.done = false;
+  data.object_size = object_size;
+  data.trans_size = op_size;
+  data.in_flight = 0;
+  data.started = 0;
+  data.finished = num_objects;
+  data.min_latency = 9999.0; // this better be higher than initial latency!
+  data.max_latency = 0;
+  data.avg_latency = 0;
+  data.object_contents = contentsChars;
+  lock.Unlock();
+
+  //fill in contentsChars deterministically so we can check returns
+  sanitize_object_contents(&data, data.object_size);
+
+  if (OP_WRITE == operation) {
+    r = write_bench(secondsToRun, concurrentios);
+    if (r != 0) goto out;
+  }
+  else if (OP_SEQ_READ == operation) {
+    r = seq_read_bench(secondsToRun, concurrentios, num_objects, prevPid);
+    if (r != 0) goto out;
+  }
+  else if (OP_RAND_READ == operation) {
+    cerr << "Random test not implemented yet!" << std::endl;
+    r = -1;
+  }
+
+ out:
+  delete[] contentsChars;
+  return r;
+}
+
+struct lock_cond {
+  lock_cond(Mutex *_lock) : lock(_lock) {}
+  Mutex *lock;
+  Cond cond;
+};
+
+void _aio_cb(void *cb, void *arg) {
+  struct lock_cond *lc = (struct lock_cond *)arg;
+  lc->lock->Lock();
+  lc->cond.Signal();
+  lc->lock->Unlock();
+}
+
+int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
+  cout << "Maintaining " << concurrentios << " concurrent writes of "
+       << data.object_size << " bytes for at least "
+       << secondsToRun << " seconds." << std::endl;
+
+  librados::AioCompletion* completions[concurrentios];
+  char* name[concurrentios];
+  bufferlist* contents[concurrentios];
+  double total_latency = 0;
+  utime_t start_times[concurrentios];
+  utime_t stopTime;
+  int r = 0;
+  bufferlist b_write;
+  lock_cond lc(&lock);
+  utime_t runtime;
+  utime_t timePassed;
+
+  //set up writes so I can start them together
+  for (int i = 0; i<concurrentios; ++i) {
+    name[i] = new char[128];
+    contents[i] = new bufferlist();
+    generate_object_name(name[i], 128, i);
+    snprintf(data.object_contents, data.object_size, "I'm the %dth object!", i);
+    contents[i]->append(data.object_contents, data.object_size);
+  }
+
+  pthread_t print_thread;
+
+  pthread_create(&print_thread, NULL, RadosBencher::status_printer, (void *)this);
+  lock.Lock();
+  data.start_time = ceph_clock_now(g_ceph_context);
+  lock.Unlock();
+  for (int i = 0; i<concurrentios; ++i) {
+    start_times[i] = ceph_clock_now(g_ceph_context);
+    completions[i] = rados.aio_create_completion((void *) &lc, 0,
+                                                &_aio_cb);
+    r = io_ctx.aio_write(name[i], completions[i], *contents[i], data.object_size, 0);
+    if (r < 0) { //naughty, doesn't clean up heap
+      goto ERR;
+    }
+    lock.Lock();
+    ++data.started;
+    ++data.in_flight;
+    lock.Unlock();
+  }
+
+  //keep on adding new writes as old ones complete until we've passed minimum time
+  int slot;
+  bufferlist* newContents;
+  char* newName;
+
+  //don't need locking for reads because other thread doesn't write
+
+  runtime.set_from_double(secondsToRun);
+  stopTime = data.start_time + runtime;
+  while( ceph_clock_now(g_ceph_context) < stopTime ) {
+    lock.Lock();
+    while (1) {
+      for (slot = 0; slot < concurrentios; ++slot) {
+       if (completions[slot]->is_safe()) {
+         break;
+       }
+      }
+      if (slot < concurrentios) {
+       break;
+      }
+      lc.cond.Wait(lock);
+    }
+    lock.Unlock();
+    //create new contents and name on the heap, and fill them
+    newContents = new bufferlist();
+    newName = new char[128];
+    generate_object_name(newName, 128, data.started);
+    snprintf(data.object_contents, data.object_size, "I'm the %dth object!", data.started);
+    newContents->append(data.object_contents, data.object_size);
+    completions[slot]->wait_for_safe();
+    lock.Lock();
+    r = completions[slot]->get_return_value();
+    if (r != 0) {
+      lock.Unlock();
+      goto ERR;
+    }
+    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+    total_latency += data.cur_latency;
+    if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+    ++data.finished;
+    data.avg_latency = total_latency / data.finished;
+    --data.in_flight;
+    lock.Unlock();
+    completions[slot]->release();
+    completions[slot] = 0;
+    timePassed = ceph_clock_now(g_ceph_context) - data.start_time;
+
+    //write new stuff to rados, then delete old stuff
+    //and save locations of new stuff for later deletion
+    start_times[slot] = ceph_clock_now(g_ceph_context);
+    completions[slot] = rados.aio_create_completion((void *) &lc, 0, &_aio_cb);
+    r = io_ctx.aio_write(newName, completions[slot], *newContents, data.object_size, 0);
+    if (r < 0) {//naughty; doesn't clean up heap space.
+      goto ERR;
+    }
+    lock.Lock();
+    ++data.started;
+    ++data.in_flight;
+    lock.Unlock();
+    delete[] name[slot];
+    delete contents[slot];
+    name[slot] = newName;
+    contents[slot] = newContents;
+  }
+
+  while (data.finished < data.started) {
+    slot = data.finished % concurrentios;
+    completions[slot]->wait_for_safe();
+    lock.Lock();
+    r = completions[slot]->get_return_value();
+    if (r != 0) {
+      lock.Unlock();
+      goto ERR;
+    }
+    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+    total_latency += data.cur_latency;
+    if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+    ++data.finished;
+    data.avg_latency = total_latency / data.finished;
+    --data.in_flight;
+    lock.Unlock();
+    completions[slot]->release();
+    completions[slot] = 0;
+    delete[] name[slot];
+    delete contents[slot];
+  }
+
+  timePassed = ceph_clock_now(g_ceph_context) - data.start_time;
+  lock.Lock();
+  data.done = true;
+  lock.Unlock();
+
+  pthread_join(print_thread, NULL);
+
+  double bandwidth;
+  bandwidth = ((double)data.finished)*((double)data.object_size)/(double)timePassed;
+  bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
+  char bw[20];
+  snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
+
+  cout << "Total time run:        " << timePassed << std::endl
+       << "Total writes made:     " << data.finished << std::endl
+       << "Write size:            " << data.object_size << std::endl
+       << "Bandwidth (MB/sec):    " << bw << std::endl
+       << "Average Latency:       " << data.avg_latency << std::endl
+       << "Max latency:           " << data.max_latency << std::endl
+       << "Min latency:           " << data.min_latency << std::endl;
+
+  //write object size/number data for read benchmarks
+  ::encode(data.object_size, b_write);
+  ::encode(data.finished, b_write);
+  ::encode(getpid(), b_write);
+  io_ctx.write(BENCH_DATA, b_write, sizeof(int)*3, 0);
+  return 0;
+
+ ERR:
+  lock.Lock();
+  data.done = 1;
+  lock.Unlock();
+  pthread_join(print_thread, NULL);
+  return -5;
+}
+
+int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) {
+  data.finished = 0;
+
+  lock_cond lc(&lock);
+  librados::AioCompletion* completions[concurrentios];
+  char* name[concurrentios];
+  bufferlist* contents[concurrentios];
+  int index[concurrentios];
+  int errors = 0;
+  utime_t start_time;
+  utime_t start_times[concurrentios];
+  utime_t time_to_run;
+  time_to_run.set_from_double(seconds_to_run);
+  double total_latency = 0;
+  int r = 0;
+  utime_t runtime;
+  sanitize_object_contents(&data, 128); //clean it up once; subsequent
+  //changes will be safe because string length monotonically increases
+
+  //set up initial reads
+  for (int i = 0; i < concurrentios; ++i) {
+    name[i] = new char[128];
+    generate_object_name(name[i], 128, i, pid);
+    contents[i] = new bufferlist();
+  }
+
+  pthread_t print_thread;
+  pthread_create(&print_thread, NULL, status_printer, (void *)this);
+
+  lock.Lock();
+  data.start_time = ceph_clock_now(g_ceph_context);
+  lock.Unlock();
+  utime_t finish_time = data.start_time + time_to_run;
+  //start initial reads
+  for (int i = 0; i < concurrentios; ++i) {
+    index[i] = i;
+    start_times[i] = ceph_clock_now(g_ceph_context);
+    completions[i] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0);
+    r = io_ctx.aio_read(name[i], completions[i], contents[i], data.object_size, 0);
+    if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
+      cerr << "r = " << r << std::endl;
+      goto ERR;
+    }
+    lock.Lock();
+    ++data.started;
+    ++data.in_flight;
+    lock.Unlock();
+  }
+
+  //keep on adding new reads as old ones complete
+  int slot;
+  char* newName;
+  bufferlist *cur_contents;
+
+  while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time) &&
+      num_objects > data.started) {
+    lock.Lock();
+    while (1) {
+      for (slot = 0; slot < concurrentios; ++slot) {
+       if (completions[slot]->is_complete()) {
+         break;
+       }
+      }
+      if (slot < concurrentios) {
+       break;
+      }
+      lc.cond.Wait(lock);
+    }
+    lock.Unlock();
+    newName = new char[128];
+    generate_object_name(newName, 128, data.started, pid);
+    int current_index = index[slot];
+    index[slot] = data.started;
+    completions[slot]->wait_for_complete();
+    lock.Lock();
+    r = completions[slot]->get_return_value();
+    if (r != 0) {
+      cerr << "read got " << r << std::endl;
+      lock.Unlock();
+      goto ERR;
+    }
+    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+    total_latency += data.cur_latency;
+    if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+    ++data.finished;
+    data.avg_latency = total_latency / data.finished;
+    --data.in_flight;
+    lock.Unlock();
+    completions[slot]->release();
+    completions[slot] = 0;
+    cur_contents = contents[slot];
+
+    //start new read and check data if requested
+    start_times[slot] = ceph_clock_now(g_ceph_context);
+    contents[slot] = new bufferlist();
+    completions[slot] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0);
+    r = io_ctx.aio_read(newName, completions[slot], contents[slot], data.object_size, 0);
+    if (r < 0) {
+      goto ERR;
+    }
+    lock.Lock();
+    ++data.started;
+    ++data.in_flight;
+    snprintf(data.object_contents, data.object_size, "I'm the %dth object!", current_index);
+    lock.Unlock();
+    if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) {
+      cerr << name[slot] << " is not correct!" << std::endl;
+      ++errors;
+    }
+    delete name[slot];
+    name[slot] = newName;
+    delete cur_contents;
+  }
+
+  //wait for final reads to complete
+  while (data.finished < data.started) {
+    slot = data.finished % concurrentios;
+    completions[slot]->wait_for_complete();
+    lock.Lock();
+    r = completions[slot]->get_return_value();
+    if (r != 0) {
+      cerr << "read got " << r << std::endl;
+      lock.Unlock();
+      goto ERR;
+    }
+    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+    total_latency += data.cur_latency;
+    if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+    ++data.finished;
+    data.avg_latency = total_latency / data.finished;
+    --data.in_flight;
+    completions[slot]-> release();
+    completions[slot] = 0;
+    snprintf(data.object_contents, data.object_size, "I'm the %dth object!", index[slot]);
+    lock.Unlock();
+    if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
+      cerr << name[slot] << " is not correct!" << std::endl;
+      ++errors;
+    }
+    delete name[slot];
+    delete contents[slot];
+  }
+
+  runtime = ceph_clock_now(g_ceph_context) - data.start_time;
+  lock.Lock();
+  data.done = true;
+  lock.Unlock();
+
+  pthread_join(print_thread, NULL);
+
+  double bandwidth;
+  bandwidth = ((double)data.finished)*((double)data.object_size)/(double)runtime;
+  bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
+  char bw[20];
+  snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
+
+  cout << "Total time run:        " << runtime << std::endl
+       << "Total reads made:     " << data.finished << std::endl
+       << "Read size:            " << data.object_size << std::endl
+       << "Bandwidth (MB/sec):    " << bw << std::endl
+       << "Average Latency:       " << data.avg_latency << std::endl
+       << "Max latency:           " << data.max_latency << std::endl
+       << "Min latency:           " << data.min_latency << std::endl;
+
+  return 0;
+
+ ERR:
+  lock.Lock();
+  data.done = 1;
+  lock.Unlock();
+  pthread_join(print_thread, NULL);
+  return -5;
+}
+
+
index 7800cc99df66fe0fbffff0c4c8afb398fe9736dd..cd685b8509448e3fb8162be297e7d04543d39de7 100644 (file)
  * try and bench on a pool you don't have permission to access
  * it will just loop forever.
  */
+
+#ifndef CEPH_RADOS_BENCHER_H
+#define CEPH_RADOS_BENCHER_H
+
 #include "include/rados/librados.hpp"
 #include "common/config.h"
-#include "global/global_init.h"
 #include "common/Cond.h"
-#include "rados_bencher.h"
-#include <iostream>
-#include <fstream>
-
-#include <stdlib.h>
-#include <time.h>
-#include <sstream>
-
-
-const char *BENCH_DATA = "benchmark_write_data";
-
-static void generate_object_name(char *s, size_t size, int objnum, int pid = 0)
-{
-  char hostname[30];
-  gethostname(hostname, sizeof(hostname)-1);
-  hostname[sizeof(hostname)-1] = 0;
-  if (pid) {
-    snprintf(s, size, "%s_%d_object%d", hostname, pid, objnum);
-  } else {
-    snprintf(s, size, "%s_%d_object%d", hostname, getpid(), objnum);
-  }
-}
-
-static void sanitize_object_contents (bench_data *data, int length) {
-  for (int i = 0; i < length; ++i) {
-    data->object_contents[i] = i % sizeof(char);
-  }
-}
-
-void *RadosBencher::status_printer(void *_bencher) {
-  RadosBencher *bencher = (RadosBencher *)_bencher;
-  bench_data& data = bencher->data;
-  Cond cond;
-  int i = 0;
-  int previous_writes = 0;
-  int cycleSinceChange = 0;
-  double avg_bandwidth;
-  double bandwidth;
-  utime_t ONE_SECOND;
-  ONE_SECOND.set_from_double(1.0);
-  bencher->lock.Lock();
-  while(!data.done) {
-    if (i % 20 == 0) {
-      if (i > 0)
-       cout << "min lat: " << data.min_latency
-            << " max lat: " << data.max_latency
-            << " avg lat: " << data.avg_latency << std::endl;
-      //I'm naughty and don't reset the fill
-      cout << setfill(' ')
-          << setw(5) << "sec"
-          << setw(8) << "Cur ops"
-          << setw(10) << "started"
-          << setw(10) << "finished"
-          << setw(10) << "avg MB/s"
-          << setw(10) << "cur MB/s"
-          << setw(10) << "last lat"
-          << setw(10) << "avg lat" << std::endl;
-    }
-    bandwidth = (double)(data.finished - previous_writes)
-      * (data.trans_size)
-      / (1024*1024)
-      / cycleSinceChange;
-    avg_bandwidth = (double) (data.trans_size) * (data.finished)
-      / (double)(ceph_clock_now(g_ceph_context) - data.start_time) / (1024*1024);
-    if (previous_writes != data.finished) {
-      previous_writes = data.finished;
-      cycleSinceChange = 0;
-      cout << setfill(' ')
-          << setw(5) << i
-          << setw(8) << data.in_flight
-          << setw(10) << data.started
-          << setw(10) << data.finished
-          << setw(10) << avg_bandwidth
-          << setw(10) << bandwidth
-          << setw(10) << (double)data.cur_latency
-          << setw(10) << data.avg_latency << std::endl;
-    }
-    else {
-      cout << setfill(' ')
-          << setw(5) << i
-          << setw(8) << data.in_flight
-          << setw(10) << data.started
-          << setw(10) << data.finished
-          << setw(10) << avg_bandwidth
-          << setw(10) << '0'
-          << setw(10) << '-'
-          << setw(10) << data.avg_latency << std::endl;
-    }
-    ++i;
-    ++cycleSinceChange;
-    cond.WaitInterval(g_ceph_context, bencher->lock, ONE_SECOND);
-  }
-  bencher->lock.Unlock();
-  return NULL;
-}
-
-int RadosBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) {
-  int object_size = op_size;
-  int num_objects = 0;
-  char* contentsChars = new char[op_size];
-  int r = 0;
-  int prevPid = 0;
-
-  //get data from previous write run, if available
-  if (operation != OP_WRITE) {
-    bufferlist object_data;
-    r = io_ctx.read(BENCH_DATA, object_data, sizeof(int)*3, 0);
-    if (r <= 0) {
-      delete[] contentsChars;
-      if (r == -2)
-       cerr << "Must write data before running a read benchmark!" << std::endl;
-      return r;
-    }
-    bufferlist::iterator p = object_data.begin();
-    ::decode(object_size, p);
-    ::decode(num_objects, p);
-    ::decode(prevPid, p);
-  } else {
-    object_size = op_size;
-  }
-
-  lock.Lock();
-  data.done = false;
-  data.object_size = object_size;
-  data.trans_size = op_size;
-  data.in_flight = 0;
-  data.started = 0;
-  data.finished = num_objects;
-  data.min_latency = 9999.0; // this better be higher than initial latency!
-  data.max_latency = 0;
-  data.avg_latency = 0;
-  data.object_contents = contentsChars;
-  lock.Unlock();
 
-  //fill in contentsChars deterministically so we can check returns
-  sanitize_object_contents(&data, data.object_size);
-
-  if (OP_WRITE == operation) {
-    r = write_bench(secondsToRun, concurrentios);
-    if (r != 0) goto out;
-  }
-  else if (OP_SEQ_READ == operation) {
-    r = seq_read_bench(secondsToRun, concurrentios, num_objects, prevPid);
-    if (r != 0) goto out;
-  }
-  else if (OP_RAND_READ == operation) {
-    cerr << "Random test not implemented yet!" << std::endl;
-    r = -1;
-  }
-
- out:
-  delete[] contentsChars;
-  return r;
-}
-
-struct lock_cond {
-  lock_cond(Mutex *_lock) : lock(_lock) {}
-  Mutex *lock;
-  Cond cond;
+struct bench_data {
+  bool done; //is the benchmark is done
+  int object_size; //the size of the objects
+  int trans_size; //size of the write/read to perform
+  // same as object_size for write tests
+  int in_flight; //number of reads/writes being waited on
+  int started;
+  int finished;
+  double min_latency;
+  double max_latency;
+  double avg_latency;
+  utime_t cur_latency; //latency of last completed transaction
+  utime_t start_time; //start time for benchmark
+  char *object_contents; //pointer to the contents written to each object
 };
 
-void _aio_cb(void *cb, void *arg) {
-  struct lock_cond *lc = (struct lock_cond *)arg;
-  lc->lock->Lock();
-  lc->cond.Signal();
-  lc->lock->Unlock();
-}
-
-int RadosBencher::write_bench(int secondsToRun, int concurrentios) {
-  cout << "Maintaining " << concurrentios << " concurrent writes of "
-       << data.object_size << " bytes for at least "
-       << secondsToRun << " seconds." << std::endl;
-
-  librados::AioCompletion* completions[concurrentios];
-  char* name[concurrentios];
-  bufferlist* contents[concurrentios];
-  double total_latency = 0;
-  utime_t start_times[concurrentios];
-  utime_t stopTime;
-  int r = 0;
-  bufferlist b_write;
-  lock_cond lc(&lock);
-  utime_t runtime;
-  utime_t timePassed;
-
-  //set up writes so I can start them together
-  for (int i = 0; i<concurrentios; ++i) {
-    name[i] = new char[128];
-    contents[i] = new bufferlist();
-    generate_object_name(name[i], 128, i);
-    snprintf(data.object_contents, data.object_size, "I'm the %dth object!", i);
-    contents[i]->append(data.object_contents, data.object_size);
-  }
-
-  pthread_t print_thread;
-
-  pthread_create(&print_thread, NULL, RadosBencher::status_printer, (void *)this);
-  lock.Lock();
-  data.start_time = ceph_clock_now(g_ceph_context);
-  lock.Unlock();
-  for (int i = 0; i<concurrentios; ++i) {
-    start_times[i] = ceph_clock_now(g_ceph_context);
-    completions[i] = rados.aio_create_completion((void *) &lc, 0,
-                                                &_aio_cb);
-    r = io_ctx.aio_write(name[i], completions[i], *contents[i], data.object_size, 0);
-    if (r < 0) { //naughty, doesn't clean up heap
-      goto ERR;
-    }
-    lock.Lock();
-    ++data.started;
-    ++data.in_flight;
-    lock.Unlock();
-  }
-
-  //keep on adding new writes as old ones complete until we've passed minimum time
-  int slot;
-  bufferlist* newContents;
-  char* newName;
-
-  //don't need locking for reads because other thread doesn't write
+const int OP_WRITE     = 1;
+const int OP_SEQ_READ  = 2;
+const int OP_RAND_READ = 3;
 
-  runtime.set_from_double(secondsToRun);
-  stopTime = data.start_time + runtime;
-  while( ceph_clock_now(g_ceph_context) < stopTime ) {
-    lock.Lock();
-    while (1) {
-      for (slot = 0; slot < concurrentios; ++slot) {
-       if (completions[slot]->is_safe()) {
-         break;
-       }
-      }
-      if (slot < concurrentios) {
-       break;
-      }
-      lc.cond.Wait(lock);
-    }
-    lock.Unlock();
-    //create new contents and name on the heap, and fill them
-    newContents = new bufferlist();
-    newName = new char[128];
-    generate_object_name(newName, 128, data.started);
-    snprintf(data.object_contents, data.object_size, "I'm the %dth object!", data.started);
-    newContents->append(data.object_contents, data.object_size);
-    completions[slot]->wait_for_safe();
-    lock.Lock();
-    r = completions[slot]->get_return_value();
-    if (r != 0) {
-      lock.Unlock();
-      goto ERR;
-    }
-    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
-    total_latency += data.cur_latency;
-    if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
-    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
-    ++data.finished;
-    data.avg_latency = total_latency / data.finished;
-    --data.in_flight;
-    lock.Unlock();
-    completions[slot]->release();
-    completions[slot] = 0;
-    timePassed = ceph_clock_now(g_ceph_context) - data.start_time;
+class RadosBencher {
+  Mutex lock;
+  librados::Rados& rados;
+  librados::IoCtx& io_ctx;
 
-    //write new stuff to rados, then delete old stuff
-    //and save locations of new stuff for later deletion
-    start_times[slot] = ceph_clock_now(g_ceph_context);
-    completions[slot] = rados.aio_create_completion((void *) &lc, 0, &_aio_cb);
-    r = io_ctx.aio_write(newName, completions[slot], *newContents, data.object_size, 0);
-    if (r < 0) {//naughty; doesn't clean up heap space.
-      goto ERR;
-    }
-    lock.Lock();
-    ++data.started;
-    ++data.in_flight;
-    lock.Unlock();
-    delete[] name[slot];
-    delete contents[slot];
-    name[slot] = newName;
-    contents[slot] = newContents;
-  }
+  static void *status_printer(void *bencher);
 
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completions[slot]->wait_for_safe();
-    lock.Lock();
-    r = completions[slot]->get_return_value();
-    if (r != 0) {
-      lock.Unlock();
-      goto ERR;
-    }
-    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
-    total_latency += data.cur_latency;
-    if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
-    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
-    ++data.finished;
-    data.avg_latency = total_latency / data.finished;
-    --data.in_flight;
-    lock.Unlock();
-    completions[slot]->release();
-    completions[slot] = 0;
-    delete[] name[slot];
-    delete contents[slot];
-  }
+  struct bench_data data;
 
-  timePassed = ceph_clock_now(g_ceph_context) - data.start_time;
-  lock.Lock();
-  data.done = true;
-  lock.Unlock();
-
-  pthread_join(print_thread, NULL);
-
-  double bandwidth;
-  bandwidth = ((double)data.finished)*((double)data.object_size)/(double)timePassed;
-  bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
-  char bw[20];
-  snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
-
-  cout << "Total time run:        " << timePassed << std::endl
-       << "Total writes made:     " << data.finished << std::endl
-       << "Write size:            " << data.object_size << std::endl
-       << "Bandwidth (MB/sec):    " << bw << std::endl
-       << "Average Latency:       " << data.avg_latency << std::endl
-       << "Max latency:           " << data.max_latency << std::endl
-       << "Min latency:           " << data.min_latency << std::endl;
-
-  //write object size/number data for read benchmarks
-  ::encode(data.object_size, b_write);
-  ::encode(data.finished, b_write);
-  ::encode(getpid(), b_write);
-  io_ctx.write(BENCH_DATA, b_write, sizeof(int)*3, 0);
-  return 0;
-
- ERR:
-  lock.Lock();
-  data.done = 1;
-  lock.Unlock();
-  pthread_join(print_thread, NULL);
-  return -5;
-}
-
-int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) {
-  data.finished = 0;
-
-  lock_cond lc(&lock);
-  librados::AioCompletion* completions[concurrentios];
-  char* name[concurrentios];
-  bufferlist* contents[concurrentios];
-  int index[concurrentios];
-  int errors = 0;
-  utime_t start_time;
-  utime_t start_times[concurrentios];
-  utime_t time_to_run;
-  time_to_run.set_from_double(seconds_to_run);
-  double total_latency = 0;
-  int r = 0;
-  utime_t runtime;
-  sanitize_object_contents(&data, 128); //clean it up once; subsequent
-  //changes will be safe because string length monotonically increases
-
-  //set up initial reads
-  for (int i = 0; i < concurrentios; ++i) {
-    name[i] = new char[128];
-    generate_object_name(name[i], 128, i, pid);
-    contents[i] = new bufferlist();
-  }
-
-  pthread_t print_thread;
-  pthread_create(&print_thread, NULL, status_printer, (void *)this);
-
-  lock.Lock();
-  data.start_time = ceph_clock_now(g_ceph_context);
-  lock.Unlock();
-  utime_t finish_time = data.start_time + time_to_run;
-  //start initial reads
-  for (int i = 0; i < concurrentios; ++i) {
-    index[i] = i;
-    start_times[i] = ceph_clock_now(g_ceph_context);
-    completions[i] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0);
-    r = io_ctx.aio_read(name[i], completions[i], contents[i], data.object_size, 0);
-    if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
-      cerr << "r = " << r << std::endl;
-      goto ERR;
-    }
-    lock.Lock();
-    ++data.started;
-    ++data.in_flight;
-    lock.Unlock();
-  }
-
-  //keep on adding new reads as old ones complete
-  int slot;
-  char* newName;
-  bufferlist *cur_contents;
-
-  while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time) &&
-      num_objects > data.started) {
-    lock.Lock();
-    while (1) {
-      for (slot = 0; slot < concurrentios; ++slot) {
-       if (completions[slot]->is_complete()) {
-         break;
-       }
-      }
-      if (slot < concurrentios) {
-       break;
-      }
-      lc.cond.Wait(lock);
-    }
-    lock.Unlock();
-    newName = new char[128];
-    generate_object_name(newName, 128, data.started, pid);
-    int current_index = index[slot];
-    index[slot] = data.started;
-    completions[slot]->wait_for_complete();
-    lock.Lock();
-    r = completions[slot]->get_return_value();
-    if (r != 0) {
-      cerr << "read got " << r << std::endl;
-      lock.Unlock();
-      goto ERR;
-    }
-    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
-    total_latency += data.cur_latency;
-    if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
-    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
-    ++data.finished;
-    data.avg_latency = total_latency / data.finished;
-    --data.in_flight;
-    lock.Unlock();
-    completions[slot]->release();
-    completions[slot] = 0;
-    cur_contents = contents[slot];
-
-    //start new read and check data if requested
-    start_times[slot] = ceph_clock_now(g_ceph_context);
-    contents[slot] = new bufferlist();
-    completions[slot] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0);
-    r = io_ctx.aio_read(newName, completions[slot], contents[slot], data.object_size, 0);
-    if (r < 0) {
-      goto ERR;
-    }
-    lock.Lock();
-    ++data.started;
-    ++data.in_flight;
-    snprintf(data.object_contents, data.object_size, "I'm the %dth object!", current_index);
-    lock.Unlock();
-    if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) {
-      cerr << name[slot] << " is not correct!" << std::endl;
-      ++errors;
-    }
-    delete name[slot];
-    name[slot] = newName;
-    delete cur_contents;
-  }
-
-  //wait for final reads to complete
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completions[slot]->wait_for_complete();
-    lock.Lock();
-    r = completions[slot]->get_return_value();
-    if (r != 0) {
-      cerr << "read got " << r << std::endl;
-      lock.Unlock();
-      goto ERR;
-    }
-    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
-    total_latency += data.cur_latency;
-    if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
-    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
-    ++data.finished;
-    data.avg_latency = total_latency / data.finished;
-    --data.in_flight;
-    completions[slot]-> release();
-    completions[slot] = 0;
-    snprintf(data.object_contents, data.object_size, "I'm the %dth object!", index[slot]);
-    lock.Unlock();
-    if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
-      cerr << name[slot] << " is not correct!" << std::endl;
-      ++errors;
-    }
-    delete name[slot];
-    delete contents[slot];
-  }
-
-  runtime = ceph_clock_now(g_ceph_context) - data.start_time;
-  lock.Lock();
-  data.done = true;
-  lock.Unlock();
-
-  pthread_join(print_thread, NULL);
-
-  double bandwidth;
-  bandwidth = ((double)data.finished)*((double)data.object_size)/(double)runtime;
-  bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
-  char bw[20];
-  snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
-
-  cout << "Total time run:        " << runtime << std::endl
-       << "Total reads made:     " << data.finished << std::endl
-       << "Read size:            " << data.object_size << std::endl
-       << "Bandwidth (MB/sec):    " << bw << std::endl
-       << "Average Latency:       " << data.avg_latency << std::endl
-       << "Max latency:           " << data.max_latency << std::endl
-       << "Min latency:           " << data.min_latency << std::endl;
-
-  return 0;
-
- ERR:
-  lock.Lock();
-  data.done = 1;
-  lock.Unlock();
-  pthread_join(print_thread, NULL);
-  return -5;
-}
+  int write_bench(int secondsToRun, int concurrentios);
+  int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid);
+public:
+  RadosBencher(librados::Rados& _r, librados::IoCtx& _i) : lock("RadosBencher::lock"), rados(_r), io_ctx(_i) {}
+  int aio_bench(int operation, int secondsToRun, int concurrentios, int op_size);
+};
 
 
+#endif
index 9f6959204c9a5b00da30da423fcbb3e902a14206..690d72cc8398dac8e6a8bf994a95fdc7bcbf5758 100644 (file)
@@ -1218,7 +1218,8 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
       operation = OP_RAND_READ;
     else
       usage_exit();
-    ret = aio_bench(rados, io_ctx, operation, seconds, concurrent_ios, op_size);
+    RadosBencher bencher(rados, io_ctx);
+    ret = bencher.aio_bench(operation, seconds, concurrent_ios, op_size);
     if (ret != 0)
       cerr << "error during benchmark: " << ret << std::endl;
   }