]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rados bench: fix the delayed checking of completed ops 32928/head
authorJianshen Liu <jliu120@ucsc.edu>
Tue, 28 Jan 2020 05:52:36 +0000 (22:52 -0700)
committerJianshen Liu <jliu120@ucsc.edu>
Wed, 5 Feb 2020 00:44:42 +0000 (16:44 -0800)
When the submission process is done, rados bench will scan the
completed ops in a sequence manner. However, because the order of
completion may not be sequential, some of the completed ops suffer from
delayed checking, causing the average latency of all ops to be
prolonged. The solution is to reuse the checking mechanism used during
the submission process.

Signed-off-by: Jianshen Liu <jliu120@ucsc.edu>
src/common/obj_bencher.cc
src/tools/rados/rados.cc

index 27cc80d18979ecd819bf48b4d919995ae66a2b34..e94fd7c3e346c0872e3d70af1292d00ec02ebec5 100644 (file)
@@ -390,9 +390,9 @@ int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
 int ObjBencher::write_bench(int secondsToRun,
                            int concurrentios, const string& run_name_meta,
                            unsigned max_objects, int prev_pid) {
-  if (concurrentios <= 0) 
+  if (concurrentios <= 0)
     return -EINVAL;
-  
+
   if (!formatter) {
     out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
              << data.op_size << " bytes to objects of size "
@@ -472,7 +472,7 @@ int ObjBencher::write_bench(int secondsToRun,
   stopTime = data.start_time + std::chrono::seconds(secondsToRun);
   slot = 0;
   locker.lock();
-  while (secondsToRun && mono_clock::now() < stopTime) {
+  while (data.finished < data.started) {
     bool found = false;
     while (1) {
       int old_slot = slot;
@@ -491,12 +491,6 @@ int ObjBencher::write_bench(int secondsToRun,
       lc.cond.wait(locker);
     }
     locker.unlock();
-    //create new contents and name on the heap, and fill them
-    newName = generate_object_name_fast(data.started / writes_per_object);
-    newContents = contents[slot].get();
-    snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
-    // we wrote to buffer, going around internal crc cache, so invalidate it now.
-    newContents->invalidate_crc();
 
     completion_wait(slot);
     locker.lock();
@@ -519,7 +513,28 @@ int ObjBencher::write_bench(int secondsToRun,
     locker.unlock();
     release_completion(slot);
 
+    if (!secondsToRun || mono_clock::now() >= stopTime) {
+      locker.lock();
+      continue;
+    }
+
+    if (data.op_size && max_objects &&
+        data.started >=
+            (int)((data.object_size * max_objects + data.op_size - 1) /
+                  data.op_size)) {
+      locker.lock();
+      continue;
+    }
+
     //write new stuff to backend
+
+    //create new contents and name on the heap, and fill them
+    newName = generate_object_name_fast(data.started / writes_per_object);
+    newContents = contents[slot].get();
+    snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
+    // we wrote to buffer, going around internal crc cache, so invalidate it now.
+    newContents->invalidate_crc();
+
     start_times[slot] = mono_clock::now();
     r = create_completion(slot, _aio_cb, &lc);
     if (r < 0)
@@ -533,39 +548,9 @@ int ObjBencher::write_bench(int secondsToRun,
     locker.lock();
     ++data.started;
     ++data.in_flight;
-    if (data.op_size) {
-      if (max_objects &&
-         data.started >= (int)((data.object_size * max_objects + data.op_size - 1) /
-                              data.op_size))
-        break;
-    }
   }
   locker.unlock();
 
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completion_wait(slot);
-    locker.lock();
-    r = completion_ret(slot);
-    if (r != 0) {
-      locker.unlock();
-      goto ERR;
-    }
-    data.cur_latency = mono_clock::now() - start_times[slot];
-    total_latency += data.cur_latency.count();
-    if (data.cur_latency.count() > data.max_latency)
-      data.max_latency = data.cur_latency.count();
-    if (data.cur_latency.count() < data.min_latency)
-      data.min_latency = data.cur_latency.count();
-    ++data.finished;
-    double delta = data.cur_latency.count() - data.avg_latency;
-    data.avg_latency = total_latency / data.finished;
-    data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
-    --data.in_flight;
-    locker.unlock();
-    release_completion(slot);
-  }
-
   timePassed = mono_clock::now() - data.start_time;
   locker.lock();
   data.done = true;
@@ -601,7 +586,7 @@ int ObjBencher::write_bench(int secondsToRun,
     out(cout) << "Total time run:         " << timePassed.count() << std::endl
        << "Total writes made:      " << data.finished << std::endl
        << "Write size:             " << data.op_size << std::endl
-       << "Object size:            " << data.object_size << std::endl      
+       << "Object size:            " << data.object_size << std::endl
        << "Bandwidth (MB/sec):     " << setprecision(6) << bandwidth << std::endl
        << "Stddev Bandwidth:       " << bandwidth_stddev << std::endl
        << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
@@ -659,7 +644,7 @@ int ObjBencher::seq_read_bench(
 
   lock_cond lc(&lock);
 
-  if (concurrentios <= 0) 
+  if (concurrentios <= 0)
     return -EINVAL;
 
   std::vector<string> name(concurrentios);
@@ -721,8 +706,7 @@ int ObjBencher::seq_read_bench(
   bufferlist *cur_contents;
 
   slot = 0;
-  while ((seconds_to_run && mono_clock::now() < finish_time) &&
-        num_ops > data.started) {
+  while (data.finished < data.started) {
     locker.lock();
     int old_slot = slot;
     bool found = false;
@@ -748,21 +732,26 @@ int ObjBencher::seq_read_bench(
 
     cur_contents = contents[slot].get();
     int current_index = index[slot];
-    
+
     // invalidate internal crc cache
     cur_contents->invalidate_crc();
-  
+
     if (!no_verify) {
       snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
-      if ( (cur_contents->length() != data.op_size) || 
+      if ( (cur_contents->length() != data.op_size) ||
            (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
         cerr << name[slot] << " is not correct!" << std::endl;
         ++errors;
       }
     }
 
-    newName = generate_object_name_fast(data.started / reads_per_object, pid);
-    index[slot] = data.started;
+    bool start_new_read = (seconds_to_run && mono_clock::now() < finish_time) &&
+                          num_ops > data.started;
+    if (start_new_read) {
+      newName = generate_object_name_fast(data.started / reads_per_object, pid);
+      index[slot] = data.started;
+    }
+
     locker.unlock();
     completion_wait(slot);
     locker.lock();
@@ -783,6 +772,9 @@ int ObjBencher::seq_read_bench(
     locker.unlock();
     release_completion(slot);
 
+    if (!start_new_read)
+      continue;
+
     //start new read and check data if requested
     start_times[slot] = mono_clock::now();
     create_completion(slot, _aio_cb, (void *)&lc);
@@ -798,40 +790,6 @@ int ObjBencher::seq_read_bench(
     name[slot] = newName;
   }
 
-  //wait for final reads to complete
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completion_wait(slot);
-    locker.lock();
-    r = completion_ret(slot);
-    if (r < 0) {
-      cerr << "read got " << r << std::endl;
-      locker.unlock();
-      goto ERR;
-    }
-    data.cur_latency = mono_clock::now() - start_times[slot];
-    total_latency += data.cur_latency.count();
-    if (data.cur_latency.count() > data.max_latency)
-      data.max_latency = data.cur_latency.count();
-    if (data.cur_latency.count() < data.min_latency)
-      data.min_latency = data.cur_latency.count();
-    ++data.finished;
-    data.avg_latency = total_latency / data.finished;
-    --data.in_flight;
-    release_completion(slot);
-    if (!no_verify) {
-      snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
-      locker.unlock();
-      if ((contents[slot]->length() != data.op_size) || 
-         (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
-        cerr << name[slot] << " is not correct!" << std::endl;
-        ++errors;
-      }
-    } else {
-        locker.unlock();
-    }
-  }
-
   timePassed = mono_clock::now() - data.start_time;
   locker.lock();
   data.done = true;
@@ -842,7 +800,7 @@ int ObjBencher::seq_read_bench(
   double bandwidth;
   bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
   bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
-  
+
   double iops_stddev;
   if (data.idata.iops_cycles > 1) {
     iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
@@ -961,7 +919,7 @@ int ObjBencher::rand_read_bench(
   int rand_id;
 
   slot = 0;
-  while ((seconds_to_run && mono_clock::now() < finish_time)) {
+  while (data.finished < data.started) {
     locker.lock();
     int old_slot = slot;
     bool found = false;
@@ -1006,26 +964,31 @@ int ObjBencher::rand_read_bench(
     ++data.finished;
     data.avg_latency = total_latency / data.finished;
     --data.in_flight;
-    locker.unlock();
-    
+
     if (!no_verify) {
       snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
-      if ((cur_contents->length() != data.op_size) || 
+      if ((cur_contents->length() != data.op_size) ||
           (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
         cerr << name[slot] << " is not correct!" << std::endl;
         ++errors;
       }
-    } 
+    }
+
+    locker.unlock();
+    release_completion(slot);
+
+    if (!seconds_to_run || mono_clock::now() >= finish_time)
+      continue;
+
+    //start new read and check data if requested
 
     rand_id = rand() % num_ops;
     newName = generate_object_name_fast(rand_id / reads_per_object, pid);
     index[slot] = rand_id;
-    release_completion(slot);
 
     // invalidate internal crc cache
     cur_contents->invalidate_crc();
 
-    //start new read and check data if requested
     start_times[slot] = mono_clock::now();
     create_completion(slot, _aio_cb, (void *)&lc);
     r = aio_read(newName, slot, contents[slot].get(), data.op_size,
@@ -1040,41 +1003,6 @@ int ObjBencher::rand_read_bench(
     name[slot] = newName;
   }
 
-
-  //wait for final reads to complete
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completion_wait(slot);
-    locker.lock();
-    r = completion_ret(slot);
-    if (r < 0) {
-      cerr << "read got " << r << std::endl;
-      locker.unlock();
-      goto ERR;
-    }
-    data.cur_latency = mono_clock::now() - start_times[slot];
-    total_latency += data.cur_latency.count();
-    if (data.cur_latency.count() > data.max_latency)
-      data.max_latency = data.cur_latency.count();
-    if (data.cur_latency.count() < data.min_latency)
-      data.min_latency = data.cur_latency.count();
-    ++data.finished;
-    data.avg_latency = total_latency / data.finished;
-    --data.in_flight;
-    release_completion(slot);
-    if (!no_verify) {
-      snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
-      locker.unlock();
-      if ((contents[slot]->length() != data.op_size) || 
-          (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
-        cerr << name[slot] << " is not correct!" << std::endl;
-        ++errors;
-      }
-    } else {
-        locker.unlock();
-    }
-  }
-
   timePassed = mono_clock::now() - data.start_time;
   locker.lock();
   data.done = true;
@@ -1085,7 +1013,7 @@ int ObjBencher::rand_read_bench(
   double bandwidth;
   bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
   bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
-  
+
   double iops_stddev;
   if (data.idata.iops_cycles > 1) {
     iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
@@ -1199,8 +1127,8 @@ int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, cons
 
 int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
   lock_cond lc(&lock);
-  
-  if (concurrentios <= 0) 
+
+  if (concurrentios <= 0)
     return -EINVAL;
 
   std::vector<string> name(concurrentios);
@@ -1246,7 +1174,7 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
   }
 
   //keep on adding new removes as old ones complete
-  while (data.started < num_objects) {
+  while (data.finished < data.started) {
     locker.lock();
     int old_slot = slot;
     bool found = false;
@@ -1267,7 +1195,6 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
       lc.cond.wait(locker);
     }
     locker.unlock();
-    newName = generate_object_name_fast(data.started, prevPid);
     completion_wait(slot);
     locker.lock();
     r = completion_ret(slot);
@@ -1281,7 +1208,11 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
     locker.unlock();
     release_completion(slot);
 
+    if (data.started >= num_objects)
+      continue;
+
     //start new remove and check data if requested
+    newName = generate_object_name_fast(data.started, prevPid);
     create_completion(slot, _aio_cb, (void *)&lc);
     r = aio_remove(newName, slot);
     if (r < 0) {
@@ -1294,23 +1225,6 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
     name[slot] = newName;
   }
 
-  //wait for final removes to complete
-  while (data.finished < data.started) {
-    slot = data.finished % concurrentios;
-    completion_wait(slot);
-    locker.lock();
-    r = completion_ret(slot);
-    if (r != 0 && r != -ENOENT) { // file does not exist
-      cerr << "remove got " << r << std::endl;
-      locker.unlock();
-      goto ERR;
-    }
-    ++data.finished;
-    --data.in_flight;
-    release_completion(slot);
-    locker.unlock();
-  }
-
   locker.lock();
   data.done = true;
   locker.unlock();
index 2a95a07b47112709b3b78418e959e1da6351f116..88b1d89852e4bfb14b8afef4253e3ba2cfd51e00 100644 (file)
@@ -1115,7 +1115,7 @@ protected:
   }
 
   bool completion_is_done(int slot) override {
-    return completions[slot]->is_complete();
+    return completions[slot] && completions[slot]->is_complete();
   }
 
   int completion_wait(int slot) override {