if (r != 0) goto out;
}
else if (OP_RAND_READ == operation) {
- cerr << "Random test not implemented yet!" << std::endl;
- r = -1;
+ r = rand_read_bench(secondsToRun, num_objects, concurrentios, prevPid);
+ if (r != 0) goto out;
}
if (OP_WRITE == operation && cleanup) {
return -5;
}
+int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid)
+{
+ lock_cond lc(&lock);
+ std::vector<string> name(concurrentios);
+ std::string newName;
+ bufferlist* contents[concurrentios];
+ int index[concurrentios];
+ int errors = 0;
+ utime_t start_time;
+ std::vector<utime_t> start_times(concurrentios);
+ utime_t time_to_run;
+ time_to_run.set_from_double(seconds_to_run);
+ double total_latency = 0;
+ int r = 0;
+ utime_t runtime;
+ sanitize_object_contents(&data, data.object_size); //clean it up once; subsequent
+ //changes will be safe because string length should remain the same
+
+ srand (time(NULL));
+
+ r = completions_init(concurrentios);
+ if (r < 0)
+ return r;
+
+ //set up initial reads
+ for (int i = 0; i < concurrentios; ++i) {
+ name[i] = generate_object_name(i, pid);
+ contents[i] = new bufferlist();
+ }
+
+ lock.Lock();
+ data.finished = 0;
+ data.start_time = ceph_clock_now(g_ceph_context);
+ lock.Unlock();
+
+ pthread_t print_thread;
+ pthread_create(&print_thread, NULL, status_printer, (void *)this);
+
+ utime_t finish_time = data.start_time + time_to_run;
+ //start initial reads
+ for (int i = 0; i < concurrentios; ++i) {
+ index[i] = i;
+ start_times[i] = ceph_clock_now(g_ceph_context);
+ create_completion(i, _aio_cb, (void *)&lc);
+ r = aio_read(name[i], i, contents[i], data.object_size);
+ if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
+ cerr << "r = " << r << std::endl;
+ goto ERR;
+ }
+ lock.Lock();
+ ++data.started;
+ ++data.in_flight;
+ lock.Unlock();
+ }
+
+ //keep on adding new reads as old ones complete
+ int slot;
+ bufferlist *cur_contents;
+ int rand_id;
+
+ slot = 0;
+ while (seconds_to_run && (ceph_clock_now(g_ceph_context) < finish_time)) {
+ lock.Lock();
+ int old_slot = slot;
+ bool found = false;
+ while (1) {
+ do {
+ if (completion_is_done(slot)) {
+ found = true;
+ break;
+ }
+ slot++;
+ if (slot == concurrentios) {
+ slot = 0;
+ }
+ } while (slot != old_slot);
+ if (found) {
+ break;
+ }
+ lc.cond.Wait(lock);
+ }
+ lock.Unlock();
+ rand_id = rand() % num_objects;
+ newName = generate_object_name(rand_id, pid);
+ int current_index = index[slot];
+ index[slot] = rand_id;
+ completion_wait(slot);
+ lock.Lock();
+ r = completion_ret(slot);
+ if (r != 0) {
+ cerr << "read got " << r << std::endl;
+ lock.Unlock();
+ goto ERR;
+ }
+ data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+ total_latency += data.cur_latency;
+ if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+ if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+ ++data.finished;
+ data.avg_latency = total_latency / data.finished;
+ --data.in_flight;
+ lock.Unlock();
+ release_completion(slot);
+ cur_contents = contents[slot];
+
+ //start new read and check data if requested
+ start_times[slot] = ceph_clock_now(g_ceph_context);
+ contents[slot] = new bufferlist();
+ create_completion(slot, _aio_cb, (void *)&lc);
+ r = aio_read(newName, slot, contents[slot], data.object_size);
+ if (r < 0) {
+ goto ERR;
+ }
+ lock.Lock();
+ ++data.started;
+ ++data.in_flight;
+ snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", current_index);
+ lock.Unlock();
+ if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) {
+ cerr << name[slot] << " is not correct!" << std::endl;
+ ++errors;
+ }
+ name[slot] = newName;
+ delete cur_contents;
+ }
+
+ //wait for final reads to complete
+ while (data.finished < data.started) {
+ slot = data.finished % concurrentios;
+ completion_wait(slot);
+ lock.Lock();
+ r = completion_ret(slot);
+ if (r != 0) {
+ cerr << "read got " << r << std::endl;
+ lock.Unlock();
+ goto ERR;
+ }
+ data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];
+ total_latency += data.cur_latency;
+ if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
+ if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
+ ++data.finished;
+ data.avg_latency = total_latency / data.finished;
+ --data.in_flight;
+ release_completion(slot);
+ snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", index[slot]);
+ lock.Unlock();
+ if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
+ cerr << name[slot] << " is not correct!" << std::endl;
+ ++errors;
+ }
+ delete contents[slot];
+ }
+
+ runtime = ceph_clock_now(g_ceph_context) - data.start_time;
+ lock.Lock();
+ data.done = true;
+ lock.Unlock();
+
+ pthread_join(print_thread, NULL);
+
+ double bandwidth;
+ bandwidth = ((double)data.finished)*((double)data.object_size)/(double)runtime;
+ bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
+ char bw[20];
+ snprintf(bw, sizeof(bw), "%.3lf \n", bandwidth);
+
+ out(cout) << "Total time run: " << runtime << std::endl
+ << "Total reads made: " << data.finished << std::endl
+ << "Read size: " << data.object_size << std::endl
+ << "Bandwidth (MB/sec): " << bw << std::endl
+ << "Average Latency: " << data.avg_latency << std::endl
+ << "Max latency: " << data.max_latency << std::endl
+ << "Min latency: " << data.min_latency << std::endl;
+
+ completions_done();
+
+ return 0;
+
+ ERR:
+ lock.Lock();
+ data.done = 1;
+ lock.Unlock();
+ pthread_join(print_thread, NULL);
+ return -5;
+}
+
int ObjBencher::clean_up(const std::string& prefix, int concurrentios) {
int r = 0;
int object_size;