]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: AIO submissions processed by new AioImageRequestWQ
authorJason Dillaman <dillaman@redhat.com>
Tue, 7 Jul 2015 17:06:50 +0000 (13:06 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 13 Nov 2015 01:17:52 +0000 (20:17 -0500)
New work queue has the ability to suspend write operations, which
should occur when exclusive locking is enabled and the client doesn't
own the lock.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/AioCompletion.cc
src/librbd/AioImageRequestWQ.cc [new file with mode: 0644]
src/librbd/AioImageRequestWQ.h [new file with mode: 0644]
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/Makefile.am
src/librbd/internal.cc
src/librbd/librbd.cc

index 451898b6b2e92b5c3ad2e787731e460c360c8582..f3822379f096f1357fc8b41a978d3d8d3781ff06 100644 (file)
@@ -6,6 +6,7 @@
 #include "common/ceph_context.h"
 #include "common/dout.h"
 #include "common/errno.h"
+#include "common/WorkQueue.h"
 
 #include "librbd/AioObjectRequest.h"
 #include "librbd/internal.h"
diff --git a/src/librbd/AioImageRequestWQ.cc b/src/librbd/AioImageRequestWQ.cc
new file mode 100644 (file)
index 0000000..03e2af3
--- /dev/null
@@ -0,0 +1,201 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/AioImageRequestWQ.h"
+#include "librbd/AioCompletion.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/internal.h"
+
+namespace librbd {
+
+namespace {
+
+class C_AioReadWQ : public Context {
+public:
+  C_AioReadWQ(ImageCtx *ictx, uint64_t off, size_t len, char *buf,
+              bufferlist *pbl, AioCompletion *c, int op_flags)
+    : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_pbl(pbl), m_comp(c),
+      m_op_flags(op_flags) {
+  }
+protected:
+  virtual void finish(int r) {
+    aio_read(m_ictx, m_off, m_len, m_buf, m_pbl, m_comp, m_op_flags);
+  }
+private:
+  ImageCtx *m_ictx;
+  uint64_t m_off;
+  uint64_t m_len;
+  char *m_buf;
+  bufferlist *m_pbl;
+  AioCompletion *m_comp;
+  int m_op_flags;
+};
+
+class C_AioWriteWQ : public Context {
+public:
+  C_AioWriteWQ(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
+               AioCompletion *c, int op_flags)
+    : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_comp(c),
+      m_op_flags(op_flags) {
+  }
+protected:
+  virtual void finish(int r) {
+    aio_write(m_ictx, m_off, m_len, m_buf, m_comp, m_op_flags);
+  }
+private:
+  ImageCtx *m_ictx;
+  uint64_t m_off;
+  uint64_t m_len;
+  const char *m_buf;
+  AioCompletion *m_comp;
+  int m_op_flags;
+};
+
+class C_AioDiscardWQ : public Context {
+public:
+  C_AioDiscardWQ(ImageCtx *ictx, uint64_t off, uint64_t len, AioCompletion *c)
+    : m_ictx(ictx), m_off(off), m_len(len), m_comp(c) {
+  }
+protected:
+  virtual void finish(int r) {
+    aio_discard(m_ictx, m_off, m_len, m_comp);
+  }
+private:
+  ImageCtx *m_ictx;
+  uint64_t m_off;
+  uint64_t m_len;
+  AioCompletion *m_comp;
+};
+
+class C_AioFlushWQ : public Context {
+public:
+  C_AioFlushWQ(ImageCtx *ictx, AioCompletion *c)
+    : m_ictx(ictx), m_comp(c) {
+  }
+protected:
+  virtual void finish(int r) {
+    aio_flush(m_ictx, m_comp);
+  }
+private:
+  ImageCtx *m_ictx;
+  AioCompletion *m_comp;
+};
+
+} // anonymous namespace
+
+void AioImageRequestWQ::aio_read(ImageCtx *ictx, uint64_t off, size_t len,
+                                 char *buf, bufferlist *pbl, AioCompletion *c,
+                                 int op_flags) {
+  c->init_time(ictx, librbd::AIO_TYPE_READ);
+  if (ictx->non_blocking_aio) {
+    queue(new C_AioReadWQ(ictx, off, len, buf, pbl, c, op_flags),
+          Metadata(false, c));
+  } else {
+    librbd::aio_read(ictx, off, len, buf, pbl, c, op_flags);
+  }
+}
+
+void AioImageRequestWQ::aio_write(ImageCtx *ictx, uint64_t off, size_t len,
+                                  const char *buf, AioCompletion *c,
+                                  int op_flags) {
+  c->init_time(ictx, librbd::AIO_TYPE_WRITE);
+  if (ictx->non_blocking_aio) {
+    queue(new C_AioWriteWQ(ictx, off, len, buf, c, op_flags),
+          Metadata(true, c));
+  } else {
+    librbd::aio_write(ictx, off, len, buf, c, op_flags);
+  }
+}
+
+void AioImageRequestWQ::aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len,
+                                    AioCompletion *c) {
+  c->init_time(ictx, librbd::AIO_TYPE_DISCARD);
+  if (ictx->non_blocking_aio) {
+    queue(new C_AioDiscardWQ(ictx, off, len, c), Metadata(true, c));
+  } else {
+    librbd::aio_discard(ictx, off, len, c);
+  }
+}
+
+void AioImageRequestWQ::aio_flush(ImageCtx *ictx, AioCompletion *c) {
+  c->init_time(ictx, librbd::AIO_TYPE_FLUSH);
+  if (ictx->non_blocking_aio) {
+    queue(new C_AioFlushWQ(ictx, c), Metadata(false, c));
+  } else {
+    librbd::aio_flush(ictx, c);
+  }
+}
+
+bool AioImageRequestWQ::writes_empty() const {
+  Mutex::Locker locker(m_lock);
+  for (ceph::unordered_map<Context *, Metadata>::const_iterator it =
+         m_context_metadata.begin();
+       it != m_context_metadata.end(); ++it) {
+    if (it->second.write_op) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void *AioImageRequestWQ::_void_dequeue() {
+  Context *peek_item = front();
+  if (peek_item == NULL) {
+    return NULL;
+  }
+
+  {
+    Mutex::Locker locker(m_lock);
+    ceph::unordered_map<Context *, Metadata>::iterator it =
+      m_context_metadata.find(peek_item);
+    assert(it != m_context_metadata.end());
+
+    if (it->second.write_op) {
+      if (m_writes_suspended) {
+        return NULL;
+      }
+      ++m_in_progress_writes;
+    }
+  }
+
+  Context *item = reinterpret_cast<Context *>(
+    ThreadPool::PointerWQ<Context>::_void_dequeue());
+  assert(peek_item == item);
+  return item;
+}
+
+void AioImageRequestWQ::process(Context *ctx) {
+  Metadata metadata;
+  {
+    Mutex::Locker locker(m_lock);
+    ceph::unordered_map<Context *, Metadata>::iterator it =
+      m_context_metadata.find(ctx);
+    assert(it != m_context_metadata.end());
+
+    metadata = it->second;
+    m_context_metadata.erase(it);
+  }
+
+  // TODO
+  ctx->complete(0);
+
+  {
+    Mutex::Locker locker(m_lock);
+    if (metadata.write_op) {
+      assert(m_in_progress_writes > 0);
+      if (--m_in_progress_writes == 0) {
+        m_cond.Signal();
+      }
+    }
+  }
+}
+
+void AioImageRequestWQ::queue(Context *ctx, const Metadata &metadata) {
+  {
+    Mutex::Locker locker(m_lock);
+    m_context_metadata[ctx] = metadata;
+  }
+  ThreadPool::PointerWQ<Context>::queue(ctx);
+}
+
+} // namespace librbd
diff --git a/src/librbd/AioImageRequestWQ.h b/src/librbd/AioImageRequestWQ.h
new file mode 100644 (file)
index 0000000..d4abfa9
--- /dev/null
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
+#define CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
+
+#include "common/WorkQueue.h"
+#include "common/Mutex.h"
+#include "include/unordered_map.h"
+
+namespace librbd {
+
+class AioCompletion;
+class ImageCtx;
+
+class AioImageRequestWQ : protected ThreadPool::PointerWQ<Context> {
+public:
+  AioImageRequestWQ(const string &name, time_t ti, ThreadPool *tp)
+    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
+      m_lock("AioImageRequestWQ::m_lock"), m_writes_suspended(false),
+      m_in_progress_writes(0) {
+  }
+
+  void aio_read(ImageCtx *ictx, uint64_t off, size_t len, char *buf,
+                bufferlist *pbl, AioCompletion *c, int op_flags);
+  void aio_write(ImageCtx *ictx, uint64_t off, size_t len, const char *buf,
+                 AioCompletion *c, int op_flags);
+  void aio_discard(ImageCtx *ictx, uint64_t off, uint64_t len,
+                   AioCompletion *c);
+  void aio_flush(ImageCtx *ictx, AioCompletion *c);
+
+  using ThreadPool::PointerWQ<Context>::drain;
+
+  bool writes_empty() const;
+  inline bool writes_suspended() const {
+    Mutex::Locker locker(m_lock);
+    return m_writes_suspended;
+  }
+
+  void suspend_writes() {
+    Mutex::Locker locker(m_lock);
+    while (m_in_progress_writes > 0) {
+      m_cond.Wait(m_lock);
+    }
+  }
+
+  void resume_writes() {
+    {
+      Mutex::Locker locker(m_lock);
+      m_writes_suspended = false;
+    }
+    signal();
+  }
+protected:
+  virtual void _clear() {
+    ThreadPool::PointerWQ<Context>::_clear();
+    m_context_metadata.clear();
+  }
+
+  virtual void *_void_dequeue();
+  virtual void process(Context *ctx);
+private:
+  struct Metadata {
+    bool write_op;
+    AioCompletion *aio_comp;
+
+    Metadata() : write_op(false), aio_comp(NULL) {}
+    Metadata(bool _write_op, AioCompletion *_aio_comp)
+      : write_op(_write_op), aio_comp(_aio_comp) {}
+  };
+
+  mutable Mutex m_lock;
+  Cond m_cond;
+  bool m_writes_suspended;
+  uint32_t m_in_progress_writes;
+  ceph::unordered_map<Context *, Metadata> m_context_metadata;
+
+  void queue(Context *ctx, const Metadata &metadata);
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
index 0a2808caa1d555c48be9f6ef6833c405fb156d1e..ac2e3300125b5d575f641c1f8245977180260c52 100644 (file)
@@ -8,7 +8,9 @@
 #include "common/dout.h"
 #include "common/errno.h"
 #include "common/perf_counters.h"
+#include "common/WorkQueue.h"
 
+#include "librbd/AioImageRequestWQ.h"
 #include "librbd/AsyncOperation.h"
 #include "librbd/AsyncRequest.h"
 #include "librbd/AsyncResizeRequest.h"
@@ -99,9 +101,9 @@ public:
     ThreadPoolSingleton *thread_pool_singleton;
     cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
       thread_pool_singleton, "librbd::thread_pool");
-    aio_work_queue = new ContextWQ("librbd::aio_work_queue",
-                                   cct->_conf->rbd_op_thread_timeout,
-                                   thread_pool_singleton);
+    aio_work_queue = new AioImageRequestWQ("librbd::aio_work_queue",
+                                           cct->_conf->rbd_op_thread_timeout,
+                                           thread_pool_singleton);
     op_work_queue = new ContextWQ("librbd::op_work_queue",
                                   cct->_conf->rbd_op_thread_timeout,
                                   thread_pool_singleton);
index bfddf80169736873576d0020a7e98c364e815738..4fd1112e112d992ef6c09842744c08ea797fda1c 100644 (file)
@@ -16,7 +16,6 @@
 #include "common/Readahead.h"
 #include "common/RWLock.h"
 #include "common/snap_types.h"
-#include "common/WorkQueue.h"
 #include "include/atomic.h"
 #include "include/buffer.h"
 #include "include/rbd/librbd.hpp"
 #include "librbd/parent_types.h"
 
 class CephContext;
+class ContextWQ;
 class Finisher;
 class PerfCounters;
 
 namespace librbd {
 
+  class AioImageRequestWQ;
   class AsyncOperation;
   template <typename ImageCtxT> class AsyncRequest;
   class AsyncResizeRequest;
@@ -132,7 +133,7 @@ namespace librbd {
 
     xlist<AsyncResizeRequest*> async_resize_reqs;
 
-    ContextWQ *aio_work_queue;
+    AioImageRequestWQ *aio_work_queue;
     ContextWQ *op_work_queue;
 
     // Configuration
index aa0fee52cca769d13fd9d7eb25fa0edbd41116de..29d45b34ca5ab45163ae302148fcdf5645f55733 100644 (file)
@@ -8,6 +8,7 @@ if WITH_RBD
 
 librbd_internal_la_SOURCES = \
        librbd/AioCompletion.cc \
+       librbd/AioImageRequestWQ.cc \
        librbd/AioObjectRequest.cc \
        librbd/AsyncFlattenRequest.cc \
        librbd/AsyncObjectThrottle.cc \
@@ -49,6 +50,7 @@ lib_LTLIBRARIES += librbd.la
 
 noinst_HEADERS += \
        librbd/AioCompletion.h \
+       librbd/AioImageRequestWQ.h \
        librbd/AioObjectRequest.h \
        librbd/AsyncFlattenRequest.h \
        librbd/AsyncObjectThrottle.h \
index e1b3a723a6fea268ae985f3bb102194206b800fe..78fb8235a853baac0206494c98545bfed5a6bc42 100644 (file)
@@ -10,6 +10,7 @@
 #include "common/errno.h"
 #include "common/ContextCompletion.h"
 #include "common/Throttle.h"
+#include "common/WorkQueue.h"
 #include "cls/lock/cls_lock_client.h"
 #include "include/stringify.h"
 
@@ -17,6 +18,7 @@
 #include "cls/rbd/cls_rbd_client.h"
 
 #include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequestWQ.h"
 #include "librbd/AioObjectRequest.h"
 #include "librbd/AsyncFlattenRequest.h"
 #include "librbd/AsyncResizeRequest.h"
@@ -2535,6 +2537,7 @@ reprotect_and_return_err:
     {
       Mutex::Locker cache_locker(ictx->cache_lock);
       RWLock::WLocker snap_locker(ictx->snap_lock);
+
       {
        int r;
        RWLock::WLocker parent_locker(ictx->parent_lock);
@@ -3078,6 +3081,8 @@ reprotect_and_return_err:
       }
     }
 
+    assert(!ictx->aio_work_queue->writes_suspended() ||
+           ictx->aio_work_queue->writes_empty());
     ictx->aio_work_queue->drain();
     ictx->cancel_async_requests();
     ictx->flush_async_operations();
index d3c4b02552ccfaff1a943bdeb0a6f6da76e0dd4b..b7345f9c7547efd429f245121cd9be8768bafa0a 100644 (file)
@@ -26,6 +26,7 @@
 #include "osdc/ObjectCacher.h"
 
 #include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequestWQ.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
@@ -60,121 +61,6 @@ namespace {
 
 TracepointProvider::Traits tracepoint_traits("librbd_tp.so", "rbd_tracing");
 
-class C_AioReadWQ : public Context {
-public:
-  C_AioReadWQ(librbd::ImageCtx *ictx, uint64_t off, size_t len,
-              char *buf, bufferlist *pbl, librbd::AioCompletion *c,
-              int op_flags)
-    : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_pbl(pbl), m_comp(c),
-      m_op_flags(op_flags) {
-  }
-protected:
-  virtual void finish(int r) {
-    librbd::aio_read(m_ictx, m_off, m_len, m_buf, m_pbl, m_comp, m_op_flags);
-  }
-private:
-  librbd::ImageCtx *m_ictx;
-  uint64_t m_off;
-  uint64_t m_len;
-  char *m_buf;
-  bufferlist *m_pbl;
-  librbd::AioCompletion *m_comp;
-  int m_op_flags;
-};
-
-class C_AioWriteWQ : public Context {
-public:
-  C_AioWriteWQ(librbd::ImageCtx *ictx, uint64_t off, size_t len,
-               const char *buf, librbd::AioCompletion *c, int op_flags)
-    : m_ictx(ictx), m_off(off), m_len(len), m_buf(buf), m_comp(c),
-      m_op_flags(op_flags) {
-  }
-protected:
-  virtual void finish(int r) {
-    librbd::aio_write(m_ictx, m_off, m_len, m_buf, m_comp, m_op_flags);
-  }
-private:
-  librbd::ImageCtx *m_ictx;
-  uint64_t m_off;
-  uint64_t m_len;
-  const char *m_buf;
-  librbd::AioCompletion *m_comp;
-  int m_op_flags;
-};
-
-class C_AioDiscardWQ : public Context {
-public:
-  C_AioDiscardWQ(librbd::ImageCtx *ictx, uint64_t off, uint64_t len,
-                 librbd::AioCompletion *c)
-    : m_ictx(ictx), m_off(off), m_len(len), m_comp(c) {
-  }
-protected:
-  virtual void finish(int r) {
-    librbd::aio_discard(m_ictx, m_off, m_len, m_comp);
-  }
-private:
-  librbd::ImageCtx *m_ictx;
-  uint64_t m_off;
-  uint64_t m_len;
-  librbd::AioCompletion *m_comp;
-};
-
-class C_AioFlushWQ : public Context {
-public:
-  C_AioFlushWQ(librbd::ImageCtx *ictx, librbd::AioCompletion *c)
-    : m_ictx(ictx), m_comp(c) {
-  }
-protected:
-  virtual void finish(int r) {
-    librbd::aio_flush(m_ictx, m_comp);
-  }
-private:
-  librbd::ImageCtx *m_ictx;
-  librbd::AioCompletion *m_comp;
-};
-
-void submit_aio_read(librbd::ImageCtx *ictx, uint64_t off, size_t len,
-                     char *buf, bufferlist *pbl, librbd::AioCompletion *c,
-                     int op_flags) {
-  c->init_time(ictx, librbd::AIO_TYPE_READ);
-  if (ictx->non_blocking_aio) {
-    ictx->aio_work_queue->queue(new C_AioReadWQ(ictx, off, len, buf, pbl, c,
-                                                op_flags));
-  } else {
-    librbd::aio_read(ictx, off, len, buf, pbl, c, op_flags);
-  }
-}
-
-void submit_aio_write(librbd::ImageCtx *ictx, uint64_t off, size_t len,
-                      const char *buf, librbd::AioCompletion *c, int op_flags) {
-  c->init_time(ictx, librbd::AIO_TYPE_WRITE);
-  if (ictx->non_blocking_aio) {
-    ictx->aio_work_queue->queue(new C_AioWriteWQ(ictx, off, len, buf, c,
-                                                 op_flags));
-  } else {
-    librbd::aio_write(ictx, off, len, buf, c, op_flags);
-  }
-}
-
-void submit_aio_discard(librbd::ImageCtx *ictx, uint64_t off, uint64_t len,
-                        librbd::AioCompletion *c) {
-  c->init_time(ictx, librbd::AIO_TYPE_DISCARD);
-  if (ictx->non_blocking_aio) {
-    ictx->aio_work_queue->queue(new C_AioDiscardWQ(ictx, off, len, c));
-  } else {
-    librbd::aio_discard(ictx, off, len, c);
-  }
-}
-
-void submit_aio_flush(librbd::ImageCtx *ictx, librbd::AioCompletion *c) {
-  c->init_time(ictx, librbd::AIO_TYPE_FLUSH);
-  if (ictx->non_blocking_aio) {
-    ictx->aio_work_queue->queue(new C_AioFlushWQ(ictx, c));
-  } else {
-    librbd::aio_flush(ictx, c);
-  }
-}
-
 CephContext* get_cct(IoCtx &io_ctx) {
   return reinterpret_cast<CephContext*>(io_ctx.cct());
 }
@@ -1011,7 +897,8 @@ namespace librbd {
       tracepoint(librbd, aio_write_exit, -EINVAL);
       return -EINVAL;
     }
-    submit_aio_write(ictx, off, len, bl.c_str(), get_aio_completion(c), 0);
+    ictx->aio_work_queue->aio_write(ictx, off, len, bl.c_str(),
+                                    get_aio_completion(c), 0);
     tracepoint(librbd, aio_write_exit, 0);
     return 0;
   }
@@ -1026,8 +913,8 @@ namespace librbd {
       tracepoint(librbd, aio_write_exit, -EINVAL);
       return -EINVAL;
     }
-    submit_aio_write(ictx, off, len, bl.c_str(), get_aio_completion(c),
-                     op_flags);
+    ictx->aio_work_queue->aio_write(ictx, off, len, bl.c_str(),
+                                    get_aio_completion(c), op_flags);
     tracepoint(librbd, aio_write_exit, 0);
     return 0;
   }
@@ -1036,7 +923,7 @@ namespace librbd {
   {
     ImageCtx *ictx = (ImageCtx *)ctx;
     tracepoint(librbd, aio_discard_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, c->pc);
-    submit_aio_discard(ictx, off, len, get_aio_completion(c));
+    ictx->aio_work_queue->aio_discard(ictx, off, len, get_aio_completion(c));
     tracepoint(librbd, aio_discard_exit, 0);
     return 0;
   }
@@ -1048,7 +935,8 @@ namespace librbd {
     tracepoint(librbd, aio_read_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, bl.c_str(), c->pc);
     ldout(ictx->cct, 10) << "Image::aio_read() buf=" << (void *)bl.c_str() << "~"
                         << (void *)(bl.c_str() + len - 1) << dendl;
-    submit_aio_read(ictx, off, len, NULL, &bl, get_aio_completion(c), 0);
+    ictx->aio_work_queue->aio_read(ictx, off, len, NULL, &bl,
+                                   get_aio_completion(c), 0);
     tracepoint(librbd, aio_read_exit, 0);
     return 0;
   }
@@ -1061,7 +949,8 @@ namespace librbd {
                ictx->read_only, off, len, bl.c_str(), c->pc, op_flags);
     ldout(ictx->cct, 10) << "Image::aio_read() buf=" << (void *)bl.c_str() << "~"
                         << (void *)(bl.c_str() + len - 1) << dendl;
-    submit_aio_read(ictx, off, len, NULL, &bl, get_aio_completion(c), op_flags);
+    ictx->aio_work_queue->aio_read(ictx, off, len, NULL, &bl,
+                                   get_aio_completion(c), op_flags);
     tracepoint(librbd, aio_read_exit, 0);
     return 0;
   }
@@ -1079,7 +968,7 @@ namespace librbd {
   {
     ImageCtx *ictx = (ImageCtx *)ctx;
     tracepoint(librbd, aio_flush_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, c->pc);
-    submit_aio_flush(ictx, get_aio_completion(c));
+    ictx->aio_work_queue->aio_flush(ictx, get_aio_completion(c));
     tracepoint(librbd, aio_flush_exit, 0);
     return 0;
   }
@@ -2140,7 +2029,8 @@ extern "C" int rbd_aio_write(rbd_image_t image, uint64_t off, size_t len,
   librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
   librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
   tracepoint(librbd, aio_write_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, buf, comp->pc);
-  submit_aio_write(ictx, off, len, buf, get_aio_completion(comp), 0);
+  ictx->aio_work_queue->aio_write(ictx, off, len, buf,
+                                  get_aio_completion(comp), 0);
   tracepoint(librbd, aio_write_exit, 0);
   return 0;
 }
@@ -2152,7 +2042,8 @@ extern "C" int rbd_aio_write2(rbd_image_t image, uint64_t off, size_t len,
   librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
   tracepoint(librbd, aio_write2_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(),
              ictx->read_only, off, len, buf, comp->pc, op_flags);
-  submit_aio_write(ictx, off, len, buf, get_aio_completion(comp), op_flags);
+  ictx->aio_work_queue->aio_write(ictx, off, len, buf, get_aio_completion(comp),
+                                  op_flags);
   tracepoint(librbd, aio_write_exit, 0);
   return 0;
 }
@@ -2164,7 +2055,7 @@ extern "C" int rbd_aio_discard(rbd_image_t image, uint64_t off, uint64_t len,
   librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
   librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
   tracepoint(librbd, aio_discard_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, comp->pc);
-  submit_aio_discard(ictx, off, len, get_aio_completion(comp));
+  ictx->aio_work_queue->aio_discard(ictx, off, len, get_aio_completion(comp));
   tracepoint(librbd, aio_discard_exit, 0);
   return 0;
 }
@@ -2175,7 +2066,8 @@ extern "C" int rbd_aio_read(rbd_image_t image, uint64_t off, size_t len,
   librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
   librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
   tracepoint(librbd, aio_read_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, off, len, buf, comp->pc);
-  submit_aio_read(ictx, off, len, buf, NULL, get_aio_completion(comp), 0);
+  ictx->aio_work_queue->aio_read(ictx, off, len, buf, NULL,
+                                 get_aio_completion(comp), 0);
   tracepoint(librbd, aio_read_exit, 0);
   return 0;
 }
@@ -2187,8 +2079,8 @@ extern "C" int rbd_aio_read2(rbd_image_t image, uint64_t off, size_t len,
   librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
   tracepoint(librbd, aio_read2_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(),
              ictx->read_only, off, len, buf, comp->pc, op_flags);
-  submit_aio_read(ictx, off, len, buf, NULL, get_aio_completion(comp),
-                  op_flags);
+  ictx->aio_work_queue->aio_read(ictx, off, len, buf, NULL,
+                                 get_aio_completion(comp), op_flags);
   tracepoint(librbd, aio_read_exit, 0);
   return 0;
 }
@@ -2207,7 +2099,7 @@ extern "C" int rbd_aio_flush(rbd_image_t image, rbd_completion_t c)
   librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
   librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
   tracepoint(librbd, aio_flush_enter, ictx, ictx->name.c_str(), ictx->snap_name.c_str(), ictx->read_only, comp->pc);
-  submit_aio_flush(ictx, get_aio_completion(comp));
+  ictx->aio_work_queue->aio_flush(ictx, get_aio_completion(comp));
   tracepoint(librbd, aio_flush_exit, 0);
   return 0;
 }