]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
obj_bencher: cleanup files in parallel using aio
authorMike Ryan <mike.ryan@inktank.com>
Thu, 23 Aug 2012 18:52:51 +0000 (11:52 -0700)
committerMike Ryan <mike.ryan@inktank.com>
Thu, 23 Aug 2012 22:52:27 +0000 (15:52 -0700)
Signed-off-by: Mike Ryan <mike.ryan@inktank.com>
src/common/obj_bencher.cc
src/common/obj_bencher.h

index 12818fde1b341c40f2e27c1282f88585aadbf0dd..f1554ce3f24e80b9e34f1589f193c60a5773760a 100644 (file)
@@ -220,7 +220,7 @@ int ObjBencher::aio_bench(int operation, int secondsToRun, int concurrentios, in
       goto out;
     }
  
-    r = clean_up(num_objects, prevPid);
+    r = clean_up(num_objects, prevPid, concurrentios);
     if (r != 0) goto out;
 
     // lastrun file
@@ -669,23 +669,7 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
   return -5;
 }
 
-int ObjBencher::clean_up(int num_objects, int prevPid) {
-  int r = 0;
-
-  for (int i = 0; i < num_objects; ++i) {
-      std::string name = generate_object_name(i, prevPid);
-      r = sync_remove(name);
-
-      if (r < 0) {
-          return r;
-      }
-  }
-
-  return 0;
-}
-
-
-int ObjBencher::clean_up(const std::string& prefix, int concurrent_ios) {
+int ObjBencher::clean_up(const std::string& prefix, int concurrentios) {
   int r = 0;
   int object_size;
   int num_objects;
@@ -700,7 +684,7 @@ int ObjBencher::clean_up(const std::string& prefix, int concurrent_ios) {
   }
   // if this file is not found we should try to do a linear search on the prefix
 
-  r = clean_up(num_objects, prevPid);
+  r = clean_up(num_objects, prevPid, concurrentios);
   if (r != 0) return r;
 
   r = sync_remove(metadata_name);
@@ -709,4 +693,128 @@ int ObjBencher::clean_up(const std::string& prefix, int concurrent_ios) {
   return 0;
 }
 
+int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
+  lock_cond lc(&lock);
+  std::string name[concurrentios];
+  std::string newName;
+  int r = 0;
+  utime_t runtime;
+  int slot = 0;
+
+  lock.Lock();
+  data.done = false;
+  data.in_flight = 0;
+  data.started = 0;
+  data.finished = 0;
+  lock.Unlock();
+
+  // don't start more completions than files
+  if (num_objects < concurrentios) {
+    concurrentios = num_objects;
+  }
+
+  r = completions_init(concurrentios);
+  if (r < 0)
+    return r;
+
+  //set up initial removes
+  for (int i = 0; i < concurrentios; ++i) {
+    name[i] = generate_object_name(i, prevPid);
+  }
+
+  //start initial removes
+  for (int i = 0; i < concurrentios; ++i) {
+    create_completion(i, _aio_cb, (void *)&lc);
+    r = aio_remove(name[i], i);
+    if (r < 0) { //naughty, doesn't clean up heap
+      cerr << "r = " << r << std::endl;
+      goto ERR;
+    }
+    lock.Lock();
+    ++data.started;
+    ++data.in_flight;
+    lock.Unlock();
+  }
+
+  //keep on adding new removes as old ones complete
+  while (data.started < num_objects) {
+    lock.Lock();
+    int old_slot = slot;
+    bool found = false;
+    while (1) {
+      do {
+       if (completion_is_done(slot)) {
+          found = true;
+         break;
+       }
+        slot++;
+        if (slot == concurrentios) {
+          slot = 0;
+        }
+      } while (slot != old_slot);
+      if (found) {
+       break;
+      }
+      lc.cond.Wait(lock);
+    }
+    lock.Unlock();
+    newName = generate_object_name(data.started, prevPid);
+    completion_wait(slot);
+    lock.Lock();
+    r = completion_ret(slot);
+    if (r != 0 && r != -ENOENT) { // file does not exist
+      cerr << "remove got " << r << std::endl;
+      lock.Unlock();
+      goto ERR;
+    }
+    ++data.finished;
+    --data.in_flight;
+    lock.Unlock();
+    release_completion(slot);
+
+    //start new remove and check data if requested
+    create_completion(slot, _aio_cb, (void *)&lc);
+    r = aio_remove(newName, slot);
+    if (r < 0) {
+      goto ERR;
+    }
+    lock.Lock();
+    ++data.started;
+    ++data.in_flight;
+    lock.Unlock();
+    name[slot] = newName;
+  }
+
+  //wait for final removes to complete
+  while (data.finished < data.started) {
+    slot = data.finished % concurrentios;
+    completion_wait(slot);
+    lock.Lock();
+    r = completion_ret(slot);
+    if (r != 0 && r != -ENOENT) { // file does not exist
+      cerr << "remove got " << r << std::endl;
+      lock.Unlock();
+      goto ERR;
+    }
+    ++data.finished;
+    --data.in_flight;
+    release_completion(slot);
+    lock.Unlock();
+  }
+
+  lock.Lock();
+  data.done = true;
+  lock.Unlock();
+
+  completions_done();
+
+  return 0;
+
+ ERR:
+  lock.Lock();
+  data.done = 1;
+  lock.Unlock();
+  return -5;
+}
+
 
index f766462e93cfd7ad22896608c90c39901dc20505..04aa13d3303f28500a503e488ff868f3ebd5d625 100644 (file)
@@ -63,7 +63,7 @@ protected:
   int write_bench(int secondsToRun, int concurrentios);
   int seq_read_bench(int secondsToRun, int concurrentios, int num_objects, int writePid);
 
-  int clean_up(int num_objects, int prevPid);
+  int clean_up(int num_objects, int prevPid, int concurrentios);
 
   virtual int completions_init(int concurrentios) = 0;
   virtual void completions_done() = 0;
@@ -88,7 +88,7 @@ public:
   ObjBencher() : show_time(false), lock("ObjBencher::lock") {}
   virtual ~ObjBencher() {}
   int aio_bench(int operation, int secondsToRun, int concurrentios, int op_size, bool cleanup);
-  int clean_up(const std::string& prefix, int concurrent_ios);
+  int clean_up(const std::string& prefix, int concurrentios);
 
   void set_show_time(bool dt) {
     show_time = dt;