From cae10830c7ee09303a7dcc8a32070347a0129dfd Mon Sep 17 00:00:00 2001 From: danchai Date: Tue, 29 Oct 2013 18:10:48 +0800 Subject: [PATCH] ObjBencher: add rand_read_bench functions to support rand test in rados-bench Signed-off-by: Tengwei Cai --- src/common/obj_bencher.cc | 191 +++++++++++++++++++++++++++++++++++++- src/common/obj_bencher.h | 1 + 2 files changed, 190 insertions(+), 2 deletions(-) diff --git a/src/common/obj_bencher.cc b/src/common/obj_bencher.cc index 599cbbc62fb2..d3c3b44692cf 100644 --- a/src/common/obj_bencher.cc +++ b/src/common/obj_bencher.cc @@ -214,8 +214,8 @@ int ObjBencher::aio_bench( if (r != 0) goto out; } else if (OP_RAND_READ == operation) { - cerr << "Random test not implemented yet!" << std::endl; - r = -1; + r = rand_read_bench(secondsToRun, num_objects, concurrentios, prevPid); + if (r != 0) goto out; } if (OP_WRITE == operation && cleanup) { @@ -683,6 +683,193 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre return -5; } +int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) +{ + lock_cond lc(&lock); + std::vector name(concurrentios); + std::string newName; + bufferlist* contents[concurrentios]; + int index[concurrentios]; + int errors = 0; + utime_t start_time; + std::vector start_times(concurrentios); + utime_t time_to_run; + time_to_run.set_from_double(seconds_to_run); + double total_latency = 0; + int r = 0; + utime_t runtime; + sanitize_object_contents(&data, data.object_size); //clean it up once; subsequent + //changes will be safe because string length should remain the same + + srand (time(NULL)); + + r = completions_init(concurrentios); + if (r < 0) + return r; + + //set up initial reads + for (int i = 0; i < concurrentios; ++i) { + name[i] = generate_object_name(i, pid); + contents[i] = new bufferlist(); + } + + lock.Lock(); + data.finished = 0; + data.start_time = ceph_clock_now(g_ceph_context); + lock.Unlock(); + + pthread_t print_thread; + pthread_create(&print_thread, NULL, status_printer, (void *)this); + + utime_t finish_time = data.start_time + time_to_run; + //start initial reads + for (int i = 0; i < concurrentios; ++i) { + index[i] = i; + start_times[i] = ceph_clock_now(g_ceph_context); + create_completion(i, _aio_cb, (void *)&lc); + r = aio_read(name[i], i, contents[i], data.object_size); + if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread! + cerr << "r = " << r << std::endl; + goto ERR; + } + lock.Lock(); + ++data.started; + ++data.in_flight; + lock.Unlock(); + } + + //keep on adding new reads as old ones complete + int slot; + bufferlist *cur_contents; + int rand_id; + + slot = 0; + while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time)) { + lock.Lock(); + int old_slot = slot; + bool found = false; + while (1) { + do { + if (completion_is_done(slot)) { + found = true; + break; + } + slot++; + if (slot == concurrentios) { + slot = 0; + } + } while (slot != old_slot); + if (found) { + break; + } + lc.cond.Wait(lock); + } + lock.Unlock(); + rand_id = rand() % num_objects; + newName = generate_object_name(rand_id, pid); + int current_index = index[slot]; + index[slot] = rand_id; + completion_wait(slot); + lock.Lock(); + r = completion_ret(slot); + if (r != 0) { + cerr << "read got " << r << std::endl; + lock.Unlock(); + goto ERR; + } + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; + lock.Unlock(); + release_completion(slot); + cur_contents = contents[slot]; + + //start new read and check data if requested + start_times[slot] = ceph_clock_now(g_ceph_context); + contents[slot] = new bufferlist(); + create_completion(slot, _aio_cb, (void *)&lc); + r = aio_read(newName, slot, contents[slot], data.object_size); + if (r < 0) { + goto ERR; + } + lock.Lock(); + ++data.started; + ++data.in_flight; + snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", current_index); + lock.Unlock(); + if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) { + cerr << name[slot] << " is not correct!" << std::endl; + ++errors; + } + name[slot] = newName; + delete cur_contents; + } + + //wait for final reads to complete + while (data.finished < data.started) { + slot = data.finished % concurrentios; + completion_wait(slot); + lock.Lock(); + r = completion_ret(slot); + if (r != 0) { + cerr << "read got " << r << std::endl; + lock.Unlock(); + goto ERR; + } + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; + release_completion(slot); + snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", index[slot]); + lock.Unlock(); + if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) { + cerr << name[slot] << " is not correct!" << std::endl; + ++errors; + } + delete contents[slot]; + } + + runtime = ceph_clock_now(g_ceph_context) - data.start_time; + lock.Lock(); + data.done = true; + lock.Unlock(); + + pthread_join(print_thread, NULL); + + double bandwidth; + bandwidth = ((double)data.finished)*((double)data.object_size)/(double)runtime; + bandwidth = bandwidth/(1024*1024); // we want it in MB/sec + char bw[20]; + snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth); + + out(cout) << "Total time run: " << runtime << std::endl + << "Total reads made: " << data.finished << std::endl + << "Read size: " << data.object_size << std::endl + << "Bandwidth (MB/sec): " << bw << std::endl + << "Average Latency: " << data.avg_latency << std::endl + << "Max latency: " << data.max_latency << std::endl + << "Min latency: " << data.min_latency << std::endl; + + completions_done(); + + return 0; + + ERR: + lock.Lock(); + data.done = 1; + lock.Unlock(); + pthread_join(print_thread, NULL); + return -5; +} + int ObjBencher::clean_up(const std::string& prefix, int concurrentios) { int r = 0; int object_size; diff --git a/src/common/obj_bencher.h b/src/common/obj_bencher.h index c8f671f8c908..b87821a7d5af 100644 --- a/src/common/obj_bencher.h +++ b/src/common/obj_bencher.h @@ -65,6 +65,7 @@ protected: int write_bench(int secondsToRun, int maxObjects, int concurrentios); int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid); + int rand_read_bench(int secondsToRun, int num_objects, int concurrentios, int writePid); int clean_up(int num_objects, int prevPid, int concurrentios); int clean_up_slow(const std::string& prefix, int concurrentios); -- 2.47.3