From bad4641547ba176f9b605fa4faecf21d850642e4 Mon Sep 17 00:00:00 2001
From: Jason Dillaman
Date: Wed, 9 Dec 2015 14:51:18 -0500
Subject: [PATCH] librbd: add shut down support to the AIO work queue

Signed-off-by: Jason Dillaman
---
 src/librbd/AioImageRequestWQ.cc | 132 +++++++++++++++++++++++++-------
 src/librbd/AioImageRequestWQ.h  |  24 ++++--
 2 files changed, 123 insertions(+), 33 deletions(-)

diff --git a/src/librbd/AioImageRequestWQ.cc b/src/librbd/AioImageRequestWQ.cc
index ef09c2a9e18cb..67a092f597e4c 100644
--- a/src/librbd/AioImageRequestWQ.cc
+++ b/src/librbd/AioImageRequestWQ.cc
@@ -2,11 +2,13 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "librbd/AioImageRequestWQ.h"
+#include "common/errno.h"
 #include "librbd/AioCompletion.h"
 #include "librbd/AioImageRequest.h"
 #include "librbd/ExclusiveLock.h"
 #include "librbd/ImageCtx.h"
 #include "librbd/internal.h"
+#include "librbd/Utils.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -17,8 +19,10 @@ namespace librbd {
 AioImageRequestWQ::AioImageRequestWQ(ImageCtx *image_ctx, const string &name,
                                      time_t ti, ThreadPool *tp)
   : ThreadPool::PointerWQ(name, ti, 0, tp),
-    m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"),
-    m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0) {
+    m_image_ctx(*image_ctx),
+    m_lock(util::unique_lock_name("AioImageRequestWQ::m_lock", this)),
+    m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0),
+    m_in_flight_ops(0), m_shutdown(false), m_on_shutdown(nullptr) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 5) << this << " " << ": ictx=" << image_ctx << dendl;
 }
@@ -86,34 +90,47 @@ int AioImageRequestWQ::discard(uint64_t off, uint64_t len) {
 }
 
 void AioImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
-                                 char *buf, bufferlist *pbl, int op_flags, bool native_async) {
+                                 char *buf, bufferlist *pbl, int op_flags,
+                                 bool native_async) {
   c->init_time(&m_image_ctx, librbd::AIO_TYPE_READ);
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << "aio_read: ictx=" << &m_image_ctx << ", "
                  << "completion=" << c << ", off=" << off << ", "
                  << "len=" << len << ", " << "flags=" << op_flags << dendl;
 
-  if (native_async && m_image_ctx.event_socket.is_valid())
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
     c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_op(c)) {
+    return;
+  }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-  if (m_image_ctx.non_blocking_aio) {
+  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
     queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags));
   } else {
    AioImageRequest::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
+    finish_in_flight_op();
   }
 }
 
 void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
-                                  const char *buf, int op_flags, bool native_async) {
+                                  const char *buf, int op_flags,
+                                  bool native_async) {
   c->init_time(&m_image_ctx, librbd::AIO_TYPE_WRITE);
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << "aio_write: ictx=" << &m_image_ctx << ", "
                  << "completion=" << c << ", off=" << off << ", "
                  << "len=" << len << ", flags=" << op_flags << dendl;
 
-  if (native_async && m_image_ctx.event_socket.is_valid())
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
     c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_op(c)) {
+    return;
+  }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   if (m_image_ctx.non_blocking_aio || is_journal_required() ||
@@ -121,6 +138,7 @@ void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
     queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags));
   } else {
     AioImageRequest::aio_write(&m_image_ctx, c, off, len, buf, op_flags);
+    finish_in_flight_op();
   }
 }
 
@@ -132,8 +150,13 @@ void AioImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
                  << "completion=" << c << ", off=" << off << ", len=" << len
                  << dendl;
 
-  if (native_async && m_image_ctx.event_socket.is_valid())
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
     c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_op(c)) {
+    return;
+  }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   if (m_image_ctx.non_blocking_aio || is_journal_required() ||
@@ -141,6 +164,7 @@ void AioImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
     queue(new AioImageDiscard(m_image_ctx, c, off, len));
   } else {
     AioImageRequest::aio_discard(&m_image_ctx, c, off, len);
+    finish_in_flight_op();
   }
 }
 
@@ -150,8 +174,13 @@ void AioImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) {
   ldout(cct, 20) << "aio_flush: ictx=" << &m_image_ctx << ", "
                  << "completion=" << c << dendl;
 
-  if (native_async && m_image_ctx.event_socket.is_valid())
+  if (native_async && m_image_ctx.event_socket.is_valid()) {
     c->set_event_notify(true);
+  }
+
+  if (!start_in_flight_op(c)) {
+    return;
+  }
 
   RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
   if (m_image_ctx.non_blocking_aio || is_journal_required() ||
@@ -159,7 +188,26 @@ void AioImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) {
     queue(new AioImageFlush(m_image_ctx, c));
   } else {
     AioImageRequest::aio_flush(&m_image_ctx, c);
+    finish_in_flight_op();
+  }
+}
+
+void AioImageRequestWQ::shut_down(Context *on_shutdown) {
+  {
+    RWLock::WLocker locker(m_lock);
+    assert(!m_shutdown);
+    m_shutdown = true;
+
+    CephContext *cct = m_image_ctx.cct;
+    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ops.read()
+                  << dendl;
+    if (m_in_flight_ops.read() > 0) {
+      m_on_shutdown = on_shutdown;
+      return;
+    }
   }
+
+  on_shutdown->complete(0);
 }
 
 void AioImageRequestWQ::block_writes() {
@@ -172,11 +220,11 @@ void AioImageRequestWQ::block_writes(Context *on_blocked) {
   CephContext *cct = m_image_ctx.cct;
 
   {
-    Mutex::Locker locker(m_lock);
+    RWLock::WLocker locker(m_lock);
     ++m_write_blockers;
     ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
                   << "num=" << m_write_blockers << dendl;
-    if (!m_write_blocker_contexts.empty() || m_in_progress_writes > 0) {
+    if (!m_write_blocker_contexts.empty() || m_in_progress_writes.read() > 0) {
      m_write_blocker_contexts.push_back(on_blocked);
      return;
     }
@@ -190,7 +238,7 @@ void AioImageRequestWQ::unblock_writes() {
 
   bool wake_up = false;
   {
-    Mutex::Locker locker(m_lock);
+    RWLock::WLocker locker(m_lock);
     assert(m_write_blockers > 0);
     --m_write_blockers;
 
@@ -214,11 +262,11 @@ void *AioImageRequestWQ::_void_dequeue() {
 
   {
     if (peek_item->is_write_op()) {
-      Mutex::Locker locker(m_lock);
+      RWLock::RLocker locker(m_lock);
       if (m_write_blockers > 0) {
         return NULL;
       }
-      ++m_in_progress_writes;
+      m_in_progress_writes.inc();
     }
   }
 
@@ -240,13 +288,14 @@ void AioImageRequestWQ::process(AioImageRequest *req) {
 
   bool writes_blocked = false;
   {
-    Mutex::Locker locker(m_lock);
+    RWLock::RLocker locker(m_lock);
     if (req->is_write_op()) {
-      assert(m_queued_writes > 0);
-      --m_queued_writes;
+      assert(m_queued_writes.read() > 0);
+      m_queued_writes.dec();
 
-      assert(m_in_progress_writes > 0);
-      if (--m_in_progress_writes == 0 && !m_write_blocker_contexts.empty()) {
+      assert(m_in_progress_writes.read() > 0);
+      if (m_in_progress_writes.dec() == 0 &&
+          !m_write_blocker_contexts.empty()) {
         writes_blocked = true;
       }
     }
@@ -257,9 +306,43 @@ void AioImageRequestWQ::process(AioImageRequest *req) {
     m_image_ctx.flush(new C_BlockedWrites(this));
   }
   delete req;
+
+  finish_in_flight_op();
+}
+
+int AioImageRequestWQ::start_in_flight_op(AioCompletion *c) {
+  RWLock::RLocker locker(m_lock);
+
+  if (m_shutdown) {
+    CephContext *cct = m_image_ctx.cct;
+    lderr(cct) << "IO received on closed image" << dendl;
+
+    c->get();
+    c->fail(cct, -ESHUTDOWN);
+    return false;
+  }
+
+  m_in_flight_ops.inc();
+  return true;
+}
+
+void AioImageRequestWQ::finish_in_flight_op() {
+  {
+    RWLock::RLocker locker(m_lock);
+    if (m_in_flight_ops.dec() > 0 || !m_shutdown) {
+      return;
+    }
+  }
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 5) << __func__ << ": completing shut down" << dendl;
+
+  assert(m_on_shutdown != nullptr);
+  m_on_shutdown->complete(0);
 }
 
 bool AioImageRequestWQ::is_journal_required() const {
+  // TODO eliminate once journal startup state is integrated
   RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
   return (m_image_ctx.journal != NULL);
 }
@@ -279,13 +362,10 @@ void AioImageRequestWQ::queue(AioImageRequest *req) {
                  << "req=" << req << dendl;
 
   assert(m_image_ctx.owner_lock.is_locked());
-
-  {
-    Mutex::Locker locker(m_lock);
-    if (req->is_write_op()) {
-      ++m_queued_writes;
-    }
+  if (req->is_write_op()) {
+    m_queued_writes.inc();
   }
+
   ThreadPool::PointerWQ::queue(req);
 
   if (is_lock_required()) {
@@ -296,7 +376,7 @@ void AioImageRequestWQ::handle_blocked_writes(int r) {
   Contexts contexts;
   {
-    Mutex::Locker locker(m_lock);
+    RWLock::WLocker locker(m_lock);
     contexts.swap(m_write_blocker_contexts);
   }
 
diff --git a/src/librbd/AioImageRequestWQ.h b/src/librbd/AioImageRequestWQ.h
index bb323cbb627ac..0eeb716bc992d 100644
--- a/src/librbd/AioImageRequestWQ.h
+++ b/src/librbd/AioImageRequestWQ.h
@@ -5,8 +5,9 @@
 #define CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
 
 #include "include/Context.h"
+#include "include/atomic.h"
 #include "common/WorkQueue.h"
-#include "common/Mutex.h"
+#include "common/RWLock.h"
 
 namespace librbd {
 
@@ -34,15 +35,17 @@ public:
   using ThreadPool::PointerWQ::empty;
 
   inline bool writes_empty() const {
-    Mutex::Locker locker(m_lock);
-    return (m_queued_writes == 0);
+    RWLock::RLocker locker(m_lock);
+    return (m_queued_writes.read() == 0);
   }
 
   inline bool writes_blocked() const {
-    Mutex::Locker locker(m_lock);
+    RWLock::RLocker locker(m_lock);
     return (m_write_blockers > 0);
   }
 
+  void shut_down(Context *on_shutdown);
+
   void block_writes();
   void block_writes(Context *on_blocked);
   void unblock_writes();
@@ -66,11 +69,18 @@ private:
   };
 
   ImageCtx &m_image_ctx;
-  mutable Mutex m_lock;
+  mutable RWLock m_lock;
   Contexts m_write_blocker_contexts;
   uint32_t m_write_blockers;
-  uint32_t m_in_progress_writes;
-  uint32_t m_queued_writes;
+  atomic_t m_in_progress_writes;
+  atomic_t m_queued_writes;
+  atomic_t m_in_flight_ops;
+
+  bool m_shutdown;
+  Context *m_on_shutdown;
+
+  int start_in_flight_op(AioCompletion *c);
+  void finish_in_flight_op();
 
   bool is_journal_required() const;
   bool is_lock_required() const;
-- 
2.39.5
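
Usage sketch (not part of the patch): the snippet below shows how a caller might drive the new AioImageRequestWQ::shut_down() contract while tearing down an image. The wrapper function close_aio_queue() is a hypothetical name chosen for illustration; C_SaferCond is the stock Ceph context used here only to turn the asynchronous shut-down callback into a blocking wait, and the include paths assume a Ceph source tree of this era.

#include "common/Cond.h"               // C_SaferCond: a Context that can be waited on
#include "librbd/AioImageRequestWQ.h"

// Hypothetical helper: block until the AIO work queue has fully shut down.
void close_aio_queue(librbd::AioImageRequestWQ *aio_work_queue) {
  C_SaferCond shut_down_ctx;

  // Flag the queue as shut down. If no AIO is in flight the context is
  // completed immediately; otherwise it is stashed in m_on_shutdown and
  // fired by the final finish_in_flight_op().
  aio_work_queue->shut_down(&shut_down_ctx);

  // Any aio_read/aio_write/aio_discard/aio_flush issued after this point
  // fails its completion with -ESHUTDOWN instead of being queued.
  shut_down_ctx.wait();
}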