From 048c7dc4c8d5e52c4833ae5d04f0414371ae93c4 Mon Sep 17 00:00:00 2001 From: Mike Ryan Date: Thu, 23 Aug 2012 11:52:51 -0700 Subject: [PATCH] obj_bencher: cleanup files in parallel using aio Signed-off-by: Mike Ryan --- src/common/obj_bencher.cc | 146 +++++++++++++++++++++++++++++++++----- src/common/obj_bencher.h | 4 +- 2 files changed, 129 insertions(+), 21 deletions(-) diff --git a/src/common/obj_bencher.cc b/src/common/obj_bencher.cc index 12818fde1b341..f1554ce3f24e8 100644 --- a/src/common/obj_bencher.cc +++ b/src/common/obj_bencher.cc @@ -220,7 +220,7 @@ int ObjBencher::aio_bench(int operation, int secondsToRun, int concurrentios, in goto out; } - r = clean_up(num_objects, prevPid); + r = clean_up(num_objects, prevPid, concurrentios); if (r != 0) goto out; // lastrun file @@ -669,23 +669,7 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre return -5; } -int ObjBencher::clean_up(int num_objects, int prevPid) { - int r = 0; - - for (int i = 0; i < num_objects; ++i) { - std::string name = generate_object_name(i, prevPid); - r = sync_remove(name); - - if (r < 0) { - return r; - } - } - - return 0; -} - - -int ObjBencher::clean_up(const std::string& prefix, int concurrent_ios) { +int ObjBencher::clean_up(const std::string& prefix, int concurrentios) { int r = 0; int object_size; int num_objects; @@ -700,7 +684,7 @@ int ObjBencher::clean_up(const std::string& prefix, int concurrent_ios) { } // if this file is not found we should try to do a linear search on the prefix - r = clean_up(num_objects, prevPid); + r = clean_up(num_objects, prevPid, concurrentios); if (r != 0) return r; r = sync_remove(metadata_name); @@ -709,4 +693,128 @@ int ObjBencher::clean_up(const std::string& prefix, int concurrent_ios) { return 0; } +int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) { + lock_cond lc(&lock); + std::string name[concurrentios]; + std::string newName; + int r = 0; + utime_t runtime; + int slot = 0; + + lock.Lock(); + data.done = false; + data.in_flight = 0; + data.started = 0; + data.finished = 0; + lock.Unlock(); + + // don't start more completions than files + if (num_objects < concurrentios) { + concurrentios = num_objects; + } + + r = completions_init(concurrentios); + if (r < 0) + return r; + + //set up initial removes + for (int i = 0; i < concurrentios; ++i) { + name[i] = generate_object_name(i, prevPid); + } + + //start initial removes + for (int i = 0; i < concurrentios; ++i) { + create_completion(i, _aio_cb, (void *)&lc); + r = aio_remove(name[i], i); + if (r < 0) { //naughty, doesn't clean up heap + cerr << "r = " << r << std::endl; + goto ERR; + } + lock.Lock(); + ++data.started; + ++data.in_flight; + lock.Unlock(); + } + + //keep on adding new removes as old ones complete + while (data.started < num_objects) { + lock.Lock(); + int old_slot = slot; + bool found = false; + while (1) { + do { + if (completion_is_done(slot)) { + found = true; + break; + } + slot++; + if (slot == concurrentios) { + slot = 0; + } + } while (slot != old_slot); + if (found) { + break; + } + lc.cond.Wait(lock); + } + lock.Unlock(); + newName = generate_object_name(data.started, prevPid); + completion_wait(slot); + lock.Lock(); + r = completion_ret(slot); + if (r != 0 && r != -ENOENT) { // file does not exist + cerr << "remove got " << r << std::endl; + lock.Unlock(); + goto ERR; + } + ++data.finished; + --data.in_flight; + lock.Unlock(); + release_completion(slot); + + //start new remove and check data if requested + create_completion(slot, _aio_cb, (void *)&lc); + r = aio_remove(newName, slot); + if (r < 0) { + goto ERR; + } + lock.Lock(); + ++data.started; + ++data.in_flight; + lock.Unlock(); + name[slot] = newName; + } + + //wait for final removes to complete + while (data.finished < data.started) { + slot = data.finished % concurrentios; + completion_wait(slot); + lock.Lock(); + r = completion_ret(slot); + if (r != 0 && r != -ENOENT) { // file does not exist + cerr << "remove got " << r << std::endl; + lock.Unlock(); + goto ERR; + } + ++data.finished; + --data.in_flight; + release_completion(slot); + lock.Unlock(); + } + + lock.Lock(); + data.done = true; + lock.Unlock(); + + completions_done(); + + return 0; + + ERR: + lock.Lock(); + data.done = 1; + lock.Unlock(); + return -5; +} + diff --git a/src/common/obj_bencher.h b/src/common/obj_bencher.h index f766462e93cfd..04aa13d3303f2 100644 --- a/src/common/obj_bencher.h +++ b/src/common/obj_bencher.h @@ -63,7 +63,7 @@ protected: int write_bench(int secondsToRun, int concurrentios); int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid); - int clean_up(int num_objects, int prevPid); + int clean_up(int num_objects, int prevPid, int concurrentios); virtual int completions_init(int concurrentios) = 0; virtual void completions_done() = 0; @@ -88,7 +88,7 @@ public: ObjBencher() : show_time(false), lock("ObjBencher::lock") {} virtual ~ObjBencher() {} int aio_bench(int operation, int secondsToRun, int concurrentios, int op_size, bool cleanup); - int clean_up(const std::string& prefix, int concurrent_ios); + int clean_up(const std::string& prefix, int concurrentios); void set_show_time(bool dt) { show_time = dt; -- 2.39.5