]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ObjBencher: add rand_read_bench functions to support rand test in rados-bench 782/head
authordanchai <tengweicai@gmail.com>
Tue, 29 Oct 2013 10:10:48 +0000 (18:10 +0800)
committerdanchai <tengweicai@gmail.com>
Wed, 4 Dec 2013 07:41:42 +0000 (15:41 +0800)
Signed-off-by: Tengwei Cai <tengweicai@gmail.com>
src/common/obj_bencher.cc
src/common/obj_bencher.h

index 599cbbc62fb20c0014d0c2cf70dda4e8ba5a6f28..d3c3b44692cf88c6c496035eaaf42d688397c608 100644 (file)
@@ -214,8 +214,8 @@ int ObjBencher::aio_bench(
     if (r != 0) goto out;
   }
   else if (OP_RAND_READ == operation) {
-    cerr << "Random test not implemented yet!" << std::endl;
-    r = -1;
+    r = rand_read_bench(secondsToRun, num_objects, concurrentios, prevPid);
+    if (r != 0) goto out;
   }
 
   if (OP_WRITE == operation && cleanup) {
@@ -683,6 +683,193 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
   return -5;
 }
 
+int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid)
+{
+  lock_cond lc(&lock);
+  std::vector<string> name(concurrentios);
+  std::string newName;
+  bufferlist* contents[concurrentios];
+  int index[concurrentios];
+  int errors = 0;
+  utime_t start_time;
+  std::vector<utime_t> start_times(concurrentios);
+  utime_t time_to_run;
+  time_to_run.set_from_double(seconds_to_run);
+  double total_latency = 0;
+  int r = 0;
+  utime_t runtime;
+  sanitize_object_contents(&data, data.object_size); //clean it up once; subsequent
+  //changes will be safe because string length should remain the same
+
+  srand (time(NULL));
+
+  r = completions_init(concurrentios);
+  if (r < 0)
+    return r;
+
+  //set up initial reads
+  for (int i = 0; i < concurrentios; ++i) {
+    name[i] = generate_object_name(i, pid);
+    contents[i] = new bufferlist();
+  }
+
+  lock.Lock();
+  data.finished = 0;
+  data.start_time = ceph_clock_now(g_ceph_context);
+  lock.Unlock();
+
+  pthread_t print_thread;
+  pthread_create(&print_thread, NULL, status_printer, (void *)this);
+
+  utime_t finish_time = data.start_time + time_to_run;
+  //start initial reads
+  for (int i = 0; i < concurrentios; ++i) {
+    index[i] = i;
+    start_times[i] = ceph_clock_now(g_ceph_context);
+    create_completion(i, _aio_cb, (void *)&lc);
+    r = aio_read(name[i], i, contents[i], data.object_size);
+    if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
+      cerr << "r = " << r << std::endl;
+      goto ERR;
+    }
+    lock.Lock();
+    ++data.started;
+    ++data.in_flight;
+    lock.Unlock();
+  }
+
+  //keep on adding new reads as old ones complete
+  int slot;
+  bufferlist *cur_contents;
+  int rand_id;
+
+  slot = 0;
+  while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time)) {
+    lock.Lock();
+    int old_slot = slot;
+    bool found = false;
+    while (1) {
+      do {
+        if (completion_is_done(slot)) {
+          found = true;
+          break;
+        }
+        slot++;
+        if (slot == concurrentios) {
+          slot = 0;
+        }
+      } while (slot != old_slot);
+      if (found) {
+        break;
+      }
+      lc.cond.Wait(lock);
+    }
+    lock.Unlock();
+    rand_id = rand() % num_objects;
+    newName = generate_object_name(rand_id, pid);
+    int current_index = index[slot];
+    index[slot] = rand_id;
+    completion_wait(slot);
+    lock.Lock();
+    r = completion_ret(slot);
+    if (r != 0) {
+      cerr << "read got " << r << std::endl;
+      lock.Unlock();
+      goto ERR;
+    }
+    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+    total_latency += data.cur_latency;
+    if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+    ++data.finished;
+    data.avg_latency = total_latency / data.finished;
+    --data.in_flight;
+    lock.Unlock();
+    release_completion(slot);
+    cur_contents = contents[slot];
+
+    //start new read and check data if requested
+    start_times[slot] = ceph_clock_now(g_ceph_context);
+    contents[slot] = new bufferlist();
+    create_completion(slot, _aio_cb, (void *)&lc);
+    r = aio_read(newName, slot, contents[slot], data.object_size);
+    if (r < 0) {
+      goto ERR;
+    }
+    lock.Lock();
+    ++data.started;
+    ++data.in_flight;
+    snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", current_index);
+    lock.Unlock();
+    if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) {
+      cerr << name[slot] << " is not correct!" << std::endl;
+      ++errors;
+    }
+    name[slot] = newName;
+    delete cur_contents;
+  }
+
+  //wait for final reads to complete
+  while (data.finished < data.started) {
+    slot = data.finished % concurrentios;
+    completion_wait(slot);
+    lock.Lock();
+    r = completion_ret(slot);
+    if (r != 0) {
+      cerr << "read got " << r << std::endl;
+      lock.Unlock();
+      goto ERR;
+    }
+    data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+    total_latency += data.cur_latency;
+    if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+    if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+    ++data.finished;
+    data.avg_latency = total_latency / data.finished;
+    --data.in_flight;
+    release_completion(slot);
+    snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", index[slot]);
+    lock.Unlock();
+    if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
+      cerr << name[slot] << " is not correct!" << std::endl;
+      ++errors;
+    }
+    delete contents[slot];
+  }
+
+  runtime = ceph_clock_now(g_ceph_context) - data.start_time;
+  lock.Lock();
+  data.done = true;
+  lock.Unlock();
+
+  pthread_join(print_thread, NULL);
+
+  double bandwidth;
+  bandwidth = ((double)data.finished)*((double)data.object_size)/(double)runtime;
+  bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
+  char bw[20];
+  snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
+
+  out(cout) << "Total time run:        " << runtime << std::endl
+       << "Total reads made:     " << data.finished << std::endl
+       << "Read size:            " << data.object_size << std::endl
+       << "Bandwidth (MB/sec):    " << bw << std::endl
+       << "Average Latency:       " << data.avg_latency << std::endl
+       << "Max latency:           " << data.max_latency << std::endl
+       << "Min latency:           " << data.min_latency << std::endl;
+
+  completions_done();
+
+  return 0;
+
+ ERR:
+  lock.Lock();
+  data.done = 1;
+  lock.Unlock();
+  pthread_join(print_thread, NULL);
+  return -5;
+}
+
 int ObjBencher::clean_up(const std::string& prefix, int concurrentios) {
   int r = 0;
   int object_size;
index c8f671f8c90841e9ca5e0939be4ed1cc8cc99dc9..b87821a7d5afa6a6953132c0e3c4d7ee7e1b73c5 100644 (file)
@@ -65,6 +65,7 @@ protected:
 
   int write_bench(int secondsToRun, int maxObjects, int concurrentios);
   int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid);
+  int rand_read_bench(int secondsToRun, int num_objects, int concurrentios, int writePid);
 
   int clean_up(int num_objects, int prevPid, int concurrentios);
   int clean_up_slow(const std::string& prefix, int concurrentios);