librbd: add shut down support to the AIO work queue
author    Jason Dillaman <dillaman@redhat.com>
          Wed, 9 Dec 2015 19:51:18 +0000 (14:51 -0500)
committer Jason Dillaman <dillaman@redhat.com>
          Tue, 15 Dec 2015 01:31:31 +0000 (20:31 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/AioImageRequestWQ.cc
src/librbd/AioImageRequestWQ.h
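
This change adds a shut_down(Context *on_shutdown) entry point to the AIO work queue: every incoming aio_read/aio_write/aio_discard/aio_flush is counted as an in-flight op, IO submitted after shut down is failed with -ESHUTDOWN, and the shutdown callback fires only once the in-flight count drains to zero. A minimal caller-side sketch, assuming C_SaferCond from common/Cond.h is available as the blocking Context helper; the drain_aio_queue() wrapper below is invented for illustration and is not part of this commit:

// Usage sketch (not part of this commit): block until the AIO work queue
// has drained all in-flight ops and refuses new IO.
#include "common/Cond.h"
#include "librbd/AioImageRequestWQ.h"

void drain_aio_queue(librbd::AioImageRequestWQ *aio_work_queue) {
  C_SaferCond shut_down_ctx;

  // Any aio_read/aio_write/aio_discard/aio_flush arriving after this
  // point is failed with -ESHUTDOWN by start_in_flight_op().
  aio_work_queue->shut_down(&shut_down_ctx);

  // The callback completes with 0 once the last in-flight op finishes
  // (finish_in_flight_op() sees the count hit zero with m_shutdown set).
  shut_down_ctx.wait();
}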

src/librbd/AioImageRequestWQ.cc
index ef09c2a9e18cb5510e4557883156a43814dc7162..67a092f597e4ceedeee9efb3e57fcda2612a600a 100644
@@ -2,11 +2,13 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "librbd/AioImageRequestWQ.h"
+#include "common/errno.h"
 #include "librbd/AioCompletion.h"
 #include "librbd/AioImageRequest.h"
 #include "librbd/ExclusiveLock.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
+#include "librbd/Utils.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -17,8 +19,10 @@ namespace librbd {
 AioImageRequestWQ::AioImageRequestWQ(ImageCtx *image_ctx, const string &name,
                                      time_t ti, ThreadPool *tp)
   : ThreadPool::PointerWQ<AioImageRequest>(name, ti, 0, tp),
-    m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"),
-    m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0) {
+    m_image_ctx(*image_ctx),
+    m_lock(util::unique_lock_name("AioImageRequestWQ::m_lock", this)),
+    m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0),
+    m_in_flight_ops(0), m_shutdown(false), m_on_shutdown(nullptr) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 5) << this << " " << ": ictx=" << image_ctx << dendl;
 }
@@ -86,34 +90,47 @@ int AioImageRequestWQ::discard(uint64_t off, uint64_t len) {
 }
 
 void AioImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
-                                 char *buf, bufferlist *pbl, int op_flags, bool native_async) {
+                                 char *buf, bufferlist *pbl, int op_flags,
+                                 bool native_async) {
   c->init_time(&m_image_ctx, librbd::AIO_TYPE_READ);
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << "aio_read: ictx=" << &m_image_ctx << ", "
                  << "completion=" << c << ", off=" << off << ", "
                  << "len=" << len << ", " << "flags=" << op_flags << dendl;
 
-  if (native_async && m_image_ctx.event_socket.is_valid())
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
     c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_op(c)) {
+    return;
+  }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-  if (m_image_ctx.non_blocking_aio) {
+  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
     queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags));
   } else {
     AioImageRequest::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
+    finish_in_flight_op();
   }
 }
 
 void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
-                                  const char *buf, int op_flags, bool native_async) {
+                                  const char *buf, int op_flags,
+                                  bool native_async) {
   c->init_time(&m_image_ctx, librbd::AIO_TYPE_WRITE);
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << "aio_write: ictx=" << &m_image_ctx << ", "
                  << "completion=" << c << ", off=" << off << ", "
                  << "len=" << len << ", flags=" << op_flags << dendl;
 
-  if (native_async && m_image_ctx.event_socket.is_valid())
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
     c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_op(c)) {
+    return;
+  }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   if (m_image_ctx.non_blocking_aio || is_journal_required() ||
@@ -121,6 +138,7 @@ void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
     queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags));
   } else {
     AioImageRequest::aio_write(&m_image_ctx, c, off, len, buf, op_flags);
+    finish_in_flight_op();
   }
 }
 
@@ -132,8 +150,13 @@ void AioImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
                  << "completion=" << c << ", off=" << off << ", len=" << len
                  << dendl;
 
-  if (native_async && m_image_ctx.event_socket.is_valid())
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
     c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_op(c)) {
+    return;
+  }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   if (m_image_ctx.non_blocking_aio || is_journal_required() ||
@@ -141,6 +164,7 @@ void AioImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
     queue(new AioImageDiscard(m_image_ctx, c, off, len));
   } else {
     AioImageRequest::aio_discard(&m_image_ctx, c, off, len);
+    finish_in_flight_op();
   }
 }
 
@@ -150,8 +174,13 @@ void AioImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) {
   ldout(cct, 20) << "aio_flush: ictx=" << &m_image_ctx << ", "
                  << "completion=" << c << dendl;
 
-  if (native_async && m_image_ctx.event_socket.is_valid())
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
     c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_op(c)) {
+    return;
+  }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   if (m_image_ctx.non_blocking_aio || is_journal_required() ||
@@ -159,7 +188,26 @@ void AioImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) {
     queue(new AioImageFlush(m_image_ctx, c));
   } else {
     AioImageRequest::aio_flush(&m_image_ctx, c);
+    finish_in_flight_op();
+  }
+}
+
+void AioImageRequestWQ::shut_down(Context *on_shutdown) {
+  {
+    RWLock::WLocker locker(m_lock);
+    assert(!m_shutdown);
+    m_shutdown = true;
+
+    CephContext *cct = m_image_ctx.cct;
+    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ops.read()
+                  << dendl;
+    if (m_in_flight_ops.read() > 0) {
+      m_on_shutdown = on_shutdown;
+      return;
+    }
   }
+
+  on_shutdown->complete(0);
 }
 
 void AioImageRequestWQ::block_writes() {
@@ -172,11 +220,11 @@ void AioImageRequestWQ::block_writes(Context *on_blocked) {
   CephContext *cct = m_image_ctx.cct;
 
   {
-    Mutex::Locker locker(m_lock);
+    RWLock::WLocker locker(m_lock);
     ++m_write_blockers;
     ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
                   << "num=" << m_write_blockers << dendl;
-    if (!m_write_blocker_contexts.empty() || m_in_progress_writes > 0) {
+    if (!m_write_blocker_contexts.empty() || m_in_progress_writes.read() > 0) {
       m_write_blocker_contexts.push_back(on_blocked);
       return;
     }
@@ -190,7 +238,7 @@ void AioImageRequestWQ::unblock_writes() {
 
   bool wake_up = false;
   {
-    Mutex::Locker locker(m_lock);
+    RWLock::WLocker locker(m_lock);
     assert(m_write_blockers > 0);
     --m_write_blockers;
 
@@ -214,11 +262,11 @@ void *AioImageRequestWQ::_void_dequeue() {
 
   {
     if (peek_item->is_write_op()) {
-      Mutex::Locker locker(m_lock);
+      RWLock::RLocker locker(m_lock);
       if (m_write_blockers > 0) {
         return NULL;
       }
-      ++m_in_progress_writes;
+      m_in_progress_writes.inc();
     }
   }
 
@@ -240,13 +288,14 @@ void AioImageRequestWQ::process(AioImageRequest *req) {
 
   bool writes_blocked = false;
   {
-    Mutex::Locker locker(m_lock);
+    RWLock::RLocker locker(m_lock);
     if (req->is_write_op()) {
-      assert(m_queued_writes > 0);
-      --m_queued_writes;
+      assert(m_queued_writes.read() > 0);
+      m_queued_writes.dec();
 
-      assert(m_in_progress_writes > 0);
-      if (--m_in_progress_writes == 0 && !m_write_blocker_contexts.empty()) {
+      assert(m_in_progress_writes.read() > 0);
+      if (m_in_progress_writes.dec() == 0 &&
+          !m_write_blocker_contexts.empty()) {
         writes_blocked = true;
       }
     }
@@ -257,9 +306,43 @@ void AioImageRequestWQ::process(AioImageRequest *req) {
     m_image_ctx.flush(new C_BlockedWrites(this));
   }
   delete req;
+
+  finish_in_flight_op();
+}
+
+bool AioImageRequestWQ::start_in_flight_op(AioCompletion *c) {
+  RWLock::RLocker locker(m_lock);
+
+  if (m_shutdown) {
+    CephContext *cct = m_image_ctx.cct;
+    lderr(cct) << "IO received on closed image" << dendl;
+
+    c->get();
+    c->fail(cct, -ESHUTDOWN);
+    return false;
+  }
+
+  m_in_flight_ops.inc();
+  return true;
+}
+
+void AioImageRequestWQ::finish_in_flight_op() {
+  {
+    RWLock::RLocker locker(m_lock);
+    if (m_in_flight_ops.dec() > 0 || !m_shutdown) {
+      return;
+    }
+  }
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 5) << __func__ << ": completing shut down" << dendl;
+
+  assert(m_on_shutdown != nullptr);
+  m_on_shutdown->complete(0);
 }
 
 bool AioImageRequestWQ::is_journal_required() const {
+  // TODO eliminate once journal startup state is integrated
   RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
   return (m_image_ctx.journal != NULL);
 }
@@ -279,13 +362,10 @@ void AioImageRequestWQ::queue(AioImageRequest *req) {
                  << "req=" << req << dendl;
 
   assert(m_image_ctx.owner_lock.is_locked());
-
-  {
-    Mutex::Locker locker(m_lock);
-    if (req->is_write_op()) {
-      ++m_queued_writes;
-    }
+  if (req->is_write_op()) {
+    m_queued_writes.inc();
   }
+
   ThreadPool::PointerWQ<AioImageRequest>::queue(req);
 
   if (is_lock_required()) {
@@ -296,7 +376,7 @@ void AioImageRequestWQ::queue(AioImageRequest *req) {
 void AioImageRequestWQ::handle_blocked_writes(int r) {
   Contexts contexts;
   {
-    Mutex::Locker locker(m_lock);
+    RWLock::WLocker locker(m_lock);
     contexts.swap(m_write_blocker_contexts);
   }
 
src/librbd/AioImageRequestWQ.h
index bb323cbb627ac6d24c69f0880b64be8e21fc6643..0eeb716bc992d4c4e2ea10481762e36d8fd2073c 100644
@@ -5,8 +5,9 @@
 #define CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
 
 #include "include/Context.h"
+#include "include/atomic.h"
 #include "common/WorkQueue.h"
-#include "common/Mutex.h"
+#include "common/RWLock.h"
 
 namespace librbd {
 
@@ -34,15 +35,17 @@ public:
   using ThreadPool::PointerWQ<AioImageRequest>::empty;
 
   inline bool writes_empty() const {
-    Mutex::Locker locker(m_lock);
-    return (m_queued_writes == 0);
+    RWLock::RLocker locker(m_lock);
+    return (m_queued_writes.read() == 0);
   }
 
   inline bool writes_blocked() const {
-    Mutex::Locker locker(m_lock);
+    RWLock::RLocker locker(m_lock);
     return (m_write_blockers > 0);
   }
 
+  void shut_down(Context *on_shutdown);
+
   void block_writes();
   void block_writes(Context *on_blocked);
   void unblock_writes();
@@ -66,11 +69,18 @@ private:
   };
 
   ImageCtx &m_image_ctx;
-  mutable Mutex m_lock;
+  mutable RWLock m_lock;
   Contexts m_write_blocker_contexts;
   uint32_t m_write_blockers;
-  uint32_t m_in_progress_writes;
-  uint32_t m_queued_writes;
+  atomic_t m_in_progress_writes;
+  atomic_t m_queued_writes;
+  atomic_t m_in_flight_ops;
+
+  bool m_shutdown;
+  Context *m_on_shutdown;
+
+  bool start_in_flight_op(AioCompletion *c);
+  void finish_in_flight_op();
 
   bool is_journal_required() const;
   bool is_lock_required() const;
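
For reference, the in-flight handshake added above boils down to a small counting pattern: start_in_flight_op() rejects new IO once m_shutdown is set, and finish_in_flight_op() fires the deferred shutdown callback when the count drains to zero. A standalone sketch of the same idea follows; it is illustrative only, using std::mutex and std::function in place of Ceph's RWLock, atomic_t, and Context, and the InFlightTracker name is invented here:

// Illustrative sketch of the start/finish in-flight handshake; not the
// committed code.
#include <cstdint>
#include <functional>
#include <mutex>

class InFlightTracker {
public:
  bool start_op() {
    std::lock_guard<std::mutex> locker(m_lock);
    if (m_shutdown) {
      return false;   // mirrors the -ESHUTDOWN rejection of new IO
    }
    ++m_in_flight_ops;
    return true;
  }

  void finish_op() {
    std::function<void()> on_shutdown;
    {
      std::lock_guard<std::mutex> locker(m_lock);
      if (--m_in_flight_ops > 0 || !m_shutdown) {
        return;
      }
      on_shutdown = std::move(m_on_shutdown);
    }
    if (on_shutdown) {
      on_shutdown();  // last op drained after shut_down(): fire callback
    }
  }

  void shut_down(std::function<void()> on_shutdown) {
    {
      std::lock_guard<std::mutex> locker(m_lock);
      m_shutdown = true;
      if (m_in_flight_ops > 0) {
        m_on_shutdown = std::move(on_shutdown);
        return;       // deferred until finish_op() sees the count reach 0
      }
    }
    on_shutdown();    // nothing in flight: complete immediately
  }

private:
  std::mutex m_lock;
  uint32_t m_in_flight_ops = 0;
  bool m_shutdown = false;
  std::function<void()> m_on_shutdown;
};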