From 604bd1785635139a2b520f654c653057330c9c24 Mon Sep 17 00:00:00 2001 From: Jianshen Liu Date: Mon, 27 Jan 2020 22:52:36 -0700 Subject: [PATCH] rados bench: fix the delayed checking of completed ops When the submission process is done, rados bench will scan the completed ops in a sequence manner. However, because the order of completion may not be sequential, some of the completed ops suffer from delayed checking, causing the average latency of all ops to be prolonged. The solution is to reuse the checking mechanism used during the submission process. Signed-off-by: Jianshen Liu --- src/common/obj_bencher.cc | 208 +++++++++++--------------------------- src/tools/rados/rados.cc | 2 +- 2 files changed, 62 insertions(+), 148 deletions(-) diff --git a/src/common/obj_bencher.cc b/src/common/obj_bencher.cc index 27cc80d18979e..e94fd7c3e346c 100644 --- a/src/common/obj_bencher.cc +++ b/src/common/obj_bencher.cc @@ -390,9 +390,9 @@ int ObjBencher::fetch_bench_metadata(const std::string& metadata_file, int ObjBencher::write_bench(int secondsToRun, int concurrentios, const string& run_name_meta, unsigned max_objects, int prev_pid) { - if (concurrentios <= 0) + if (concurrentios <= 0) return -EINVAL; - + if (!formatter) { out(cout) << "Maintaining " << concurrentios << " concurrent writes of " << data.op_size << " bytes to objects of size " @@ -472,7 +472,7 @@ int ObjBencher::write_bench(int secondsToRun, stopTime = data.start_time + std::chrono::seconds(secondsToRun); slot = 0; locker.lock(); - while (secondsToRun && mono_clock::now() < stopTime) { + while (data.finished < data.started) { bool found = false; while (1) { int old_slot = slot; @@ -491,12 +491,6 @@ int ObjBencher::write_bench(int secondsToRun, lc.cond.wait(locker); } locker.unlock(); - //create new contents and name on the heap, and fill them - newName = generate_object_name_fast(data.started / writes_per_object); - newContents = contents[slot].get(); - snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started); - // we wrote to buffer, going around internal crc cache, so invalidate it now. - newContents->invalidate_crc(); completion_wait(slot); locker.lock(); @@ -519,7 +513,28 @@ int ObjBencher::write_bench(int secondsToRun, locker.unlock(); release_completion(slot); + if (!secondsToRun || mono_clock::now() >= stopTime) { + locker.lock(); + continue; + } + + if (data.op_size && max_objects && + data.started >= + (int)((data.object_size * max_objects + data.op_size - 1) / + data.op_size)) { + locker.lock(); + continue; + } + //write new stuff to backend + + //create new contents and name on the heap, and fill them + newName = generate_object_name_fast(data.started / writes_per_object); + newContents = contents[slot].get(); + snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started); + // we wrote to buffer, going around internal crc cache, so invalidate it now. + newContents->invalidate_crc(); + start_times[slot] = mono_clock::now(); r = create_completion(slot, _aio_cb, &lc); if (r < 0) @@ -533,39 +548,9 @@ int ObjBencher::write_bench(int secondsToRun, locker.lock(); ++data.started; ++data.in_flight; - if (data.op_size) { - if (max_objects && - data.started >= (int)((data.object_size * max_objects + data.op_size - 1) / - data.op_size)) - break; - } } locker.unlock(); - while (data.finished < data.started) { - slot = data.finished % concurrentios; - completion_wait(slot); - locker.lock(); - r = completion_ret(slot); - if (r != 0) { - locker.unlock(); - goto ERR; - } - data.cur_latency = mono_clock::now() - start_times[slot]; - total_latency += data.cur_latency.count(); - if (data.cur_latency.count() > data.max_latency) - data.max_latency = data.cur_latency.count(); - if (data.cur_latency.count() < data.min_latency) - data.min_latency = data.cur_latency.count(); - ++data.finished; - double delta = data.cur_latency.count() - data.avg_latency; - data.avg_latency = total_latency / data.finished; - data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency); - --data.in_flight; - locker.unlock(); - release_completion(slot); - } - timePassed = mono_clock::now() - data.start_time; locker.lock(); data.done = true; @@ -601,7 +586,7 @@ int ObjBencher::write_bench(int secondsToRun, out(cout) << "Total time run: " << timePassed.count() << std::endl << "Total writes made: " << data.finished << std::endl << "Write size: " << data.op_size << std::endl - << "Object size: " << data.object_size << std::endl + << "Object size: " << data.object_size << std::endl << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl << "Stddev Bandwidth: " << bandwidth_stddev << std::endl << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl @@ -659,7 +644,7 @@ int ObjBencher::seq_read_bench( lock_cond lc(&lock); - if (concurrentios <= 0) + if (concurrentios <= 0) return -EINVAL; std::vector name(concurrentios); @@ -721,8 +706,7 @@ int ObjBencher::seq_read_bench( bufferlist *cur_contents; slot = 0; - while ((seconds_to_run && mono_clock::now() < finish_time) && - num_ops > data.started) { + while (data.finished < data.started) { locker.lock(); int old_slot = slot; bool found = false; @@ -748,21 +732,26 @@ int ObjBencher::seq_read_bench( cur_contents = contents[slot].get(); int current_index = index[slot]; - + // invalidate internal crc cache cur_contents->invalidate_crc(); - + if (!no_verify) { snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index); - if ( (cur_contents->length() != data.op_size) || + if ( (cur_contents->length() != data.op_size) || (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) { cerr << name[slot] << " is not correct!" << std::endl; ++errors; } } - newName = generate_object_name_fast(data.started / reads_per_object, pid); - index[slot] = data.started; + bool start_new_read = (seconds_to_run && mono_clock::now() < finish_time) && + num_ops > data.started; + if (start_new_read) { + newName = generate_object_name_fast(data.started / reads_per_object, pid); + index[slot] = data.started; + } + locker.unlock(); completion_wait(slot); locker.lock(); @@ -783,6 +772,9 @@ int ObjBencher::seq_read_bench( locker.unlock(); release_completion(slot); + if (!start_new_read) + continue; + //start new read and check data if requested start_times[slot] = mono_clock::now(); create_completion(slot, _aio_cb, (void *)&lc); @@ -798,40 +790,6 @@ int ObjBencher::seq_read_bench( name[slot] = newName; } - //wait for final reads to complete - while (data.finished < data.started) { - slot = data.finished % concurrentios; - completion_wait(slot); - locker.lock(); - r = completion_ret(slot); - if (r < 0) { - cerr << "read got " << r << std::endl; - locker.unlock(); - goto ERR; - } - data.cur_latency = mono_clock::now() - start_times[slot]; - total_latency += data.cur_latency.count(); - if (data.cur_latency.count() > data.max_latency) - data.max_latency = data.cur_latency.count(); - if (data.cur_latency.count() < data.min_latency) - data.min_latency = data.cur_latency.count(); - ++data.finished; - data.avg_latency = total_latency / data.finished; - --data.in_flight; - release_completion(slot); - if (!no_verify) { - snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]); - locker.unlock(); - if ((contents[slot]->length() != data.op_size) || - (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) { - cerr << name[slot] << " is not correct!" << std::endl; - ++errors; - } - } else { - locker.unlock(); - } - } - timePassed = mono_clock::now() - data.start_time; locker.lock(); data.done = true; @@ -842,7 +800,7 @@ int ObjBencher::seq_read_bench( double bandwidth; bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count(); bandwidth = bandwidth/(1024*1024); // we want it in MB/sec - + double iops_stddev; if (data.idata.iops_cycles > 1) { iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1)); @@ -961,7 +919,7 @@ int ObjBencher::rand_read_bench( int rand_id; slot = 0; - while ((seconds_to_run && mono_clock::now() < finish_time)) { + while (data.finished < data.started) { locker.lock(); int old_slot = slot; bool found = false; @@ -1006,26 +964,31 @@ int ObjBencher::rand_read_bench( ++data.finished; data.avg_latency = total_latency / data.finished; --data.in_flight; - locker.unlock(); - + if (!no_verify) { snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index); - if ((cur_contents->length() != data.op_size) || + if ((cur_contents->length() != data.op_size) || (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) { cerr << name[slot] << " is not correct!" << std::endl; ++errors; } - } + } + + locker.unlock(); + release_completion(slot); + + if (!seconds_to_run || mono_clock::now() >= finish_time) + continue; + + //start new read and check data if requested rand_id = rand() % num_ops; newName = generate_object_name_fast(rand_id / reads_per_object, pid); index[slot] = rand_id; - release_completion(slot); // invalidate internal crc cache cur_contents->invalidate_crc(); - //start new read and check data if requested start_times[slot] = mono_clock::now(); create_completion(slot, _aio_cb, (void *)&lc); r = aio_read(newName, slot, contents[slot].get(), data.op_size, @@ -1040,41 +1003,6 @@ int ObjBencher::rand_read_bench( name[slot] = newName; } - - //wait for final reads to complete - while (data.finished < data.started) { - slot = data.finished % concurrentios; - completion_wait(slot); - locker.lock(); - r = completion_ret(slot); - if (r < 0) { - cerr << "read got " << r << std::endl; - locker.unlock(); - goto ERR; - } - data.cur_latency = mono_clock::now() - start_times[slot]; - total_latency += data.cur_latency.count(); - if (data.cur_latency.count() > data.max_latency) - data.max_latency = data.cur_latency.count(); - if (data.cur_latency.count() < data.min_latency) - data.min_latency = data.cur_latency.count(); - ++data.finished; - data.avg_latency = total_latency / data.finished; - --data.in_flight; - release_completion(slot); - if (!no_verify) { - snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]); - locker.unlock(); - if ((contents[slot]->length() != data.op_size) || - (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) { - cerr << name[slot] << " is not correct!" << std::endl; - ++errors; - } - } else { - locker.unlock(); - } - } - timePassed = mono_clock::now() - data.start_time; locker.lock(); data.done = true; @@ -1085,7 +1013,7 @@ int ObjBencher::rand_read_bench( double bandwidth; bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count(); bandwidth = bandwidth/(1024*1024); // we want it in MB/sec - + double iops_stddev; if (data.idata.iops_cycles > 1) { iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1)); @@ -1199,8 +1127,8 @@ int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, cons int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) { lock_cond lc(&lock); - - if (concurrentios <= 0) + + if (concurrentios <= 0) return -EINVAL; std::vector name(concurrentios); @@ -1246,7 +1174,7 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) { } //keep on adding new removes as old ones complete - while (data.started < num_objects) { + while (data.finished < data.started) { locker.lock(); int old_slot = slot; bool found = false; @@ -1267,7 +1195,6 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) { lc.cond.wait(locker); } locker.unlock(); - newName = generate_object_name_fast(data.started, prevPid); completion_wait(slot); locker.lock(); r = completion_ret(slot); @@ -1281,7 +1208,11 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) { locker.unlock(); release_completion(slot); + if (data.started >= num_objects) + continue; + //start new remove and check data if requested + newName = generate_object_name_fast(data.started, prevPid); create_completion(slot, _aio_cb, (void *)&lc); r = aio_remove(newName, slot); if (r < 0) { @@ -1294,23 +1225,6 @@ int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) { name[slot] = newName; } - //wait for final removes to complete - while (data.finished < data.started) { - slot = data.finished % concurrentios; - completion_wait(slot); - locker.lock(); - r = completion_ret(slot); - if (r != 0 && r != -ENOENT) { // file does not exist - cerr << "remove got " << r << std::endl; - locker.unlock(); - goto ERR; - } - ++data.finished; - --data.in_flight; - release_completion(slot); - locker.unlock(); - } - locker.lock(); data.done = true; locker.unlock(); diff --git a/src/tools/rados/rados.cc b/src/tools/rados/rados.cc index 2a95a07b47112..88b1d89852e4b 100644 --- a/src/tools/rados/rados.cc +++ b/src/tools/rados/rados.cc @@ -1115,7 +1115,7 @@ protected: } bool completion_is_done(int slot) override { - return completions[slot]->is_complete(); + return completions[slot] && completions[slot]->is_complete(); } int completion_wait(int slot) override { -- 2.39.5