]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rados: read benchmark is asynchronous.
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 1 Dec 2009 21:55:19 +0000 (13:55 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Wed, 2 Dec 2009 01:32:38 +0000 (17:32 -0800)
src/rados_bencher.h

index 64a61ec64a9663a360e30eba5c7ba742ababf5cf..c8ee1980a7d398983fbe43d9aaba970b6cc1f5ee 100644 (file)
@@ -185,6 +185,11 @@ int write_bench(Rados& rados, rados_pool_t pool,
     slot = data->finished % concurrentios;
     completions[slot]->wait_for_safe();
     dataLock.Lock();
+    r = completions[slot]->get_return_value();
+    if (r != 0) {
+      dataLock.Unlock();
+      return r;
+    }
     data->cur_latency = g_clock.now() - start_times[slot];
     total_latency += data->cur_latency;
     if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
@@ -221,45 +226,165 @@ int write_bench(Rados& rados, rados_pool_t pool,
   return 0;
 }
 
-int seq_read_bench(Rados& rados, rados_pool_t pool, bench_data *data) {
+int seq_read_bench(Rados& rados, rados_pool_t pool,
+                  int concurrentios, bench_data *write_data, int verify) {
+  bench_data *data = new bench_data();
+  data->done = false;
+  data->object_size = write_data->object_size;
+  data->trans_size = data->object_size;
+  data->in_flight= 0;
+  data->started = 0;
+  data->finished = 0;
+  data->min_latency = 9999.0;
+  data->max_latency = 0;
+  data->avg_latency = 0;
+  data->object_contents = write_data->object_contents;
+
+  Rados::AioCompletion* completions[concurrentios];
+  char* name[concurrentios];
+  bufferlist* contents[concurrentios];
   int errors = 0;
-  char matchName[128];
-  object_t oid;
-  bufferlist actualContents;
   utime_t start_time;
-  utime_t last_start;
+  utime_t start_times[concurrentios];
   double total_latency = 0;
-  double avg_latency;
-  double avg_bw;
   int r = 0;
-  sanitize_object_contents(data, 128);
-  start_time = g_clock.now();
-  for (int i = 0; i < data->finished; ++i ) {
-    snprintf(matchName, 128, "Object %d", i);
-    oid = object_t(matchName);
-    snprintf(data->object_contents, data->object_size, "I'm the %dth object!", i);
-    last_start = g_clock.now();
-    r = rados.read(pool, oid, 0, actualContents, data->object_size);
-    if (r != data->object_size) {
-      if (r < 0) return r;
-      else return -5; //EIO
+  sanitize_object_contents(data, 128); //clean it up once; subsequent
+  //changes will be safe because string length monotonically increases
+
+  //set up initial reads
+  for (int i = 0; i < concurrentios; ++i) {
+    name[i] = new char[128];
+    snprintf(name[i], 128, "Object %d", i);
+    contents[i] = new bufferlist();
+  }
+
+  pthread_t print_thread;
+  pthread_create(&print_thread, NULL, status_printer, (void *)data);
+
+  dataLock.Lock();
+  data->start_time = g_clock.now();
+  //start initial reads
+  for (int i = 0; i < concurrentios; ++i) {
+    start_times[i] = g_clock.now();
+    r = rados.aio_read(pool, name[i], 0, contents[i], data->object_size, &completions[i]);
+    if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
+      cerr << "r = " << r << std::endl;
+      dataLock.Unlock();
+      return -5; //EIO
     }
-    total_latency += (double) g_clock.now() - last_start;
-    if (memcmp(data->object_contents, actualContents.c_str(), data->object_size) != 0 ) {
-      cerr << "Object " << matchName << " is not correct!";
-      ++errors;
+    ++data->started;
+    ++data->in_flight;
+  }
+  dataLock.Unlock();
+
+  //keep on adding new reads as old ones complete
+  int slot;
+  char* newName;
+  utime_t runtime;
+  bufferlist *cur_contents;
+
+  for (int i = 0; i < write_data->finished - concurrentios; ++i) {
+    slot = data->finished % concurrentios;
+    newName = new char[128];
+    snprintf(newName, 128, "Object %d", data->started);
+    completions[slot]->wait_for_complete();
+    dataLock.Lock();
+    r = completions[slot]->get_return_value();
+    if (r != 0) {
+      cerr << "read got " << r << std::endl;
+      dataLock.Unlock();
+      return r;
     }
-    actualContents.clear();
+    data->cur_latency = g_clock.now() - start_times[slot];
+    total_latency += data->cur_latency;
+    if( data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
+    if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency;
+    ++data->finished;
+    data->avg_latency = total_latency / data->finished;
+    --data->in_flight;
+    dataLock.Unlock();
+    completions[slot]->release();
+    cur_contents = contents[slot];
+
+    //start new read and check data if requested
+    start_times[slot] = g_clock.now();
+    contents[slot] = new bufferlist();
+    r = rados.aio_read(pool, newName, 0, contents[slot], data->object_size, &completions[slot]);
+    if (r < 0)
+      return r;
+    dataLock.Lock();
+    ++data->started;
+    ++data->in_flight;
+    dataLock.Unlock();
+    if (verify) {
+      dataLock.Lock();
+      snprintf(data->object_contents, data->object_size, "I'm the %dth object!", i);
+      dataLock.Unlock();
+      if (memcmp(data->object_contents, cur_contents->c_str(), data->object_size) != 0) {
+       cerr << name[slot] << " is not correct!";
+       ++errors;
+      }
+    }
+    delete name[slot];
+    name[slot] = newName;
+    delete cur_contents;
   }
-  last_start = g_clock.now();
-  avg_latency = total_latency / data->finished;
-  avg_bw = data->finished * data->object_size /
-    (double)(last_start - start_time) / (1024 *1024);
-  cout << "read avg latency: " << avg_latency
-       << " read avg bw: " << avg_bw << std::endl;
-  
-  if (errors) cout << "WARNING: There were " << errors << " total errors in copying!\n";
-  else cout << "No errors in copying!\n";
+
+  //wait for final reads to complete
+  while (data->finished < data->started) {
+    slot = data->finished % concurrentios;
+    completions[slot]->wait_for_complete();
+    dataLock.Lock();
+    r = completions[slot]->get_return_value();
+    if (r != 0) {
+      cerr << "read got " << r << std::endl;
+      dataLock.Unlock();
+      return r;
+    }
+    data->cur_latency = g_clock.now() - start_times[slot];
+    total_latency += data->cur_latency;
+    if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency;
+    if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency;
+    ++data->finished;
+    data->avg_latency = total_latency / data->finished;
+    --data->in_flight;
+    dataLock.Unlock();
+    completions[slot]-> release();
+    if (verify) {
+      dataLock.Lock();
+      snprintf(data->object_contents, data->object_size, "I'm the %dth object!", data->finished-1);
+      dataLock.Unlock();
+      if (memcmp(data->object_contents, contents[slot]->c_str(), data->object_size) != 0) {
+       cerr << name[slot] << " is not correct!" << std::endl;
+       ++errors;
+      }
+    }
+    delete name[slot];
+    delete contents[slot];
+  }
+
+  runtime = g_clock.now() - data->start_time;
+  dataLock.Lock();
+  data->done = true;
+  dataLock.Unlock();
+
+  pthread_join(print_thread, NULL);
+
+  double bandwidth;
+  bandwidth = ((double)data->finished)*((double)data->object_size)/(double)runtime;
+  bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
+  char bw[20];
+  sprintf(bw, "%.3lf \n", bandwidth);
+
+  cout << "Total time run:        " << runtime << std::endl
+       << "Total reads made:     " << data->finished << std::endl
+       << "Read size:            " << data->object_size << std::endl
+       << "Bandwidth (MB/sec):    " << bw << std::endl
+       << "Average Latency:       " << data->avg_latency << std::endl
+       << "Max latency:           " << data->max_latency << std::endl
+       << "Min latency:           " << data->min_latency << std::endl;
+
+  delete data;
   return 0;
 }