From 8d3bde88ace8196837c5ada636abc9a5c394128b Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Thu, 30 Apr 2020 13:07:33 -0400 Subject: [PATCH] librbd: queue image IO dispatch layer This layer replaces the queue implemented by ImageRequestWQ and handles blocking all write requests through the dispatch pipeline. Signed-off-by: Jason Dillaman --- src/librbd/CMakeLists.txt | 1 + src/librbd/exclusive_lock/ImageDispatch.cc | 5 +- src/librbd/io/ImageDispatcher.cc | 29 ++ src/librbd/io/ImageDispatcher.h | 9 + src/librbd/io/ImageDispatcherInterface.h | 7 + src/librbd/io/ImageRequestWQ.cc | 7 - src/librbd/io/ImageRequestWQ.h | 1 - src/librbd/io/QueueImageDispatch.cc | 264 ++++++++++++++++++ src/librbd/io/QueueImageDispatch.h | 111 ++++++++ src/test/librbd/mock/io/MockImageDispatcher.h | 7 + 10 files changed, 431 insertions(+), 10 deletions(-) create mode 100644 src/librbd/io/QueueImageDispatch.cc create mode 100644 src/librbd/io/QueueImageDispatch.h diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt index 0ecd0258c93..ff6a6990807 100644 --- a/src/librbd/CMakeLists.txt +++ b/src/librbd/CMakeLists.txt @@ -84,6 +84,7 @@ set(librbd_internal_srcs io/ObjectDispatcher.cc io/ObjectRequest.cc io/QosImageDispatch.cc + io/QueueImageDispatch.cc io/ReadResult.cc io/RefreshImageDispatch.cc io/SimpleSchedulerObjectDispatch.cc diff --git a/src/librbd/exclusive_lock/ImageDispatch.cc b/src/librbd/exclusive_lock/ImageDispatch.cc index f119d60af2c..93c675e74f6 100644 --- a/src/librbd/exclusive_lock/ImageDispatch.cc +++ b/src/librbd/exclusive_lock/ImageDispatch.cc @@ -9,6 +9,7 @@ #include "librbd/ImageCtx.h" #include "librbd/Utils.h" #include "librbd/exclusive_lock/Policy.h" +#include "librbd/io/ImageDispatcherInterface.h" #include "librbd/io/ImageRequestWQ.h" #define dout_subsys ceph_subsys_rbd @@ -45,7 +46,7 @@ void ImageDispatch::set_require_lock(io::Direction direction, // TODO if (blocked) { std::shared_lock owner_locker{m_image_ctx->owner_lock}; - m_image_ctx->io_work_queue->block_writes(on_finish); + m_image_ctx->io_image_dispatcher->block_writes(on_finish); } else { on_finish->complete(0); } @@ -57,7 +58,7 @@ void ImageDispatch::unset_require_lock(io::Direction direction) { // TODO if (unblocked) { - m_image_ctx->io_work_queue->unblock_writes(); + m_image_ctx->io_image_dispatcher->unblock_writes(); } } diff --git a/src/librbd/io/ImageDispatcher.cc b/src/librbd/io/ImageDispatcher.cc index d48016d8c87..b9e00e15c19 100644 --- a/src/librbd/io/ImageDispatcher.cc +++ b/src/librbd/io/ImageDispatcher.cc @@ -11,6 +11,7 @@ #include "librbd/io/ImageDispatch.h" #include "librbd/io/ImageDispatchInterface.h" #include "librbd/io/ImageDispatchSpec.h" +#include "librbd/io/QueueImageDispatch.h" #include "librbd/io/QosImageDispatch.h" #include "librbd/io/RefreshImageDispatch.h" #include @@ -106,6 +107,9 @@ ImageDispatcher::ImageDispatcher(I* image_ctx) auto image_dispatch = new ImageDispatch(image_ctx); this->register_dispatch(image_dispatch); + m_queue_image_dispatch = new QueueImageDispatch(image_ctx); + this->register_dispatch(m_queue_image_dispatch); + m_qos_image_dispatch = new QosImageDispatch(image_ctx); this->register_dispatch(m_qos_image_dispatch); @@ -124,6 +128,31 @@ void ImageDispatcher::apply_qos_limit(uint64_t flag, uint64_t limit, m_qos_image_dispatch->apply_qos_limit(flag, limit, burst); } +template +bool ImageDispatcher::writes_blocked() const { + return m_queue_image_dispatch->writes_blocked(); +} + +template +int ImageDispatcher::block_writes() { + return m_queue_image_dispatch->block_writes(); +} + +template +void ImageDispatcher::block_writes(Context *on_blocked) { + m_queue_image_dispatch->block_writes(on_blocked); +} + +template +void ImageDispatcher::unblock_writes() { + m_queue_image_dispatch->unblock_writes(); +} + +template +void ImageDispatcher::wait_on_writes_unblocked(Context *on_unblocked) { + m_queue_image_dispatch->wait_on_writes_unblocked(on_unblocked); +} + template void ImageDispatcher::finish(int r, ImageDispatchLayer image_dispatch_layer, uint64_t tid) { diff --git a/src/librbd/io/ImageDispatcher.h b/src/librbd/io/ImageDispatcher.h index 95ea6866a98..b1de734be28 100644 --- a/src/librbd/io/ImageDispatcher.h +++ b/src/librbd/io/ImageDispatcher.h @@ -21,6 +21,7 @@ struct ImageCtx; namespace io { +template struct QueueImageDispatch; template struct QosImageDispatch; template @@ -31,6 +32,13 @@ public: void apply_qos_schedule_tick_min(uint64_t tick) override; void apply_qos_limit(uint64_t flag, uint64_t limit, uint64_t burst) override; + bool writes_blocked() const override; + int block_writes() override; + void block_writes(Context *on_blocked) override; + + void unblock_writes() override; + void wait_on_writes_unblocked(Context *on_unblocked) override; + void finish(int r, ImageDispatchLayer image_dispatch_layer, uint64_t tid) override; @@ -42,6 +50,7 @@ protected: private: struct SendVisitor; + QueueImageDispatch* m_queue_image_dispatch = nullptr; QosImageDispatch* m_qos_image_dispatch = nullptr; }; diff --git a/src/librbd/io/ImageDispatcherInterface.h b/src/librbd/io/ImageDispatcherInterface.h index 60d0c25f91c..a0a1e6dd4a8 100644 --- a/src/librbd/io/ImageDispatcherInterface.h +++ b/src/librbd/io/ImageDispatcherInterface.h @@ -21,6 +21,13 @@ public: virtual void apply_qos_limit(uint64_t flag, uint64_t limit, uint64_t burst) = 0; + virtual bool writes_blocked() const = 0; + virtual int block_writes() = 0; + virtual void block_writes(Context *on_blocked) = 0; + + virtual void unblock_writes() = 0; + virtual void wait_on_writes_unblocked(Context *on_unblocked) = 0; + virtual void finish(int r, ImageDispatchLayer image_dispatch_layer, uint64_t tid) = 0; }; diff --git a/src/librbd/io/ImageRequestWQ.cc b/src/librbd/io/ImageRequestWQ.cc index 13e6cd70919..968a54ce775 100644 --- a/src/librbd/io/ImageRequestWQ.cc +++ b/src/librbd/io/ImageRequestWQ.cc @@ -627,13 +627,6 @@ void ImageRequestWQ::shut_down(Context *on_shutdown) { flush_image(m_image_ctx, on_shutdown); } -template -int ImageRequestWQ::block_writes() { - C_SaferCond cond_ctx; - block_writes(&cond_ctx); - return cond_ctx.wait(); -} - template void ImageRequestWQ::block_writes(Context *on_blocked) { ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); diff --git a/src/librbd/io/ImageRequestWQ.h b/src/librbd/io/ImageRequestWQ.h index 1a115441073..1783d340549 100644 --- a/src/librbd/io/ImageRequestWQ.h +++ b/src/librbd/io/ImageRequestWQ.h @@ -65,7 +65,6 @@ public: return (m_write_blockers > 0); } - int block_writes(); void block_writes(Context *on_blocked); void unblock_writes(); diff --git a/src/librbd/io/QueueImageDispatch.cc b/src/librbd/io/QueueImageDispatch.cc new file mode 100644 index 00000000000..23a087d8dcc --- /dev/null +++ b/src/librbd/io/QueueImageDispatch.cc @@ -0,0 +1,264 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/io/QueueImageDispatch.h" +#include "common/dout.h" +#include "common/Cond.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/io/AioCompletion.h" +#include "librbd/io/ImageDispatchSpec.h" +#include "librbd/io/ImageRequestWQ.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::io::QueueImageDispatch: " << this \ + << " " << __func__ << ": " + +namespace librbd { +namespace io { + +template +QueueImageDispatch::QueueImageDispatch(I* image_ctx) + : m_image_ctx(image_ctx), + m_lock(ceph::make_shared_mutex( + util::unique_lock_name("librbd::io::QueueImageDispatch::m_lock", + this))) { + auto cct = m_image_ctx->cct; + ldout(cct, 5) << "ictx=" << image_ctx << dendl; +} + +template +void QueueImageDispatch::shut_down(Context* on_finish) { + on_finish->complete(0); +} + +template +int QueueImageDispatch::block_writes() { + C_SaferCond cond_ctx; + block_writes(&cond_ctx); + return cond_ctx.wait(); +} + +template +void QueueImageDispatch::block_writes(Context *on_blocked) { + ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock)); + auto cct = m_image_ctx->cct; + + // TODO temp + auto ctx = new C_Gather(cct, on_blocked); + m_image_ctx->io_work_queue->block_writes(ctx->new_sub()); + + { + std::unique_lock locker{m_lock}; + ++m_write_blockers; + ldout(cct, 5) << m_image_ctx << ", " + << "num=" << m_write_blockers << dendl; + if (!m_write_blocker_contexts.empty() || !m_in_flight_write_tids.empty()) { + ldout(cct, 5) << "waiting for in-flight writes to complete: " + << "write_tids=" << m_in_flight_write_tids << dendl; + m_write_blocker_contexts.push_back(ctx->new_sub()); + ctx->activate(); + return; + } + } + + // ensure that all in-flight IO is flushed + flush_image(ctx->new_sub()); + ctx->activate(); +}; + +template +void QueueImageDispatch::unblock_writes() { + auto cct = m_image_ctx->cct; + + Contexts waiter_contexts; + Contexts dispatch_contexts; + { + std::unique_lock locker{m_lock}; + ceph_assert(m_write_blockers > 0); + --m_write_blockers; + + ldout(cct, 5) << m_image_ctx << ", " + << "num=" << m_write_blockers << dendl; + if (m_write_blockers == 0) { + std::swap(waiter_contexts, m_unblocked_write_waiter_contexts); + std::swap(dispatch_contexts, m_on_dispatches); + } + } + + for (auto ctx : waiter_contexts) { + ctx->complete(0); + } + + for (auto ctx : dispatch_contexts) { + ctx->complete(0); + } + + // TODO temp + m_image_ctx->io_work_queue->unblock_writes(); +} + +template +void QueueImageDispatch::wait_on_writes_unblocked(Context *on_unblocked) { + ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock)); + auto cct = m_image_ctx->cct; + + // TODO temp + auto ctx = new C_Gather(cct, on_unblocked); + m_image_ctx->io_work_queue->wait_on_writes_unblocked(ctx->new_sub()); + + { + std::unique_lock locker{m_lock}; + ldout(cct, 20) << m_image_ctx << ", " + << "write_blockers=" << m_write_blockers << dendl; + if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) { + m_unblocked_write_waiter_contexts.push_back(ctx->new_sub()); + ctx->activate(); + return; + } + } + + ctx->activate(); +} + +template +bool QueueImageDispatch::read( + AioCompletion* aio_comp, Extents &&image_extents, ReadResult &&read_result, + int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + return enqueue(true, tid, dispatch_result, on_dispatched); +} + +template +bool QueueImageDispatch::write( + AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl, + int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + return enqueue(false, tid, dispatch_result, on_dispatched); +} + +template +bool QueueImageDispatch::discard( + AioCompletion* aio_comp, Extents &&image_extents, + uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace, + uint64_t tid, std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + return enqueue(false, tid, dispatch_result, on_dispatched); +} + +template +bool QueueImageDispatch::write_same( + AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl, + int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + return enqueue(false, tid, dispatch_result, on_dispatched); +} + +template +bool QueueImageDispatch::compare_and_write( + AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl, + bufferlist &&bl, uint64_t *mismatch_offset, int op_flags, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + return enqueue(false, tid, dispatch_result, on_dispatched); +} + +template +bool QueueImageDispatch::flush( + AioCompletion* aio_comp, FlushSource flush_source, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "tid=" << tid << dendl; + + if (flush_source != FLUSH_SOURCE_USER) { + return false; + } + + return enqueue(false, tid, dispatch_result, on_dispatched); +} + +template +void QueueImageDispatch::handle_finished(int r, uint64_t tid) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "r=" << r << ", tid=" << tid << dendl; + + std::unique_lock locker{m_lock}; + auto it = m_in_flight_write_tids.find(tid); + if (it == m_in_flight_write_tids.end()) { + // assumed to be a read op + return; + } + m_in_flight_write_tids.erase(it); + + Contexts write_blocker_contexts; + if (m_in_flight_write_tids.empty()) { + std::swap(write_blocker_contexts, m_write_blocker_contexts); + } + locker.unlock(); + + for (auto ctx : write_blocker_contexts) { + ctx->complete(0); + } +} + +template +bool QueueImageDispatch::enqueue( + bool read_op, uint64_t tid, DispatchResult* dispatch_result, + Context* on_dispatched) { + std::unique_lock locker{m_lock}; + if (!read_op) { + if (m_write_blockers > 0 || !m_on_dispatches.empty()) { + *dispatch_result = DISPATCH_RESULT_RESTART; + m_on_dispatches.push_back(on_dispatched); + return true; + } + + m_in_flight_write_tids.insert(tid); + } + locker.unlock(); + + if (!m_image_ctx->non_blocking_aio) { + return false; + } + + *dispatch_result = DISPATCH_RESULT_CONTINUE; + m_image_ctx->op_work_queue->queue(on_dispatched, 0); + return true; +} + +template +void QueueImageDispatch::flush_image(Context* on_finish) { + auto aio_comp = AioCompletion::create_and_start( + on_finish, util::get_image_ctx(m_image_ctx), librbd::io::AIO_TYPE_FLUSH); + auto req = ImageDispatchSpec::create_flush_request( + *m_image_ctx, IMAGE_DISPATCH_LAYER_QUEUE, aio_comp, FLUSH_SOURCE_INTERNAL, + {}); + req->send(); +} + +} // namespace io +} // namespace librbd + +template class librbd::io::QueueImageDispatch; diff --git a/src/librbd/io/QueueImageDispatch.h b/src/librbd/io/QueueImageDispatch.h new file mode 100644 index 00000000000..bd48f7f8fea --- /dev/null +++ b/src/librbd/io/QueueImageDispatch.h @@ -0,0 +1,111 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_IO_QUEUE_IMAGE_DISPATCH_H +#define CEPH_LIBRBD_IO_QUEUE_IMAGE_DISPATCH_H + +#include "librbd/io/ImageDispatchInterface.h" +#include "include/int_types.h" +#include "include/buffer.h" +#include "common/ceph_mutex.h" +#include "common/zipkin_trace.h" +#include "common/Throttle.h" +#include "librbd/io/ReadResult.h" +#include "librbd/io/Types.h" +#include +#include + +struct Context; + +namespace librbd { + +struct ImageCtx; + +namespace io { + +struct AioCompletion; + +template +class QueueImageDispatch : public ImageDispatchInterface { +public: + QueueImageDispatch(ImageCtxT* image_ctx); + + ImageDispatchLayer get_dispatch_layer() const override { + return IMAGE_DISPATCH_LAYER_QUEUE; + } + + void shut_down(Context* on_finish) override; + + int block_writes(); + void block_writes(Context *on_blocked); + void unblock_writes(); + + inline bool writes_blocked() const { + std::shared_lock locker{m_lock}; + return (m_write_blockers > 0); + } + + void wait_on_writes_unblocked(Context *on_unblocked); + + bool read( + AioCompletion* aio_comp, Extents &&image_extents, + ReadResult &&read_result, int op_flags, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) override; + bool write( + AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl, + int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) override; + bool discard( + AioCompletion* aio_comp, Extents &&image_extents, + uint32_t discard_granularity_bytes, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) override; + bool write_same( + AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl, + int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) override; + bool compare_and_write( + AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl, + bufferlist &&bl, uint64_t *mismatch_offset, int op_flags, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) override; + bool flush( + AioCompletion* aio_comp, FlushSource flush_source, + const ZTracer::Trace &parent_trace, uint64_t tid, + std::atomic* image_dispatch_flags, + DispatchResult* dispatch_result, Context* on_dispatched) override; + + void handle_finished(int r, uint64_t tid) override; + +private: + typedef std::list Contexts; + typedef std::set Tids; + + ImageCtxT* m_image_ctx; + + mutable ceph::shared_mutex m_lock; + Contexts m_on_dispatches; + Tids m_in_flight_write_tids; + + uint32_t m_write_blockers = 0; + Contexts m_write_blocker_contexts; + Contexts m_unblocked_write_waiter_contexts; + + bool enqueue(bool read_op, uint64_t tid, DispatchResult* dispatch_result, + Context* on_dispatched); + void flush_image(Context* on_blocked); + +}; + +} // namespace io +} // namespace librbd + +extern template class librbd::io::QueueImageDispatch; + +#endif // CEPH_LIBRBD_IO_QUEUE_IMAGE_DISPATCH_H diff --git a/src/test/librbd/mock/io/MockImageDispatcher.h b/src/test/librbd/mock/io/MockImageDispatcher.h index bd7b8428e5d..7fe508e3d9e 100644 --- a/src/test/librbd/mock/io/MockImageDispatcher.h +++ b/src/test/librbd/mock/io/MockImageDispatcher.h @@ -29,6 +29,13 @@ public: MOCK_METHOD1(apply_qos_schedule_tick_min, void(uint64_t)); MOCK_METHOD3(apply_qos_limit, void(uint64_t, uint64_t, uint64_t)); + + MOCK_CONST_METHOD0(writes_blocked, bool()); + MOCK_METHOD0(block_writes, int()); + MOCK_METHOD1(block_writes, void(Context*)); + + MOCK_METHOD0(unblock_writes, void()); + MOCK_METHOD1(wait_on_writes_unblocked, void(Context*)); }; } // namespace io -- 2.39.5