From: Yehuda Sadeh Date: Fri, 20 Apr 2012 00:01:48 +0000 (-0700) Subject: rados_bencher: restructure code (initial work) X-Git-Tag: v0.47~23^2~25 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ddb858c5db9a78c293f5630c79b81191bdffe13f;p=ceph.git rados_bencher: restructure code (initial work) Signed-off-by: Yehuda Sadeh --- diff --git a/src/osdc/rados_bencher.h b/src/osdc/rados_bencher.h index 3f0e3802a84b..7800cc99df66 100644 --- a/src/osdc/rados_bencher.h +++ b/src/osdc/rados_bencher.h @@ -18,6 +18,7 @@ #include "common/config.h" #include "global/global_init.h" #include "common/Cond.h" +#include "rados_bencher.h" #include #include @@ -25,29 +26,10 @@ #include #include -Mutex dataLock("data mutex"); -const int OP_WRITE = 1; -const int OP_SEQ_READ = 2; -const int OP_RAND_READ = 3; -const char *BENCH_DATA = "benchmark_write_data"; -struct bench_data { - bool done; //is the benchmark is done - int object_size; //the size of the objects - int trans_size; //size of the write/read to perform - // same as object_size for write tests - int in_flight; //number of reads/writes being waited on - int started; - int finished; - double min_latency; - double max_latency; - double avg_latency; - utime_t cur_latency; //latency of last completed transaction - utime_t start_time; //start time for benchmark - char *object_contents; //pointer to the contents written to each object -}; +const char *BENCH_DATA = "benchmark_write_data"; -void generate_object_name(char *s, size_t size, int objnum, int pid = 0) +static void generate_object_name(char *s, size_t size, int objnum, int pid = 0) { char hostname[30]; gethostname(hostname, sizeof(hostname)-1); @@ -59,16 +41,80 @@ void generate_object_name(char *s, size_t size, int objnum, int pid = 0) } } -int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx, - int secondsToRun, int concurrentios, bench_data *data); -int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, - int secondsToRun, int concurrentios, bench_data *data, - int writePid); -void *status_printer(void * data_store); -void sanitize_object_contents(bench_data *data, int length); +static void sanitize_object_contents (bench_data *data, int length) { + for (int i = 0; i < length; ++i) { + data->object_contents[i] = i % sizeof(char); + } +} -int aio_bench(librados::Rados& rados, librados::IoCtx &io_ctx, int operation, - int secondsToRun, int concurrentios, int op_size) { +void *RadosBencher::status_printer(void *_bencher) { + RadosBencher *bencher = (RadosBencher *)_bencher; + bench_data& data = bencher->data; + Cond cond; + int i = 0; + int previous_writes = 0; + int cycleSinceChange = 0; + double avg_bandwidth; + double bandwidth; + utime_t ONE_SECOND; + ONE_SECOND.set_from_double(1.0); + bencher->lock.Lock(); + while(!data.done) { + if (i % 20 == 0) { + if (i > 0) + cout << "min lat: " << data.min_latency + << " max lat: " << data.max_latency + << " avg lat: " << data.avg_latency << std::endl; + //I'm naughty and don't reset the fill + cout << setfill(' ') + << setw(5) << "sec" + << setw(8) << "Cur ops" + << setw(10) << "started" + << setw(10) << "finished" + << setw(10) << "avg MB/s" + << setw(10) << "cur MB/s" + << setw(10) << "last lat" + << setw(10) << "avg lat" << std::endl; + } + bandwidth = (double)(data.finished - previous_writes) + * (data.trans_size) + / (1024*1024) + / cycleSinceChange; + avg_bandwidth = (double) (data.trans_size) * (data.finished) + / (double)(ceph_clock_now(g_ceph_context) - data.start_time) / (1024*1024); + if (previous_writes != data.finished) { + previous_writes = data.finished; + cycleSinceChange = 0; + cout << setfill(' ') + << setw(5) << i + << setw(8) << data.in_flight + << setw(10) << data.started + << setw(10) << data.finished + << setw(10) << avg_bandwidth + << setw(10) << bandwidth + << setw(10) << (double)data.cur_latency + << setw(10) << data.avg_latency << std::endl; + } + else { + cout << setfill(' ') + << setw(5) << i + << setw(8) << data.in_flight + << setw(10) << data.started + << setw(10) << data.finished + << setw(10) << avg_bandwidth + << setw(10) << '0' + << setw(10) << '-' + << setw(10) << data.avg_latency << std::endl; + } + ++i; + ++cycleSinceChange; + cond.WaitInterval(g_ceph_context, bencher->lock, ONE_SECOND); + } + bencher->lock.Unlock(); + return NULL; +} + +int RadosBencher::aio_bench(int operation, int secondsToRun, int concurrentios, int op_size) { int object_size = op_size; int num_objects = 0; char* contentsChars = new char[op_size]; @@ -93,29 +139,28 @@ int aio_bench(librados::Rados& rados, librados::IoCtx &io_ctx, int operation, object_size = op_size; } - dataLock.Lock(); - bench_data *data = new bench_data(); - data->done = false; - data->object_size = object_size; - data->trans_size = op_size; - data->in_flight = 0; - data->started = 0; - data->finished = num_objects; - data->min_latency = 9999.0; // this better be higher than initial latency! - data->max_latency = 0; - data->avg_latency = 0; - data->object_contents = contentsChars; - dataLock.Unlock(); + lock.Lock(); + data.done = false; + data.object_size = object_size; + data.trans_size = op_size; + data.in_flight = 0; + data.started = 0; + data.finished = num_objects; + data.min_latency = 9999.0; // this better be higher than initial latency! + data.max_latency = 0; + data.avg_latency = 0; + data.object_contents = contentsChars; + lock.Unlock(); //fill in contentsChars deterministically so we can check returns - sanitize_object_contents(data, data->object_size); + sanitize_object_contents(&data, data.object_size); if (OP_WRITE == operation) { - r = write_bench(rados, io_ctx, secondsToRun, concurrentios, data); + r = write_bench(secondsToRun, concurrentios); if (r != 0) goto out; } else if (OP_SEQ_READ == operation) { - r = seq_read_bench(rados, io_ctx, secondsToRun, concurrentios, data, prevPid); + r = seq_read_bench(secondsToRun, concurrentios, num_objects, prevPid); if (r != 0) goto out; } else if (OP_RAND_READ == operation) { @@ -125,21 +170,25 @@ int aio_bench(librados::Rados& rados, librados::IoCtx &io_ctx, int operation, out: delete[] contentsChars; - delete data; return r; } +struct lock_cond { + lock_cond(Mutex *_lock) : lock(_lock) {} + Mutex *lock; + Cond cond; +}; + void _aio_cb(void *cb, void *arg) { - dataLock.Lock(); - Cond *cond = (Cond *) arg; - cond->Signal(); - dataLock.Unlock(); + struct lock_cond *lc = (struct lock_cond *)arg; + lc->lock->Lock(); + lc->cond.Signal(); + lc->lock->Unlock(); } -int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx, - int secondsToRun, int concurrentios, bench_data *data) { +int RadosBencher::write_bench(int secondsToRun, int concurrentios) { cout << "Maintaining " << concurrentios << " concurrent writes of " - << data->object_size << " bytes for at least " + << data.object_size << " bytes for at least " << secondsToRun << " seconds." << std::endl; librados::AioCompletion* completions[concurrentios]; @@ -150,7 +199,7 @@ int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx, utime_t stopTime; int r = 0; bufferlist b_write; - Cond cond; + lock_cond lc(&lock); utime_t runtime; utime_t timePassed; @@ -159,28 +208,28 @@ int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx, name[i] = new char[128]; contents[i] = new bufferlist(); generate_object_name(name[i], 128, i); - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", i); - contents[i]->append(data->object_contents, data->object_size); + snprintf(data.object_contents, data.object_size, "I'm the %dth object!", i); + contents[i]->append(data.object_contents, data.object_size); } pthread_t print_thread; - pthread_create(&print_thread, NULL, status_printer, (void *)data); - dataLock.Lock(); - data->start_time = ceph_clock_now(g_ceph_context); - dataLock.Unlock(); + pthread_create(&print_thread, NULL, RadosBencher::status_printer, (void *)this); + lock.Lock(); + data.start_time = ceph_clock_now(g_ceph_context); + lock.Unlock(); for (int i = 0; iobject_size, 0); + r = io_ctx.aio_write(name[i], completions[i], *contents[i], data.object_size, 0); if (r < 0) { //naughty, doesn't clean up heap goto ERR; } - dataLock.Lock(); - ++data->started; - ++data->in_flight; - dataLock.Unlock(); + lock.Lock(); + ++data.started; + ++data.in_flight; + lock.Unlock(); } //keep on adding new writes as old ones complete until we've passed minimum time @@ -191,9 +240,9 @@ int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx, //don't need locking for reads because other thread doesn't write runtime.set_from_double(secondsToRun); - stopTime = data->start_time + runtime; + stopTime = data.start_time + runtime; while( ceph_clock_now(g_ceph_context) < stopTime ) { - dataLock.Lock(); + lock.Lock(); while (1) { for (slot = 0; slot < concurrentios; ++slot) { if (completions[slot]->is_safe()) { @@ -203,126 +252,115 @@ int write_bench(librados::Rados& rados, librados::IoCtx& io_ctx, if (slot < concurrentios) { break; } - cond.Wait(dataLock); + lc.cond.Wait(lock); } - dataLock.Unlock(); + lock.Unlock(); //create new contents and name on the heap, and fill them newContents = new bufferlist(); newName = new char[128]; - generate_object_name(newName, 128, data->started); - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", data->started); - newContents->append(data->object_contents, data->object_size); + generate_object_name(newName, 128, data.started); + snprintf(data.object_contents, data.object_size, "I'm the %dth object!", data.started); + newContents->append(data.object_contents, data.object_size); completions[slot]->wait_for_safe(); - dataLock.Lock(); + lock.Lock(); r = completions[slot]->get_return_value(); if (r != 0) { - dataLock.Unlock(); + lock.Unlock(); goto ERR; } - data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; - total_latency += data->cur_latency; - if( data->cur_latency > data->max_latency) data->max_latency = data->cur_latency; - if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency; - ++data->finished; - data->avg_latency = total_latency / data->finished; - --data->in_flight; - dataLock.Unlock(); + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; + lock.Unlock(); completions[slot]->release(); completions[slot] = 0; - timePassed = ceph_clock_now(g_ceph_context) - data->start_time; + timePassed = ceph_clock_now(g_ceph_context) - data.start_time; //write new stuff to rados, then delete old stuff //and save locations of new stuff for later deletion start_times[slot] = ceph_clock_now(g_ceph_context); - completions[slot] = rados.aio_create_completion((void *) &cond, 0, &_aio_cb); - r = io_ctx.aio_write(newName, completions[slot], *newContents, data->object_size, 0); + completions[slot] = rados.aio_create_completion((void *) &lc, 0, &_aio_cb); + r = io_ctx.aio_write(newName, completions[slot], *newContents, data.object_size, 0); if (r < 0) {//naughty; doesn't clean up heap space. goto ERR; } - dataLock.Lock(); - ++data->started; - ++data->in_flight; - dataLock.Unlock(); + lock.Lock(); + ++data.started; + ++data.in_flight; + lock.Unlock(); delete[] name[slot]; delete contents[slot]; name[slot] = newName; contents[slot] = newContents; } - while (data->finished < data->started) { - slot = data->finished % concurrentios; + while (data.finished < data.started) { + slot = data.finished % concurrentios; completions[slot]->wait_for_safe(); - dataLock.Lock(); + lock.Lock(); r = completions[slot]->get_return_value(); if (r != 0) { - dataLock.Unlock(); + lock.Unlock(); goto ERR; } - data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; - total_latency += data->cur_latency; - if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency; - if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency; - ++data->finished; - data->avg_latency = total_latency / data->finished; - --data->in_flight; - dataLock.Unlock(); + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; + lock.Unlock(); completions[slot]->release(); completions[slot] = 0; delete[] name[slot]; delete contents[slot]; } - timePassed = ceph_clock_now(g_ceph_context) - data->start_time; - dataLock.Lock(); - data->done = true; - dataLock.Unlock(); + timePassed = ceph_clock_now(g_ceph_context) - data.start_time; + lock.Lock(); + data.done = true; + lock.Unlock(); pthread_join(print_thread, NULL); double bandwidth; - bandwidth = ((double)data->finished)*((double)data->object_size)/(double)timePassed; + bandwidth = ((double)data.finished)*((double)data.object_size)/(double)timePassed; bandwidth = bandwidth/(1024*1024); // we want it in MB/sec char bw[20]; snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth); cout << "Total time run: " << timePassed << std::endl - << "Total writes made: " << data->finished << std::endl - << "Write size: " << data->object_size << std::endl + << "Total writes made: " << data.finished << std::endl + << "Write size: " << data.object_size << std::endl << "Bandwidth (MB/sec): " << bw << std::endl - << "Average Latency: " << data->avg_latency << std::endl - << "Max latency: " << data->max_latency << std::endl - << "Min latency: " << data->min_latency << std::endl; + << "Average Latency: " << data.avg_latency << std::endl + << "Max latency: " << data.max_latency << std::endl + << "Min latency: " << data.min_latency << std::endl; //write object size/number data for read benchmarks - ::encode(data->object_size, b_write); - ::encode(data->finished, b_write); + ::encode(data.object_size, b_write); + ::encode(data.finished, b_write); ::encode(getpid(), b_write); io_ctx.write(BENCH_DATA, b_write, sizeof(int)*3, 0); return 0; ERR: - dataLock.Lock(); - data->done = 1; - dataLock.Unlock(); + lock.Lock(); + data.done = 1; + lock.Unlock(); pthread_join(print_thread, NULL); return -5; } -int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_to_run, - int concurrentios, bench_data *write_data, int pid) { - bench_data *data = new bench_data(); - data->done = false; - data->object_size = write_data->object_size; - data->trans_size = data->object_size; - data->in_flight= 0; - data->started = 0; - data->finished = 0; - data->min_latency = 9999.0; - data->max_latency = 0; - data->avg_latency = 0; - data->object_contents = write_data->object_contents; +int RadosBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) { + data.finished = 0; - Cond cond; + lock_cond lc(&lock); librados::AioCompletion* completions[concurrentios]; char* name[concurrentios]; bufferlist* contents[concurrentios]; @@ -335,7 +373,7 @@ int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_ double total_latency = 0; int r = 0; utime_t runtime; - sanitize_object_contents(data, 128); //clean it up once; subsequent + sanitize_object_contents(&data, 128); //clean it up once; subsequent //changes will be safe because string length monotonically increases //set up initial reads @@ -346,26 +384,26 @@ int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_ } pthread_t print_thread; - pthread_create(&print_thread, NULL, status_printer, (void *)data); + pthread_create(&print_thread, NULL, status_printer, (void *)this); - dataLock.Lock(); - data->start_time = ceph_clock_now(g_ceph_context); - dataLock.Unlock(); - utime_t finish_time = data->start_time + time_to_run; + lock.Lock(); + data.start_time = ceph_clock_now(g_ceph_context); + lock.Unlock(); + utime_t finish_time = data.start_time + time_to_run; //start initial reads for (int i = 0; i < concurrentios; ++i) { index[i] = i; start_times[i] = ceph_clock_now(g_ceph_context); - completions[i] = rados.aio_create_completion((void *) &cond, &_aio_cb, 0); - r = io_ctx.aio_read(name[i], completions[i], contents[i], data->object_size, 0); + completions[i] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0); + r = io_ctx.aio_read(name[i], completions[i], contents[i], data.object_size, 0); if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread! cerr << "r = " << r << std::endl; goto ERR; } - dataLock.Lock(); - ++data->started; - ++data->in_flight; - dataLock.Unlock(); + lock.Lock(); + ++data.started; + ++data.in_flight; + lock.Unlock(); } //keep on adding new reads as old ones complete @@ -374,8 +412,8 @@ int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_ bufferlist *cur_contents; while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time) && - write_data->finished > data->started) { - dataLock.Lock(); + num_objects > data.started) { + lock.Lock(); while (1) { for (slot = 0; slot < concurrentios; ++slot) { if (completions[slot]->is_complete()) { @@ -385,29 +423,29 @@ int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_ if (slot < concurrentios) { break; } - cond.Wait(dataLock); + lc.cond.Wait(lock); } - dataLock.Unlock(); + lock.Unlock(); newName = new char[128]; - generate_object_name(newName, 128, data->started, pid); + generate_object_name(newName, 128, data.started, pid); int current_index = index[slot]; - index[slot] = data->started; + index[slot] = data.started; completions[slot]->wait_for_complete(); - dataLock.Lock(); + lock.Lock(); r = completions[slot]->get_return_value(); if (r != 0) { cerr << "read got " << r << std::endl; - dataLock.Unlock(); + lock.Unlock(); goto ERR; } - data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; - total_latency += data->cur_latency; - if( data->cur_latency > data->max_latency) data->max_latency = data->cur_latency; - if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency; - ++data->finished; - data->avg_latency = total_latency / data->finished; - --data->in_flight; - dataLock.Unlock(); + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; + lock.Unlock(); completions[slot]->release(); completions[slot] = 0; cur_contents = contents[slot]; @@ -415,17 +453,17 @@ int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_ //start new read and check data if requested start_times[slot] = ceph_clock_now(g_ceph_context); contents[slot] = new bufferlist(); - completions[slot] = rados.aio_create_completion((void *) &cond, &_aio_cb, 0); - r = io_ctx.aio_read(newName, completions[slot], contents[slot], data->object_size, 0); + completions[slot] = rados.aio_create_completion((void *) &lc, &_aio_cb, 0); + r = io_ctx.aio_read(newName, completions[slot], contents[slot], data.object_size, 0); if (r < 0) { goto ERR; } - dataLock.Lock(); - ++data->started; - ++data->in_flight; - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", current_index); - dataLock.Unlock(); - if (memcmp(data->object_contents, cur_contents->c_str(), data->object_size) != 0) { + lock.Lock(); + ++data.started; + ++data.in_flight; + snprintf(data.object_contents, data.object_size, "I'm the %dth object!", current_index); + lock.Unlock(); + if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) { cerr << name[slot] << " is not correct!" << std::endl; ++errors; } @@ -435,28 +473,28 @@ int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_ } //wait for final reads to complete - while (data->finished < data->started) { - slot = data->finished % concurrentios; + while (data.finished < data.started) { + slot = data.finished % concurrentios; completions[slot]->wait_for_complete(); - dataLock.Lock(); + lock.Lock(); r = completions[slot]->get_return_value(); if (r != 0) { cerr << "read got " << r << std::endl; - dataLock.Unlock(); + lock.Unlock(); goto ERR; } - data->cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; - total_latency += data->cur_latency; - if (data->cur_latency > data->max_latency) data->max_latency = data->cur_latency; - if (data->cur_latency < data->min_latency) data->min_latency = data->cur_latency; - ++data->finished; - data->avg_latency = total_latency / data->finished; - --data->in_flight; + data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot]; + total_latency += data.cur_latency; + if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; + if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; + ++data.finished; + data.avg_latency = total_latency / data.finished; + --data.in_flight; completions[slot]-> release(); completions[slot] = 0; - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", index[slot]); - dataLock.Unlock(); - if (memcmp(data->object_contents, contents[slot]->c_str(), data->object_size) != 0) { + snprintf(data.object_contents, data.object_size, "I'm the %dth object!", index[slot]); + lock.Unlock(); + if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) { cerr << name[slot] << " is not correct!" << std::endl; ++errors; } @@ -464,108 +502,35 @@ int seq_read_bench(librados::Rados& rados, librados::IoCtx& io_ctx, int seconds_ delete contents[slot]; } - runtime = ceph_clock_now(g_ceph_context) - data->start_time; - dataLock.Lock(); - data->done = true; - dataLock.Unlock(); + runtime = ceph_clock_now(g_ceph_context) - data.start_time; + lock.Lock(); + data.done = true; + lock.Unlock(); pthread_join(print_thread, NULL); double bandwidth; - bandwidth = ((double)data->finished)*((double)data->object_size)/(double)runtime; + bandwidth = ((double)data.finished)*((double)data.object_size)/(double)runtime; bandwidth = bandwidth/(1024*1024); // we want it in MB/sec char bw[20]; snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth); cout << "Total time run: " << runtime << std::endl - << "Total reads made: " << data->finished << std::endl - << "Read size: " << data->object_size << std::endl + << "Total reads made: " << data.finished << std::endl + << "Read size: " << data.object_size << std::endl << "Bandwidth (MB/sec): " << bw << std::endl - << "Average Latency: " << data->avg_latency << std::endl - << "Max latency: " << data->max_latency << std::endl - << "Min latency: " << data->min_latency << std::endl; + << "Average Latency: " << data.avg_latency << std::endl + << "Max latency: " << data.max_latency << std::endl + << "Min latency: " << data.min_latency << std::endl; - delete data; return 0; ERR: - dataLock.Lock(); - data->done = 1; - dataLock.Unlock(); + lock.Lock(); + data.done = 1; + lock.Unlock(); pthread_join(print_thread, NULL); return -5; } - -void *status_printer(void * data_store) { - bench_data *data = (bench_data *) data_store; - Cond cond; - int i = 0; - int previous_writes = 0; - int cycleSinceChange = 0; - double avg_bandwidth; - double bandwidth; - utime_t ONE_SECOND; - ONE_SECOND.set_from_double(1.0); - dataLock.Lock(); - while(!data->done) { - if (i % 20 == 0) { - if (i > 0) - cout << "min lat: " << data->min_latency - << " max lat: " << data->max_latency - << " avg lat: " << data->avg_latency << std::endl; - //I'm naughty and don't reset the fill - cout << setfill(' ') - << setw(5) << "sec" - << setw(8) << "Cur ops" - << setw(10) << "started" - << setw(10) << "finished" - << setw(10) << "avg MB/s" - << setw(10) << "cur MB/s" - << setw(10) << "last lat" - << setw(10) << "avg lat" << std::endl; - } - bandwidth = (double)(data->finished - previous_writes) - * (data->trans_size) - / (1024*1024) - / cycleSinceChange; - avg_bandwidth = (double) (data->trans_size) * (data->finished) - / (double)(ceph_clock_now(g_ceph_context) - data->start_time) / (1024*1024); - if (previous_writes != data->finished) { - previous_writes = data->finished; - cycleSinceChange = 0; - cout << setfill(' ') - << setw(5) << i - << setw(8) << data->in_flight - << setw(10) << data->started - << setw(10) << data->finished - << setw(10) << avg_bandwidth - << setw(10) << bandwidth - << setw(10) << (double)data->cur_latency - << setw(10) << data->avg_latency << std::endl; - } - else { - cout << setfill(' ') - << setw(5) << i - << setw(8) << data->in_flight - << setw(10) << data->started - << setw(10) << data->finished - << setw(10) << avg_bandwidth - << setw(10) << '0' - << setw(10) << '-' - << setw(10) << data->avg_latency << std::endl; - } - ++i; - ++cycleSinceChange; - cond.WaitInterval(g_ceph_context, dataLock, ONE_SECOND); - } - dataLock.Unlock(); - return NULL; -} - -inline void sanitize_object_contents (bench_data *data, int length) { - for (int i = 0; i < length; ++i) { - data->object_contents[i] = i % sizeof(char); - } -}