goto out;
}
- r = clean_up(num_objects, prevPid);
+ r = clean_up(num_objects, prevPid, concurrentios);
if (r != 0) goto out;
// lastrun file
return -5;
}
-int ObjBencher::clean_up(int num_objects, int prevPid) {
- int r = 0;
-
- for (int i = 0; i < num_objects; ++i) {
- std::string name = generate_object_name(i, prevPid);
- r = sync_remove(name);
-
- if (r < 0) {
- return r;
- }
- }
-
- return 0;
-}
-
-
-int ObjBencher::clean_up(const std::string& prefix, int concurrent_ios) {
+int ObjBencher::clean_up(const std::string& prefix, int concurrentios) {
int r = 0;
int object_size;
int num_objects;
}
// if this file is not found we should try to do a linear search on the prefix
- r = clean_up(num_objects, prevPid);
+ r = clean_up(num_objects, prevPid, concurrentios);
if (r != 0) return r;
r = sync_remove(metadata_name);
return 0;
}
+int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
+ lock_cond lc(&lock);
+ std::string name[concurrentios];
+ std::string newName;
+ int r = 0;
+ utime_t runtime;
+ int slot = 0;
+
+ lock.Lock();
+ data.done = false;
+ data.in_flight = 0;
+ data.started = 0;
+ data.finished = 0;
+ lock.Unlock();
+
+ // don't start more completions than files
+ if (num_objects < concurrentios) {
+ concurrentios = num_objects;
+ }
+
+ r = completions_init(concurrentios);
+ if (r < 0)
+ return r;
+
+ //set up initial removes
+ for (int i = 0; i < concurrentios; ++i) {
+ name[i] = generate_object_name(i, prevPid);
+ }
+
+ //start initial removes
+ for (int i = 0; i < concurrentios; ++i) {
+ create_completion(i, _aio_cb, (void *)&lc);
+ r = aio_remove(name[i], i);
+ if (r < 0) { //naughty, doesn't clean up heap
+ cerr << "r = " << r << std::endl;
+ goto ERR;
+ }
+ lock.Lock();
+ ++data.started;
+ ++data.in_flight;
+ lock.Unlock();
+ }
+
+ //keep on adding new removes as old ones complete
+ while (data.started < num_objects) {
+ lock.Lock();
+ int old_slot = slot;
+ bool found = false;
+ while (1) {
+ do {
+ if (completion_is_done(slot)) {
+ found = true;
+ break;
+ }
+ slot++;
+ if (slot == concurrentios) {
+ slot = 0;
+ }
+ } while (slot != old_slot);
+ if (found) {
+ break;
+ }
+ lc.cond.Wait(lock);
+ }
+ lock.Unlock();
+ newName = generate_object_name(data.started, prevPid);
+ completion_wait(slot);
+ lock.Lock();
+ r = completion_ret(slot);
+ if (r != 0 && r != -ENOENT) { // file does not exist
+ cerr << "remove got " << r << std::endl;
+ lock.Unlock();
+ goto ERR;
+ }
+ ++data.finished;
+ --data.in_flight;
+ lock.Unlock();
+ release_completion(slot);
+
+ //start new remove and check data if requested
+ create_completion(slot, _aio_cb, (void *)&lc);
+ r = aio_remove(newName, slot);
+ if (r < 0) {
+ goto ERR;
+ }
+ lock.Lock();
+ ++data.started;
+ ++data.in_flight;
+ lock.Unlock();
+ name[slot] = newName;
+ }
+
+ //wait for final removes to complete
+ while (data.finished < data.started) {
+ slot = data.finished % concurrentios;
+ completion_wait(slot);
+ lock.Lock();
+ r = completion_ret(slot);
+ if (r != 0 && r != -ENOENT) { // file does not exist
+ cerr << "remove got " << r << std::endl;
+ lock.Unlock();
+ goto ERR;
+ }
+ ++data.finished;
+ --data.in_flight;
+ release_completion(slot);
+ lock.Unlock();
+ }
+
+ lock.Lock();
+ data.done = true;
+ lock.Unlock();
+
+ completions_done();
+
+ return 0;
+
+ ERR:
+ lock.Lock();
+ data.done = 1;
+ lock.Unlock();
+ return -5;
+}
+
int write_bench(int secondsToRun, int concurrentios);
int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid);
- int clean_up(int num_objects, int prevPid);
+ int clean_up(int num_objects, int prevPid, int concurrentios);
virtual int completions_init(int concurrentios) = 0;
virtual void completions_done() = 0;
ObjBencher() : show_time(false), lock("ObjBencher::lock") {}
virtual ~ObjBencher() {}
int aio_bench(int operation, int secondsToRun, int concurrentios, int op_size, bool cleanup);
- int clean_up(const std::string& prefix, int concurrent_ios);
+ int clean_up(const std::string& prefix, int concurrentios);
void set_show_time(bool dt) {
show_time = dt;