*
  */
 
-#include <boost/bind.hpp>
-#include <map>
-#include <set>
-#include <sstream>
-
 #include "include/rados/librados.hpp"
 #include "common/Formatter.h"
 #include "common/admin_socket.h"
 #include "common/debug.h"
 #include "common/errno.h"
+#include "common/Timer.h"
 #include "common/WorkQueue.h"
 #include "global/global_context.h"
 #include "librbd/internal.h"
 #include "librbd/Utils.h"
 #include "ImageDeleter.h"
 #include "tools/rbd_mirror/image_deleter/RemoveRequest.h"
+#include <map>
+#include <sstream>
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rbd_mirror
                            << __func__ << ": "
 
 using std::string;
-using std::map;
 using std::stringstream;
 using std::vector;
 using std::pair;
 ImageDeleter<I>::ImageDeleter(ContextWQ *work_queue, SafeTimer *timer,
                               Mutex *timer_lock,
                               ServiceDaemon<librbd::ImageCtx>* service_daemon)
-  : m_work_queue(work_queue),
+  : m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
     m_service_daemon(service_daemon),
-    m_delete_lock("rbd::mirror::ImageDeleter::Delete"),
-    m_image_deleter_thread(this),
-    m_failed_timer(timer),
-    m_failed_timer_lock(timer_lock),
+    m_lock("rbd::mirror::ImageDeleter::m_lock"),
     m_asok_hook(new ImageDeleterAdminSocketHook<I>(g_ceph_context, this))
 {
-  set_failed_timer_interval(g_ceph_context->_conf->get_val<double>(
-    "rbd_mirror_delete_retry_interval"));
-  m_image_deleter_thread.create("image_deleter");
 }
 
 template <typename I>
 ImageDeleter<I>::~ImageDeleter() {
-  dout(20) << "enter" << dendl;
+  dout(20) << dendl;
 
-  m_running = false;
   {
-    Mutex::Locker l (m_delete_lock);
-    m_delete_queue_cond.Signal();
-  }
-  if (m_image_deleter_thread.is_started()) {
-    m_image_deleter_thread.join();
+    Mutex::Locker timer_locker(*m_timer_lock);
+    Mutex::Locker locker(m_lock);
+    m_running = false;
+    cancel_retry_timer();
   }
 
-  delete m_asok_hook;
-  dout(20) << "return" << dendl;
-}
+  C_SaferCond ctx;
+  m_async_op_tracker.wait_for_ops(&ctx);
+  ctx.wait();
 
-template <typename I>
-void ImageDeleter<I>::run() {
-  dout(20) << "enter" << dendl;
-  while(m_running) {
-    m_delete_lock.Lock();
-    while (m_delete_queue.empty()) {
-      dout(20) << "waiting for delete requests" << dendl;
-      m_delete_queue_cond.Wait(m_delete_lock);
-
-      if (!m_running) {
-        m_delete_lock.Unlock();
-        dout(20) << "return" << dendl;
-        return;
-      }
+  // wake up any external state machines waiting on deletions
+  assert(m_in_flight_delete_queue.empty());
+  for (auto& info : m_delete_queue) {
+    if (info->on_delete != nullptr) {
+      info->on_delete->complete(-ECANCELED);
     }
-
-    m_active_delete = std::move(m_delete_queue.back());
-    m_delete_queue.pop_back();
-    m_delete_lock.Unlock();
-
-    bool move_to_next = process_image_delete();
-    if (!move_to_next) {
-      if (!m_running) {
-       dout(20) << "return" << dendl;
-       return;
-      }
-
-      Mutex::Locker l(m_delete_lock);
-      if (m_delete_queue.size() == 1) {
-        m_delete_queue_cond.Wait(m_delete_lock);
-      }
+  }
+  for (auto& info : m_retry_delete_queue) {
+    if (info->on_delete != nullptr) {
+      info->on_delete->complete(-ECANCELED);
     }
   }
+
+  delete m_asok_hook;
 }
 
 template <typename I>
 void ImageDeleter<I>::schedule_image_delete(IoCtxRef local_io_ctx,
                                             const std::string& global_image_id,
                                             bool ignore_orphaned) {
-  dout(20) << "enter" << dendl;
-
-  Mutex::Locker locker(m_delete_lock);
   int64_t local_pool_id = local_io_ctx->get_id();
-  auto del_info = find_delete_info(local_pool_id, global_image_id);
-  if (del_info != nullptr) {
-    dout(20) << "image " << global_image_id << " "
-             << "was already scheduled for deletion" << dendl;
-    if (ignore_orphaned) {
-      (*del_info)->ignore_orphaned = true;
+  dout(5) << "local_pool_id=" << local_pool_id << ", "
+          << "global_image_id=" << global_image_id << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    auto del_info = find_delete_info(local_pool_id, global_image_id);
+    if (del_info != nullptr) {
+      dout(20) << "image " << global_image_id << " "
+               << "was already scheduled for deletion" << dendl;
+      if (ignore_orphaned) {
+        del_info->ignore_orphaned = true;
+      }
+      return;
     }
-    return;
-  }
 
-  m_delete_queue.push_front(
-    unique_ptr<DeleteInfo>(new DeleteInfo(local_pool_id, global_image_id,
-                                          local_io_ctx, ignore_orphaned)));
-  m_delete_queue_cond.Signal();
+    m_delete_queue.emplace_back(new DeleteInfo(local_pool_id, global_image_id,
+                                               local_io_ctx, ignore_orphaned));
+  }
+  remove_images();
 }
 
 template <typename I>
                                                   const std::string &global_image_id,
                                                   Context *ctx,
                                                   bool notify_on_failed_retry) {
+  dout(5) << "local_pool_id=" << local_pool_id << ", "
+          << "global_image_id=" << global_image_id << dendl;
 
   ctx = new FunctionContext([this, ctx](int r) {
       m_work_queue->queue(ctx, r);
     });
 
-  Mutex::Locker locker(m_delete_lock);
+  Mutex::Locker locker(m_lock);
   auto del_info = find_delete_info(local_pool_id, global_image_id);
   if (!del_info) {
     // image not scheduled for deletion
     return;
   }
 
-  dout(20) << "local_pool_id=" << local_pool_id << ", "
-           << "global_image_id=" << global_image_id << dendl;
-
-  if ((*del_info)->on_delete != nullptr) {
-    (*del_info)->on_delete->complete(-ESTALE);
+  if (del_info->on_delete != nullptr) {
+    del_info->on_delete->complete(-ESTALE);
   }
-  (*del_info)->on_delete = ctx;
-  (*del_info)->notify_on_failed_retry = notify_on_failed_retry;
+  del_info->on_delete = ctx;
+  del_info->notify_on_failed_retry = notify_on_failed_retry;
 }
 
 template <typename I>
 void ImageDeleter<I>::cancel_waiter(int64_t local_pool_id,
                                     const std::string &global_image_id) {
-  Mutex::Locker locker(m_delete_lock);
+  dout(5) << "local_pool_id=" << local_pool_id << ", "
+          << "global_image_id=" << global_image_id << dendl;
+
+  Mutex::Locker locker(m_lock);
   auto del_info = find_delete_info(local_pool_id, global_image_id);
   if (!del_info) {
     return;
   }
 
-  if ((*del_info)->on_delete != nullptr) {
-    (*del_info)->on_delete->complete(-ECANCELED);
-    (*del_info)->on_delete = nullptr;
-  }
-}
-
-template <typename I>
-bool ImageDeleter<I>::process_image_delete() {
-  stringstream ss;
-  m_active_delete->to_string(ss);
-  std::string del_info_str = ss.str();
-  dout(10) << "start processing delete request: " << del_info_str << dendl;
-
-  C_SaferCond remove_ctx;
-  image_deleter::ErrorResult error_result;
-  auto req = image_deleter::RemoveRequest<I>::create(
-    *m_active_delete->local_io_ctx, m_active_delete->global_image_id,
-    m_active_delete->ignore_orphaned, &error_result, m_work_queue, &remove_ctx);
-  req->send();
-
-  int r = remove_ctx.wait();
-  if (r < 0) {
-    if (error_result == image_deleter::ERROR_RESULT_COMPLETE) {
-      complete_active_delete(r);
-      return true;
-    } else if (error_result == image_deleter::ERROR_RESULT_RETRY_IMMEDIATELY) {
-      Mutex::Locker l(m_delete_lock);
-      m_active_delete->notify(r);
-      m_delete_queue.push_front(std::move(m_active_delete));
-      return false;
-    } else {
-      enqueue_failed_delete(r);
-      return true;
-    }
+  if (del_info->on_delete != nullptr) {
+    del_info->on_delete->complete(-ECANCELED);
+    del_info->on_delete = nullptr;
   }
-
-  complete_active_delete(0);
-  return true;
 }
 
 template <typename I>
-void ImageDeleter<I>::complete_active_delete(int r) {
-  dout(20) << dendl;
-
-  Mutex::Locker delete_locker(m_delete_lock);
-  m_active_delete->notify(r);
-  m_active_delete.reset();
+void ImageDeleter<I>::complete_active_delete(DeleteInfoRef* delete_info,
+                                             int r) {
+  dout(20) << "info=" << *delete_info << ", r=" << r << dendl;
+  Mutex::Locker locker(m_lock);
+  (*delete_info)->notify(r);
+  delete_info->reset();
 }
 
 template <typename I>
-void ImageDeleter<I>::enqueue_failed_delete(int error_code) {
-  dout(20) << "enter" << dendl;
-
+void ImageDeleter<I>::enqueue_failed_delete(DeleteInfoRef* delete_info,
+                                            int error_code,
+                                            double retry_delay) {
+  dout(20) << "info=" << *delete_info << ", r=" << error_code << dendl;
   if (error_code == -EBLACKLISTED) {
+    Mutex::Locker locker(m_lock);
     derr << "blacklisted while deleting local image" << dendl;
-    complete_active_delete(error_code);
+    complete_active_delete(delete_info, error_code);
     return;
   }
 
-  m_delete_lock.Lock();
-  if (m_active_delete->notify_on_failed_retry) {
-    m_active_delete->notify(error_code);
+  Mutex::Locker timer_locker(*m_timer_lock);
+  Mutex::Locker locker(m_lock);
+  auto& delete_info_ref = *delete_info;
+  if (delete_info_ref->notify_on_failed_retry) {
+    delete_info_ref->notify(error_code);
   }
-  m_active_delete->error_code = error_code;
-  bool was_empty = m_failed_queue.empty();
-  m_failed_queue.push_front(std::move(m_active_delete));
-  m_delete_lock.Unlock();
-  if (was_empty) {
-    FunctionContext *ctx = new FunctionContext(
-      boost::bind(&ImageDeleter<I>::retry_failed_deletions, this));
-    Mutex::Locker l(*m_failed_timer_lock);
-    m_failed_timer->add_event_after(m_failed_interval, ctx);
-  }
-}
+  delete_info_ref->error_code = error_code;
+  delete_info_ref->retry_time = ceph_clock_now();
+  delete_info_ref->retry_time += retry_delay;
+  m_retry_delete_queue.push_back(delete_info_ref);
 
-template <typename I>
-void ImageDeleter<I>::retry_failed_deletions() {
-  dout(20) << "enter" << dendl;
-
-  Mutex::Locker l(m_delete_lock);
-
-  bool empty = m_failed_queue.empty();
-  while (!m_failed_queue.empty()) {
-    m_delete_queue.push_back(std::move(m_failed_queue.back()));
-    m_delete_queue.back()->retries++;
-    m_failed_queue.pop_back();
-  }
-  if (!empty) {
-    m_delete_queue_cond.Signal();
-  }
+  schedule_retry_timer();
 }
 
 template <typename I>
-unique_ptr<typename ImageDeleter<I>::DeleteInfo> const*
+typename ImageDeleter<I>::DeleteInfoRef
 ImageDeleter<I>::find_delete_info(int64_t local_pool_id,
                                   const std::string &global_image_id) {
-  assert(m_delete_lock.is_locked());
-
-  if (m_active_delete && m_active_delete->match(local_pool_id,
-                                                global_image_id)) {
-    return &m_active_delete;
-  }
-
-  for (const auto& del_info : m_delete_queue) {
-    if (del_info->match(local_pool_id, global_image_id)) {
-      return &del_info;
-    }
-  }
-
-  for (const auto& del_info : m_failed_queue) {
-    if (del_info->match(local_pool_id, global_image_id)) {
-      return &del_info;
+  assert(m_lock.is_locked());
+  DeleteQueue delete_queues[] = {m_in_flight_delete_queue,
+                                 m_retry_delete_queue,
+                                 m_delete_queue};
+
+  DeleteInfo delete_info{local_pool_id, global_image_id};
+  for (auto& queue : delete_queues) {
+    auto it = std::find_if(queue.begin(), queue.end(),
+                           [&delete_info](const DeleteInfoRef& ref) {
+                             return delete_info == *ref;
+                           });
+    if (it != queue.end()) {
+      return *it;
     }
   }
-
-  return nullptr;
+  return {};
 }
 
 template <typename I>
 void ImageDeleter<I>::print_status(Formatter *f, stringstream *ss) {
-  dout(20) << "enter" << dendl;
+  dout(20) << dendl;
 
   if (f) {
     f->open_object_section("image_deleter_status");
     f->open_array_section("delete_images_queue");
   }
 
-  Mutex::Locker l(m_delete_lock);
+  Mutex::Locker l(m_lock);
   for (const auto& image : m_delete_queue) {
     image->print_status(f, ss);
   }
     f->open_array_section("failed_deletes_queue");
   }
 
-  for (const auto& image : m_failed_queue) {
+  for (const auto& image : m_retry_delete_queue) {
     image->print_status(f, ss, true);
   }
 
 }
 
 template <typename I>
-void ImageDeleter<I>::DeleteInfo::notify(int r) {
-  if (on_delete) {
-    dout(20) << "executing image deletion handler r=" << r << dendl;
+vector<string> ImageDeleter<I>::get_delete_queue_items() {
+  vector<string> items;
 
-    Context *ctx = on_delete;
-    on_delete = nullptr;
-    ctx->complete(r);
+  Mutex::Locker l(m_lock);
+  for (const auto& del_info : m_delete_queue) {
+    items.push_back(del_info->global_image_id);
   }
+
+  return items;
 }
 
 template <typename I>
-void ImageDeleter<I>::DeleteInfo::to_string(stringstream& ss) {
-  ss << "[" << "local_pool_id=" << local_pool_id << ", ";
-  ss << "global_image_id=" << global_image_id << "]";
+vector<pair<string, int> > ImageDeleter<I>::get_failed_queue_items() {
+  vector<pair<string, int> > items;
+
+  Mutex::Locker l(m_lock);
+  for (const auto& del_info : m_retry_delete_queue) {
+    items.push_back(make_pair(del_info->global_image_id,
+                              del_info->error_code));
+  }
+
+  return items;
 }
 
 template <typename I>
-void ImageDeleter<I>::DeleteInfo::print_status(Formatter *f, stringstream *ss,
-                                               bool print_failure_info) {
-  if (f) {
-    f->open_object_section("delete_info");
-    f->dump_int("local_pool_id", local_pool_id);
-    f->dump_string("global_image_id", global_image_id);
-    if (print_failure_info) {
-      f->dump_string("error_code", cpp_strerror(error_code));
-      f->dump_int("retries", retries);
+void ImageDeleter<I>::remove_images() {
+  dout(10) << dendl;
+
+  uint64_t max_concurrent_deletions = g_ceph_context->_conf->get_val<uint64_t>(
+    "rbd_mirror_concurrent_image_deletions");
+
+  Mutex::Locker locker(m_lock);
+  while (true) {
+    if (!m_running || m_delete_queue.empty() ||
+        m_in_flight_delete_queue.size() >= max_concurrent_deletions) {
+      return;
     }
-    f->close_section();
-    f->flush(*ss);
-  } else {
-    this->to_string(*ss);
+
+    DeleteInfoRef delete_info = m_delete_queue.front();
+    m_delete_queue.pop_front();
+
+    assert(delete_info);
+    remove_image(delete_info);
   }
 }
 
 template <typename I>
-vector<string> ImageDeleter<I>::get_delete_queue_items() {
-  vector<string> items;
+void ImageDeleter<I>::remove_image(DeleteInfoRef delete_info) {
+  dout(10) << "info=" << *delete_info << dendl;
+  assert(m_lock.is_locked());
 
-  Mutex::Locker l(m_delete_lock);
-  for (const auto& del_info : m_delete_queue) {
-    items.push_back(del_info->global_image_id);
+  m_in_flight_delete_queue.push_back(delete_info);
+  m_async_op_tracker.start_op();
+
+  auto ctx = new FunctionContext([this, delete_info](int r) {
+      handle_remove_image(delete_info, r);
+      m_async_op_tracker.finish_op();
+    });
+
+  auto req = image_deleter::RemoveRequest<I>::create(
+    *delete_info->local_io_ctx, delete_info->global_image_id,
+    delete_info->ignore_orphaned, &delete_info->error_result, m_work_queue,
+    ctx);
+  req->send();
+}
+
+template <typename I>
+void ImageDeleter<I>::handle_remove_image(DeleteInfoRef delete_info,
+                                          int r) {
+  dout(10) << "info=" << *delete_info << ", r=" << r << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_lock.is_locked());
+    auto it = std::find(m_in_flight_delete_queue.begin(),
+                        m_in_flight_delete_queue.end(), delete_info);
+    assert(it != m_in_flight_delete_queue.end());
+    m_in_flight_delete_queue.erase(it);
   }
 
-  return items;
+  if (r < 0) {
+    if (delete_info->error_result == image_deleter::ERROR_RESULT_COMPLETE) {
+      complete_active_delete(&delete_info, r);
+    } else if (delete_info->error_result ==
+                 image_deleter::ERROR_RESULT_RETRY_IMMEDIATELY) {
+      enqueue_failed_delete(&delete_info, r, m_busy_interval);
+    } else {
+      double failed_interval = g_ceph_context->_conf->get_val<double>(
+        "rbd_mirror_delete_retry_interval");
+      enqueue_failed_delete(&delete_info, r, failed_interval);
+    }
+  } else {
+    complete_active_delete(&delete_info, 0);
+  }
+
+  // process the next queued image to delete
+  remove_images();
 }
 
 template <typename I>
-vector<pair<string, int> > ImageDeleter<I>::get_failed_queue_items() {
-  vector<pair<string, int> > items;
+void ImageDeleter<I>::schedule_retry_timer() {
+  assert(m_timer_lock->is_locked());
+  assert(m_lock.is_locked());
+  if (!m_running || m_timer_ctx != nullptr || m_retry_delete_queue.empty()) {
+    return;
+  }
 
-  Mutex::Locker l(m_delete_lock);
-  for (const auto& del_info : m_failed_queue) {
-    items.push_back(make_pair(del_info->global_image_id,
-                              del_info->error_code));
+  dout(10) << dendl;
+  auto &delete_info = m_retry_delete_queue.front();
+  m_timer_ctx = new FunctionContext([this](int r) {
+      handle_retry_timer();
+    });
+  m_timer->add_event_at(delete_info->retry_time, m_timer_ctx);
+}
+
+template <typename I>
+void ImageDeleter<I>::cancel_retry_timer() {
+  dout(10) << dendl;
+  assert(m_timer_lock->is_locked());
+  if (m_timer_ctx != nullptr) {
+    bool canceled = m_timer->cancel_event(m_timer_ctx);
+    m_timer_ctx = nullptr;
+    assert(canceled);
   }
+}
 
-  return items;
+template <typename I>
+void ImageDeleter<I>::handle_retry_timer() {
+  dout(10) << dendl;
+  assert(m_timer_lock->is_locked());
+  Mutex::Locker locker(m_lock);
+
+  assert(m_timer_ctx != nullptr);
+  m_timer_ctx = nullptr;
+
+  assert(m_running);
+  assert(!m_retry_delete_queue.empty());
+
+  // move all ready-to-ready items back to main queue
+  utime_t now = ceph_clock_now();
+  while (!m_retry_delete_queue.empty()) {
+    auto &delete_info = m_retry_delete_queue.front();
+    if (delete_info->retry_time > now) {
+      break;
+    }
+
+    m_delete_queue.push_back(delete_info);
+    m_retry_delete_queue.pop_front();
+  }
+
+  // schedule wake up for any future retries
+  schedule_retry_timer();
+
+  // start (concurrent) removal of images
+  m_async_op_tracker.start_op();
+  auto ctx = new FunctionContext([this](int r) {
+      remove_images();
+      m_async_op_tracker.finish_op();
+    });
+  m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void ImageDeleter<I>::DeleteInfo::notify(int r) {
+  if (on_delete) {
+    dout(20) << "executing image deletion handler r=" << r << dendl;
+
+    Context *ctx = on_delete;
+    on_delete = nullptr;
+    ctx->complete(r);
+  }
 }
 
 template <typename I>
-void ImageDeleter<I>::set_failed_timer_interval(double interval) {
-  this->m_failed_interval = interval;
+void ImageDeleter<I>::DeleteInfo::print_status(Formatter *f, stringstream *ss,
+                                               bool print_failure_info) {
+  if (f) {
+    f->open_object_section("delete_info");
+    f->dump_int("local_pool_id", local_pool_id);
+    f->dump_string("global_image_id", global_image_id);
+    if (print_failure_info) {
+      f->dump_string("error_code", cpp_strerror(error_code));
+      f->dump_int("retries", retries);
+    }
+    f->close_section();
+    f->flush(*ss);
+  } else {
+    *ss << *this;
+  }
 }
 
 } // namespace mirror
 
  *
  */
 
-#ifndef CEPH_RBD_MIRROR_IMAGEDELETER_H
-#define CEPH_RBD_MIRROR_IMAGEDELETER_H
+#ifndef CEPH_RBD_MIRROR_IMAGE_DELETER_H
+#define CEPH_RBD_MIRROR_IMAGE_DELETER_H
 
+#include "include/utime.h"
+#include "common/AsyncOpTracker.h"
 #include "common/Mutex.h"
-#include "common/Cond.h"
-#include "common/Thread.h"
-#include "common/Timer.h"
 #include "types.h"
-
+#include "tools/rbd_mirror/image_deleter/Types.h"
+#include <atomic>
 #include <deque>
+#include <iosfwd>
+#include <memory>
 #include <vector>
-#include <atomic>
 
 class AdminSocketHook;
+class Context;
 class ContextWQ;
+class SafeTimer;
 namespace librbd { struct ImageCtx; }
 
 namespace rbd {
 template <typename ImageCtxT = librbd::ImageCtx>
 class ImageDeleter {
 public:
-  static const int EISPRM = 1000;
-
   ImageDeleter(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
                ServiceDaemon<librbd::ImageCtx>* service_daemon);
   ~ImageDeleter();
   // for testing purposes
   std::vector<std::string> get_delete_queue_items();
   std::vector<std::pair<std::string, int> > get_failed_queue_items();
-  void set_failed_timer_interval(double interval);
 
-private:
+  inline void set_busy_timer_interval(double interval) {
+    m_busy_interval = interval;
+  }
 
-  class ImageDeleterThread : public Thread {
-    ImageDeleter *m_image_deleter;
-  public:
-    ImageDeleterThread(ImageDeleter *image_deleter) :
-      m_image_deleter(image_deleter) {}
-    void *entry() override {
-      m_image_deleter->run();
-      return 0;
-    }
-  };
+private:
 
   struct DeleteInfo {
     int64_t local_pool_id;
     std::string global_image_id;
     IoCtxRef local_io_ctx;
-    bool ignore_orphaned;
+    bool ignore_orphaned = false;
+
+    image_deleter::ErrorResult error_result = {};
     int error_code = 0;
+    utime_t retry_time = {};
     int retries = 0;
     bool notify_on_failed_retry = true;
     Context *on_delete = nullptr;
 
+    DeleteInfo(int64_t local_pool_id, const std::string& global_image_id)
+      : local_pool_id(local_pool_id), global_image_id(global_image_id) {
+    }
+
     DeleteInfo(int64_t local_pool_id, const std::string& global_image_id,
                IoCtxRef local_io_ctx, bool ignore_orphaned)
       : local_pool_id(local_pool_id), global_image_id(global_image_id),
         local_io_ctx(local_io_ctx), ignore_orphaned(ignore_orphaned) {
     }
 
-    bool match(int64_t local_pool_id, const std::string &global_image_id) {
-      return (this->local_pool_id == local_pool_id &&
-              this->global_image_id == global_image_id);
+    inline bool operator==(const DeleteInfo& delete_info) const {
+      return (local_pool_id == delete_info.local_pool_id &&
+              global_image_id == delete_info.global_image_id);
+    }
+
+    friend std::ostream& operator<<(std::ostream& os, DeleteInfo& delete_info) {
+      os << "[" << "local_pool_id=" << delete_info.local_pool_id << ", "
+         << "global_image_id=" << delete_info.global_image_id << "]";
+    return os;
     }
+
     void notify(int r);
-    void to_string(std::stringstream& ss);
     void print_status(Formatter *f, std::stringstream *ss,
                       bool print_failure_info=false);
   };
-
-  std::atomic<unsigned> m_running { 1 };
+  typedef std::shared_ptr<DeleteInfo> DeleteInfoRef;
+  typedef std::deque<DeleteInfoRef> DeleteQueue;
 
   ContextWQ *m_work_queue;
+  SafeTimer *m_timer;
+  Mutex *m_timer_lock;
   ServiceDaemon<librbd::ImageCtx>* m_service_daemon;
 
-  std::deque<std::unique_ptr<DeleteInfo> > m_delete_queue;
-  Mutex m_delete_lock;
-  Cond m_delete_queue_cond;
+  std::atomic<unsigned> m_running { 1 };
 
-  unique_ptr<DeleteInfo> m_active_delete;
+  double m_busy_interval = 1;
 
-  ImageDeleterThread m_image_deleter_thread;
+  AsyncOpTracker m_async_op_tracker;
 
-  std::deque<std::unique_ptr<DeleteInfo>> m_failed_queue;
-  double m_failed_interval;
-  SafeTimer *m_failed_timer;
-  Mutex *m_failed_timer_lock;
+  Mutex m_lock;
+  DeleteQueue m_delete_queue;
+  DeleteQueue m_retry_delete_queue;
+  DeleteQueue m_in_flight_delete_queue;
 
   AdminSocketHook *m_asok_hook;
 
-  void run();
+  Context *m_timer_ctx = nullptr;
+
   bool process_image_delete();
 
-  void complete_active_delete(int r);
-  void enqueue_failed_delete(int error_code);
-  void retry_failed_deletions();
+  void complete_active_delete(DeleteInfoRef* delete_info, int r);
+  void enqueue_failed_delete(DeleteInfoRef* delete_info, int error_code,
+                             double retry_delay);
+
+  DeleteInfoRef find_delete_info(int64_t local_pool_id,
+                                 const std::string &global_image_id);
+
+  void remove_images();
+  void remove_image(DeleteInfoRef delete_info);
+  void handle_remove_image(DeleteInfoRef delete_info, int r);
 
-  unique_ptr<DeleteInfo> const*
-  find_delete_info(int64_t local_pool_id, const std::string &global_image_id);
+  void schedule_retry_timer();
+  void cancel_retry_timer();
+  void handle_retry_timer();
 
 };
 
 
 extern template class rbd::mirror::ImageDeleter<librbd::ImageCtx>;
 
-#endif // CEPH_RBD_MIRROR_IMAGEDELETER_H
+#endif // CEPH_RBD_MIRROR_IMAGE_DELETER_H