From: Jason Dillaman Date: Fri, 8 May 2020 03:03:08 +0000 (-0400) Subject: librbd: remove io::ImageRequestWQ X-Git-Tag: v16.1.0~2318^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=106a6490d807058c092ce825cbf134dbe321d178;p=ceph.git librbd: remove io::ImageRequestWQ This has been fully deprecated in favor of the new plugable image dispatcher system. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt index 746e1a6cdb28..5deb68071bc9 100644 --- a/src/librbd/CMakeLists.txt +++ b/src/librbd/CMakeLists.txt @@ -80,7 +80,6 @@ set(librbd_internal_srcs io/ImageDispatchSpec.cc io/ImageDispatcher.cc io/ImageRequest.cc - io/ImageRequestWQ.cc io/ObjectDispatch.cc io/ObjectDispatchSpec.cc io/ObjectDispatcher.cc diff --git a/src/librbd/ExclusiveLock.cc b/src/librbd/ExclusiveLock.cc index ae43941512d0..e3838538f4f2 100644 --- a/src/librbd/ExclusiveLock.cc +++ b/src/librbd/ExclusiveLock.cc @@ -10,7 +10,6 @@ #include "librbd/exclusive_lock/PostAcquireRequest.h" #include "librbd/exclusive_lock/PreReleaseRequest.h" #include "librbd/io/ImageDispatcherInterface.h" -#include "librbd/io/ImageRequestWQ.h" #include "librbd/Utils.h" #include "common/ceph_mutex.h" #include "common/dout.h" diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index 4a0b884cd345..33a220d4c19c 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -29,7 +29,6 @@ #include "librbd/io/AioCompletion.h" #include "librbd/io/AsyncOperation.h" #include "librbd/io/ImageDispatcher.h" -#include "librbd/io/ImageRequestWQ.h" #include "librbd/io/ObjectDispatcher.h" #include "librbd/io/QosImageDispatch.h" #include "librbd/journal/StandardPolicy.h" @@ -123,7 +122,7 @@ public: state(new ImageState<>(this)), operations(new Operations<>(*this)), exclusive_lock(nullptr), object_map(nullptr), - io_work_queue(nullptr), op_work_queue(nullptr), + op_work_queue(nullptr), external_callback_completions(32), event_socket_completions(32), asok_hook(nullptr), @@ -139,10 +138,6 @@ public: ThreadPool *thread_pool; get_thread_pool_instance(cct, &thread_pool, &op_work_queue); - io_work_queue = new io::ImageRequestWQ<>( - this, "librbd::io_work_queue", - cct->_conf.get_val("rbd_op_thread_timeout"), - thread_pool); io_image_dispatcher = new io::ImageDispatcher(this); io_object_dispatcher = new io::ObjectDispatcher(this); @@ -176,14 +171,12 @@ public: if (data_ctx.is_valid()) { data_ctx.aio_flush(); } - io_work_queue->drain(); delete io_object_dispatcher; delete io_image_dispatcher; delete journal_policy; delete exclusive_lock_policy; - delete io_work_queue; delete operations; delete state; } diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h index ef930f57c781..ed47d43a5569 100644 --- a/src/librbd/ImageCtx.h +++ b/src/librbd/ImageCtx.h @@ -58,7 +58,6 @@ namespace librbd { class AioCompletion; class AsyncOperation; template class CopyupRequest; - template class ImageRequestWQ; struct ImageDispatcherInterface; struct ObjectDispatcherInterface; } @@ -181,7 +180,6 @@ namespace librbd { xlist*> resize_reqs; - io::ImageRequestWQ *io_work_queue; io::ImageDispatcherInterface *io_image_dispatcher = nullptr; io::ObjectDispatcherInterface *io_object_dispatcher = nullptr; diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index df459af73e1a..33eafa9745bc 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -14,7 +14,6 @@ #include "journal/Settings.h" #include "journal/Utils.h" #include "librbd/ImageCtx.h" -#include "librbd/io/ImageRequestWQ.h" #include "librbd/io/ObjectDispatchSpec.h" #include "librbd/io/ObjectDispatcherInterface.h" #include "librbd/journal/CreateRequest.h" diff --git a/src/librbd/api/DiffIterate.cc b/src/librbd/api/DiffIterate.cc index a5419e05e831..b34c6ff4afeb 100644 --- a/src/librbd/api/DiffIterate.cc +++ b/src/librbd/api/DiffIterate.cc @@ -9,7 +9,6 @@ #include "librbd/internal.h" #include "librbd/io/AioCompletion.h" #include "librbd/io/ImageDispatchSpec.h" -#include "librbd/io/ImageRequestWQ.h" #include "librbd/object_map/DiffRequest.h" #include "include/rados/librados.hpp" #include "include/interval_set.h" diff --git a/src/librbd/api/Migration.cc b/src/librbd/api/Migration.cc index a5c224182939..dce8c2a280d5 100644 --- a/src/librbd/api/Migration.cc +++ b/src/librbd/api/Migration.cc @@ -31,7 +31,6 @@ #include "librbd/image/RemoveRequest.h" #include "librbd/image/Types.h" #include "librbd/internal.h" -#include "librbd/io/ImageRequestWQ.h" #include "librbd/mirror/DisableRequest.h" #include "librbd/mirror/EnableRequest.h" diff --git a/src/librbd/exclusive_lock/ImageDispatch.cc b/src/librbd/exclusive_lock/ImageDispatch.cc index 70fc0777364a..253ae149ce67 100644 --- a/src/librbd/exclusive_lock/ImageDispatch.cc +++ b/src/librbd/exclusive_lock/ImageDispatch.cc @@ -14,7 +14,6 @@ #include "librbd/io/FlushTracker.h" #include "librbd/io/ImageDispatchSpec.h" #include "librbd/io/ImageDispatcherInterface.h" -#include "librbd/io/ImageRequestWQ.h" #define dout_subsys ceph_subsys_rbd #undef dout_prefix @@ -98,9 +97,6 @@ bool ImageDispatch::set_require_lock(io::Direction direction, bool enabled) { ldout(cct, 20) << "direction=" << direction << ", enabled=" << enabled << dendl; - // TODO remove when ImageRequestWQ is removed - m_image_ctx->io_work_queue->set_require_lock(direction, enabled); - std::unique_lock locker{m_lock}; auto prev_require_lock = (m_require_lock_on_read || m_require_lock_on_write); diff --git a/src/librbd/image/CloseRequest.cc b/src/librbd/image/CloseRequest.cc index 0ddcfb82fa20..5768b6ef7aad 100644 --- a/src/librbd/image/CloseRequest.cc +++ b/src/librbd/image/CloseRequest.cc @@ -13,7 +13,6 @@ #include "librbd/io/AioCompletion.h" #include "librbd/io/ImageDispatcher.h" #include "librbd/io/ImageDispatchSpec.h" -#include "librbd/io/ImageRequestWQ.h" #include "librbd/io/ObjectDispatcherInterface.h" #define dout_subsys ceph_subsys_rbd @@ -82,24 +81,6 @@ void CloseRequest::handle_shut_down_update_watchers(int r) { << dendl; } - send_shut_down_io_queue(); -} - -template -void CloseRequest::send_shut_down_io_queue() { - CephContext *cct = m_image_ctx->cct; - ldout(cct, 10) << this << " " << __func__ << dendl; - - std::shared_lock owner_locker{m_image_ctx->owner_lock}; - m_image_ctx->io_work_queue->shut_down(create_context_callback< - CloseRequest, &CloseRequest::handle_shut_down_io_queue>(this)); -} - -template -void CloseRequest::handle_shut_down_io_queue(int r) { - CephContext *cct = m_image_ctx->cct; - ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl; - send_shut_down_exclusive_lock(); } @@ -228,52 +209,54 @@ void CloseRequest::handle_flush_readahead(int r) { CephContext *cct = m_image_ctx->cct; ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl; - send_shut_down_object_dispatcher(); + send_shut_down_image_dispatcher(); } template -void CloseRequest::send_shut_down_object_dispatcher() { +void CloseRequest::send_shut_down_image_dispatcher() { CephContext *cct = m_image_ctx->cct; ldout(cct, 10) << this << " " << __func__ << dendl; - m_image_ctx->io_object_dispatcher->shut_down(create_context_callback< + m_image_ctx->io_image_dispatcher->shut_down(create_context_callback< CloseRequest, - &CloseRequest::handle_shut_down_object_dispatcher>(this)); + &CloseRequest::handle_shut_down_image_dispatcher>(this)); } template -void CloseRequest::handle_shut_down_object_dispatcher(int r) { +void CloseRequest::handle_shut_down_image_dispatcher(int r) { CephContext *cct = m_image_ctx->cct; ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl; save_result(r); if (r < 0) { - lderr(cct) << "failed to shut down object dispatcher: " + lderr(cct) << "failed to shut down image dispatcher: " << cpp_strerror(r) << dendl; } - send_shut_down_image_dispatcher(); + + send_shut_down_object_dispatcher(); } template -void CloseRequest::send_shut_down_image_dispatcher() { +void CloseRequest::send_shut_down_object_dispatcher() { CephContext *cct = m_image_ctx->cct; ldout(cct, 10) << this << " " << __func__ << dendl; - m_image_ctx->io_image_dispatcher->shut_down(create_context_callback< + m_image_ctx->io_object_dispatcher->shut_down(create_context_callback< CloseRequest, - &CloseRequest::handle_shut_down_image_dispatcher>(this)); + &CloseRequest::handle_shut_down_object_dispatcher>(this)); } template -void CloseRequest::handle_shut_down_image_dispatcher(int r) { +void CloseRequest::handle_shut_down_object_dispatcher(int r) { CephContext *cct = m_image_ctx->cct; ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl; save_result(r); if (r < 0) { - lderr(cct) << "failed to shut down image dispatcher: " + lderr(cct) << "failed to shut down object dispatcher: " << cpp_strerror(r) << dendl; } + send_flush_op_work_queue(); } diff --git a/src/librbd/image/CloseRequest.h b/src/librbd/image/CloseRequest.h index 95c02bb5de97..7f9f0d52ea6e 100644 --- a/src/librbd/image/CloseRequest.h +++ b/src/librbd/image/CloseRequest.h @@ -33,10 +33,7 @@ private: * BLOCK_IMAGE_WATCHER (skip if R/O) * | * v - * SHUT_DOWN_UPDATE_WATCHERS - * | - * v - * SHUT_DOWN_AIO_WORK_QUEUE . . . + * SHUT_DOWN_UPDATE_WATCHERS . . * | . (exclusive lock disabled) * v v * SHUT_DOWN_EXCLUSIVE_LOCK FLUSH @@ -50,10 +47,10 @@ private: * FLUSH_READAHEAD * | * v - * SHUT_DOWN_OBJECT_DISPATCHER + * SHUT_DOWN_IMAGE_DISPATCHER * | * v - * SHUT_DOWN_IMAGE_DISPATCHER + * SHUT_DOWN_OBJECT_DISPATCHER * | * v * FLUSH_OP_WORK_QUEUE @@ -85,9 +82,6 @@ private: void send_shut_down_update_watchers(); void handle_shut_down_update_watchers(int r); - void send_shut_down_io_queue(); - void handle_shut_down_io_queue(int r); - void send_shut_down_exclusive_lock(); void handle_shut_down_exclusive_lock(int r); @@ -100,12 +94,12 @@ private: void send_flush_readahead(); void handle_flush_readahead(int r); - void send_shut_down_object_dispatcher(); - void handle_shut_down_object_dispatcher(int r); - void send_shut_down_image_dispatcher(); void handle_shut_down_image_dispatcher(int r); + void send_shut_down_object_dispatcher(); + void handle_shut_down_object_dispatcher(int r); + void send_flush_op_work_queue(); void handle_flush_op_work_queue(int r); diff --git a/src/librbd/io/ImageRequestWQ.cc b/src/librbd/io/ImageRequestWQ.cc deleted file mode 100644 index dbd558bdc403..000000000000 --- a/src/librbd/io/ImageRequestWQ.cc +++ /dev/null @@ -1,1012 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "librbd/io/ImageRequestWQ.h" -#include "common/errno.h" -#include "common/zipkin_trace.h" -#include "common/Cond.h" -#include "librbd/ExclusiveLock.h" -#include "librbd/ImageCtx.h" -#include "librbd/ImageState.h" -#include "librbd/ImageWatcher.h" -#include "librbd/internal.h" -#include "librbd/Utils.h" -#include "librbd/exclusive_lock/Policy.h" -#include "librbd/io/AioCompletion.h" -#include "librbd/io/ImageRequest.h" -#include "librbd/io/ImageDispatchSpec.h" -#include "librbd/io/ImageDispatcher.h" -#include "librbd/io/QosImageDispatch.h" -#include "common/EventTrace.h" - -#define dout_subsys ceph_subsys_rbd -#undef dout_prefix -#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \ - << " " << __func__ << ": " - -namespace librbd { - -using util::create_context_callback; - -namespace io { - -template -struct ImageRequestWQ::C_AcquireLock : public Context { - ImageRequestWQ *work_queue; - ImageDispatchSpec *image_request; - - C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec *image_request) - : work_queue(work_queue), image_request(image_request) { - } - - void finish(int r) override { - work_queue->handle_acquire_lock(r, image_request); - } -}; - -template -struct ImageRequestWQ::C_BlockedWrites : public Context { - ImageRequestWQ *work_queue; - explicit C_BlockedWrites(ImageRequestWQ *_work_queue) - : work_queue(_work_queue) { - } - - void finish(int r) override { - work_queue->handle_blocked_writes(r); - } -}; - -template -struct ImageRequestWQ::C_RefreshFinish : public Context { - ImageRequestWQ *work_queue; - ImageDispatchSpec *image_request; - - C_RefreshFinish(ImageRequestWQ *work_queue, - ImageDispatchSpec *image_request) - : work_queue(work_queue), image_request(image_request) { - } - void finish(int r) override { - work_queue->handle_refreshed(r, image_request); - } -}; - -template -ImageRequestWQ::ImageRequestWQ(I *image_ctx, const string &name, - time_t ti, ThreadPool *tp) - : ThreadPool::PointerWQ >(name, ti, 0, tp), - m_image_ctx(*image_ctx), - m_lock(ceph::make_shared_mutex( - util::unique_lock_name("ImageRequestWQ::m_lock", this))) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 5) << "ictx=" << image_ctx << dendl; - - this->register_work_queue(); -} - -template -ssize_t ImageRequestWQ::read(uint64_t off, uint64_t len, - ReadResult &&read_result, int op_flags) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " - << "len = " << len << dendl; - - C_SaferCond cond; - AioCompletion *c = AioCompletion::create(&cond); - aio_read(c, off, len, std::move(read_result), op_flags, false); - return cond.wait(); -} - -template -ssize_t ImageRequestWQ::write(uint64_t off, uint64_t len, - bufferlist &&bl, int op_flags) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " - << "len = " << len << dendl; - - m_image_ctx.image_lock.lock_shared(); - int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); - m_image_ctx.image_lock.unlock_shared(); - if (r < 0) { - lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; - return r; - } - - C_SaferCond cond; - AioCompletion *c = AioCompletion::create(&cond); - aio_write(c, off, len, std::move(bl), op_flags, false); - - r = cond.wait(); - if (r < 0) { - return r; - } - return len; -} - -template -ssize_t ImageRequestWQ::discard(uint64_t off, uint64_t len, - uint32_t discard_granularity_bytes) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " - << "len = " << len << dendl; - - m_image_ctx.image_lock.lock_shared(); - int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); - m_image_ctx.image_lock.unlock_shared(); - if (r < 0) { - lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; - return r; - } - - C_SaferCond cond; - AioCompletion *c = AioCompletion::create(&cond); - aio_discard(c, off, len, discard_granularity_bytes, false); - - r = cond.wait(); - if (r < 0) { - return r; - } - return len; -} - -template -ssize_t ImageRequestWQ::writesame(uint64_t off, uint64_t len, - bufferlist &&bl, int op_flags) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " - << "len = " << len << ", data_len " << bl.length() << dendl; - - m_image_ctx.image_lock.lock_shared(); - int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); - m_image_ctx.image_lock.unlock_shared(); - if (r < 0) { - lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; - return r; - } - - C_SaferCond cond; - AioCompletion *c = AioCompletion::create(&cond); - aio_writesame(c, off, len, std::move(bl), op_flags, false); - - r = cond.wait(); - if (r < 0) { - return r; - } - return len; -} - -template -ssize_t ImageRequestWQ::compare_and_write(uint64_t off, uint64_t len, - bufferlist &&cmp_bl, - bufferlist &&bl, - uint64_t *mismatch_off, - int op_flags){ - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off=" - << off << ", " << "len = " << len << dendl; - - m_image_ctx.image_lock.lock_shared(); - int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); - m_image_ctx.image_lock.unlock_shared(); - if (r < 0) { - lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; - return r; - } - - C_SaferCond cond; - AioCompletion *c = AioCompletion::create(&cond); - aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl), - mismatch_off, op_flags, false); - - r = cond.wait(); - if (r < 0) { - return r; - } - - return len; -} - -template -int ImageRequestWQ::flush() { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; - - C_SaferCond cond; - AioCompletion *c = AioCompletion::create(&cond); - aio_flush(c, false); - - int r = cond.wait(); - if (r < 0) { - return r; - } - - return 0; -} - -template -void ImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len, - ReadResult &&read_result, int op_flags, - bool native_async) { - CephContext *cct = m_image_ctx.cct; - FUNCTRACE(cct); - ZTracer::Trace trace; - if (m_image_ctx.blkin_trace_all) { - trace.init("wq: read", &m_image_ctx.trace_endpoint); - trace.event("start"); - } - - c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ); - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " - << "completion=" << c << ", off=" << off << ", " - << "len=" << len << ", " << "flags=" << op_flags << dendl; - - if (native_async && m_image_ctx.event_socket.is_valid()) { - c->set_event_notify(true); - } - - if (!start_in_flight_io(c)) { - return; - } - - // if journaling is enabled -- we need to replay the journal because - // it might contain an uncommitted write - std::shared_lock owner_locker{m_image_ctx.owner_lock}; - if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() || - require_lock_on_read()) { - queue(ImageDispatchSpec::create_read( - m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, {{off, len}}, - std::move(read_result), op_flags, trace)); - } else { - c->start_op(); - ImageRequest::aio_read(&m_image_ctx, c, {{off, len}}, - std::move(read_result), op_flags, trace); - finish_in_flight_io(); - } - trace.event("finish"); -} - -template -void ImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len, - bufferlist &&bl, int op_flags, - bool native_async) { - CephContext *cct = m_image_ctx.cct; - FUNCTRACE(cct); - ZTracer::Trace trace; - if (m_image_ctx.blkin_trace_all) { - trace.init("wq: write", &m_image_ctx.trace_endpoint); - trace.event("init"); - } - - c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE); - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " - << "completion=" << c << ", off=" << off << ", " - << "len=" << len << ", flags=" << op_flags << dendl; - - if (native_async && m_image_ctx.event_socket.is_valid()) { - c->set_event_notify(true); - } - - if (!start_in_flight_io(c)) { - return; - } - - auto tid = ++m_last_tid; - - { - std::lock_guard locker{m_lock}; - m_queued_or_blocked_io_tids.insert(tid); - } - - ImageDispatchSpec *req = ImageDispatchSpec::create_write( - m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, {{off, len}}, std::move(bl), - op_flags, trace, tid); - - std::shared_lock owner_locker{m_image_ctx.owner_lock}; - queue(req); - trace.event("finish"); -} - -template -void ImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off, - uint64_t len, - uint32_t discard_granularity_bytes, - bool native_async) { - CephContext *cct = m_image_ctx.cct; - FUNCTRACE(cct); - ZTracer::Trace trace; - if (m_image_ctx.blkin_trace_all) { - trace.init("wq: discard", &m_image_ctx.trace_endpoint); - trace.event("init"); - } - - c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD); - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " - << "completion=" << c << ", off=" << off << ", len=" << len - << dendl; - - if (native_async && m_image_ctx.event_socket.is_valid()) { - c->set_event_notify(true); - } - - if (!start_in_flight_io(c)) { - return; - } - - auto tid = ++m_last_tid; - - { - std::lock_guard locker{m_lock}; - m_queued_or_blocked_io_tids.insert(tid); - } - - ImageDispatchSpec *req = ImageDispatchSpec::create_discard( - m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, off, len, - discard_granularity_bytes, trace, tid); - - std::shared_lock owner_locker{m_image_ctx.owner_lock}; - queue(req); - trace.event("finish"); -} - -template -void ImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) { - CephContext *cct = m_image_ctx.cct; - FUNCTRACE(cct); - ZTracer::Trace trace; - if (m_image_ctx.blkin_trace_all) { - trace.init("wq: flush", &m_image_ctx.trace_endpoint); - trace.event("init"); - } - - c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH); - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " - << "completion=" << c << dendl; - - if (native_async && m_image_ctx.event_socket.is_valid()) { - c->set_event_notify(true); - } - - if (!start_in_flight_io(c)) { - return; - } - - auto tid = ++m_last_tid; - - ImageDispatchSpec *req = ImageDispatchSpec::create_flush( - m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, FLUSH_SOURCE_USER, trace); - - { - std::lock_guard locker{m_lock}; - if(!m_queued_or_blocked_io_tids.empty()) { - ldout(cct, 20) << "queueing flush, tid: " << tid << dendl; - m_queued_flushes.emplace(tid, req); - --m_in_flight_ios; - return; - } - } - - std::shared_lock owner_locker{m_image_ctx.owner_lock}; - queue(req); - trace.event("finish"); -} - -template -void ImageRequestWQ::aio_writesame(AioCompletion *c, uint64_t off, - uint64_t len, bufferlist &&bl, - int op_flags, bool native_async) { - CephContext *cct = m_image_ctx.cct; - FUNCTRACE(cct); - ZTracer::Trace trace; - if (m_image_ctx.blkin_trace_all) { - trace.init("wq: writesame", &m_image_ctx.trace_endpoint); - trace.event("init"); - } - - c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME); - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " - << "completion=" << c << ", off=" << off << ", " - << "len=" << len << ", data_len = " << bl.length() << ", " - << "flags=" << op_flags << dendl; - - if (native_async && m_image_ctx.event_socket.is_valid()) { - c->set_event_notify(true); - } - - if (!start_in_flight_io(c)) { - return; - } - - auto tid = ++m_last_tid; - - { - std::lock_guard locker{m_lock}; - m_queued_or_blocked_io_tids.insert(tid); - } - - ImageDispatchSpec *req = ImageDispatchSpec::create_write_same( - m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, off, len, std::move(bl), - op_flags, trace, tid); - - std::shared_lock owner_locker{m_image_ctx.owner_lock}; - queue(req); - trace.event("finish"); -} - -template -void ImageRequestWQ::aio_compare_and_write(AioCompletion *c, - uint64_t off, uint64_t len, - bufferlist &&cmp_bl, - bufferlist &&bl, - uint64_t *mismatch_off, - int op_flags, bool native_async) { - CephContext *cct = m_image_ctx.cct; - FUNCTRACE(cct); - ZTracer::Trace trace; - if (m_image_ctx.blkin_trace_all) { - trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint); - trace.event("init"); - } - - c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE); - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " - << "completion=" << c << ", off=" << off << ", " - << "len=" << len << dendl; - - if (native_async && m_image_ctx.event_socket.is_valid()) { - c->set_event_notify(true); - } - - if (!start_in_flight_io(c)) { - return; - } - - auto tid = ++m_last_tid; - - { - std::lock_guard locker{m_lock}; - m_queued_or_blocked_io_tids.insert(tid); - } - - ImageDispatchSpec *req = ImageDispatchSpec::create_compare_and_write( - m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, {{off, len}}, - std::move(cmp_bl), std::move(bl), mismatch_off, op_flags, trace, tid); - - std::shared_lock owner_locker{m_image_ctx.owner_lock}; - queue(req); - trace.event("finish"); -} - -template -bool ImageRequestWQ::block_overlapping_io( - ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx - << "off: " << off << " len: " << len <empty() || - !in_flight_image_extents->intersects(off, len)) { - in_flight_image_extents->insert(off, len); - return false; - } - - return true; -} - -template -void ImageRequestWQ::unblock_overlapping_io(uint64_t offset, uint64_t length, - uint64_t tid) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; - - remove_in_flight_write_ios(offset, length, true, tid); - - std::unique_lock locker{m_lock}; - if (!m_blocked_ios.empty()) { - auto it = m_blocked_ios.begin(); - while (it != m_blocked_ios.end()) { - auto blocked_io = *it; - - const auto& extents = blocked_io->get_image_extents(); - uint64_t off = extents.size() ? extents.front().first : 0; - uint64_t len = extents.size() ? extents.front().second : 0; - - if (block_overlapping_io(&m_in_flight_extents, off, len)) { - break; - } - ldout(cct, 20) << "unblocking off: " << off << ", " - << "len: " << len << dendl; - AioCompletion *aio_comp = blocked_io->get_aio_completion(); - - m_blocked_ios.erase(it); - locker.unlock(); - queue_unblocked_io(aio_comp, blocked_io); - locker.lock(); - } - } -} - -template -void ImageRequestWQ::unblock_flushes() { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; - std::unique_lock locker{m_lock}; - auto io_tid_it = m_queued_or_blocked_io_tids.begin(); - while (true) { - auto it = m_queued_flushes.begin(); - if (it == m_queued_flushes.end() || - (io_tid_it != m_queued_or_blocked_io_tids.end() && - *io_tid_it < it->first)) { - break; - } - - auto blocked_flush = *it; - ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl; - - AioCompletion *aio_comp = blocked_flush.second->get_aio_completion(); - - m_queued_flushes.erase(it); - locker.unlock(); - queue_unblocked_io(aio_comp, blocked_flush.second); - locker.lock(); - } -} - -template -void ImageRequestWQ::queue_unblocked_io(AioCompletion *comp, - ImageDispatchSpec *req) { - if (!start_in_flight_io(comp)) { - return; - } - - std::shared_lock owner_locker{m_image_ctx.owner_lock}; - queue(req); -} - -template -void ImageRequestWQ::shut_down(Context *on_shutdown) { - ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); - - { - std::unique_lock locker{m_lock}; - ceph_assert(!m_shutdown); - m_shutdown = true; - - CephContext *cct = m_image_ctx.cct; - ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load() - << dendl; - if (m_in_flight_ios > 0) { - m_on_shutdown = on_shutdown; - return; - } - } - - m_image_ctx.op_work_queue->queue(on_shutdown, 0); -} - -template -void ImageRequestWQ::block_writes(Context *on_blocked) { - ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); - CephContext *cct = m_image_ctx.cct; - - { - std::unique_lock locker{m_lock}; - ++m_write_blockers; - ldout(cct, 5) << &m_image_ctx << ", " << "num=" - << m_write_blockers << dendl; - if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) { - m_write_blocker_contexts.push_back(on_blocked); - return; - } - } - - m_image_ctx.op_work_queue->queue(on_blocked, 0); -} - -template -void ImageRequestWQ::unblock_writes() { - CephContext *cct = m_image_ctx.cct; - - bool wake_up = false; - Contexts waiter_contexts; - { - std::unique_lock locker{m_lock}; - ceph_assert(m_write_blockers > 0); - --m_write_blockers; - - ldout(cct, 5) << &m_image_ctx << ", " << "num=" - << m_write_blockers << dendl; - if (m_write_blockers == 0) { - wake_up = true; - std::swap(waiter_contexts, m_unblocked_write_waiter_contexts); - } - } - - if (wake_up) { - for (auto ctx : waiter_contexts) { - ctx->complete(0); - } - this->signal(); - } -} - -template -void ImageRequestWQ::wait_on_writes_unblocked(Context *on_unblocked) { - ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); - CephContext *cct = m_image_ctx.cct; - - { - std::unique_lock locker{m_lock}; - ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers=" - << m_write_blockers << dendl; - if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) { - m_unblocked_write_waiter_contexts.push_back(on_unblocked); - return; - } - } - - on_unblocked->complete(0); -} - -template -void ImageRequestWQ::set_require_lock(Direction direction, bool enabled) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << dendl; - - bool wake_up = false; - { - std::unique_lock locker{m_lock}; - switch (direction) { - case DIRECTION_READ: - wake_up = (enabled != m_require_lock_on_read); - m_require_lock_on_read = enabled; - break; - case DIRECTION_WRITE: - wake_up = (enabled != m_require_lock_on_write); - m_require_lock_on_write = enabled; - break; - case DIRECTION_BOTH: - wake_up = (enabled != m_require_lock_on_read || - enabled != m_require_lock_on_write); - m_require_lock_on_read = enabled; - m_require_lock_on_write = enabled; - break; - } - } - - // wake up the thread pool whenever the state changes so that - // we can re-request the lock if required - if (wake_up) { - this->signal(); - } -} - -template -void *ImageRequestWQ::_void_dequeue() { - CephContext *cct = m_image_ctx.cct; - ImageDispatchSpec *peek_item = this->front(); - - // no queued IO requests or all IO is blocked/stalled - if (peek_item == nullptr || m_io_blockers.load() > 0) { - return nullptr; - } - - bool lock_required; - bool refresh_required = m_image_ctx.state->is_refresh_required(); - { - std::shared_lock locker{m_lock}; - bool write_op = peek_item->is_write_op(); - lock_required = is_lock_required(write_op); - if (write_op) { - if (!lock_required && m_write_blockers > 0) { - // missing lock is not the write blocker - return nullptr; - } - - if (!lock_required && !refresh_required && !peek_item->blocked) { - // completed ops will requeue the IO -- don't count it as in-progress - m_in_flight_writes++; - } - } - } - - auto item = reinterpret_cast *>( - ThreadPool::PointerWQ >::_void_dequeue()); - ceph_assert(peek_item == item); - - if (lock_required) { - this->get_pool_lock().unlock(); - m_image_ctx.owner_lock.lock_shared(); - if (m_image_ctx.exclusive_lock != nullptr) { - ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl; - if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) { - lderr(cct) << "op requires exclusive lock" << dendl; - fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(), - item); - - // wake up the IO since we won't be returning a request to process - this->signal(); - } else { - // stall IO until the acquire completes - ++m_io_blockers; - Context *ctx = new C_AcquireLock(this, item); - ctx = create_context_callback< - Context, &Context::complete>( - ctx, m_image_ctx.exclusive_lock); - m_image_ctx.exclusive_lock->acquire_lock(ctx); - } - } else { - // raced with the exclusive lock being disabled - lock_required = false; - } - m_image_ctx.owner_lock.unlock_shared(); - this->get_pool_lock().lock(); - - if (lock_required) { - return nullptr; - } - } - - if (refresh_required) { - ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl; - - // stall IO until the refresh completes - ++m_io_blockers; - - this->get_pool_lock().unlock(); - m_image_ctx.state->refresh(new C_RefreshFinish(this, item)); - this->get_pool_lock().lock(); - return nullptr; - } - - return item; -} - -template -void ImageRequestWQ::process_io(ImageDispatchSpec *req, - bool non_blocking_io) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " - << "req=" << req << dendl; - - //extents are invalidated after the request is sent - //so gather them ahead of that - const auto& extents = req->get_image_extents(); - bool write_op = req->is_write_op(); - uint64_t tid = req->get_tid(); - uint64_t offset = extents.size() ? extents.front().first : 0; - uint64_t length = extents.size() ? extents.front().second : 0; - - if (write_op && !req->blocked) { - std::lock_guard locker{m_lock}; - bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length); - if (blocked) { - ldout(cct, 20) << "blocking overlapping IO: " << "ictx=" - << &m_image_ctx << ", " - << "off=" << offset << ", len=" << length << dendl; - req->blocked = true; - m_blocked_ios.push_back(req); - return; - } - } - - req->start_op(); - req->send(); - - if (write_op) { - if (non_blocking_io) { - finish_in_flight_write(); - } - unblock_overlapping_io(offset, length, tid); - unblock_flushes(); - } -} - -template -void ImageRequestWQ::process(ImageDispatchSpec *req) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " - << "req=" << req << dendl; - - bool write_op = req->is_write_op(); - - process_io(req, true); - - finish_queued_io(write_op); - finish_in_flight_io(); -} - -template -void ImageRequestWQ::remove_in_flight_write_ios(uint64_t offset, uint64_t length, - bool write_op, uint64_t tid) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; - { - std::lock_guard locker{m_lock}; - if (write_op) { - if (length > 0) { - if(!m_in_flight_extents.empty()) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "erasing in flight extents with tid:" - << tid << ", offset: " << offset << dendl; - ImageExtentIntervals extents; - extents.insert(offset, length); - ImageExtentIntervals intersect; - intersect.intersection_of(extents, m_in_flight_extents); - m_in_flight_extents.subtract(intersect); - } - } - m_queued_or_blocked_io_tids.erase(tid); - } - } -} - -template -void ImageRequestWQ::finish_queued_io(bool write_op) { - std::shared_lock locker{m_lock}; - if (write_op) { - ceph_assert(m_queued_writes > 0); - m_queued_writes--; - } else { - ceph_assert(m_queued_reads > 0); - m_queued_reads--; - } -} - -template -void ImageRequestWQ::finish_in_flight_write() { - bool writes_blocked = false; - { - std::shared_lock locker{m_lock}; - ceph_assert(m_in_flight_writes > 0); - if (--m_in_flight_writes == 0 && - !m_write_blocker_contexts.empty()) { - writes_blocked = true; - } - } - - if (writes_blocked) { - m_image_ctx.op_work_queue->queue(create_context_callback< - ImageRequestWQ, &ImageRequestWQ::handle_blocked_writes>(this), 0); - } -} - -template -int ImageRequestWQ::start_in_flight_io(AioCompletion *c) { - std::shared_lock locker{m_lock}; - - if (m_shutdown) { - CephContext *cct = m_image_ctx.cct; - lderr(cct) << "IO received on closed image" << dendl; - - c->fail(-ESHUTDOWN); - return false; - } - - if (!m_image_ctx.data_ctx.is_valid()) { - CephContext *cct = m_image_ctx.cct; - lderr(cct) << "missing data pool" << dendl; - - c->fail(-ENODEV); - return false; - } - - m_in_flight_ios++; - return true; -} - -template -void ImageRequestWQ::finish_in_flight_io() { - Context *on_shutdown; - { - std::shared_lock locker{m_lock}; - if (--m_in_flight_ios > 0 || !m_shutdown) { - return; - } - on_shutdown = m_on_shutdown; - } - - CephContext *cct = m_image_ctx.cct; - ldout(cct, 5) << "completing shut down" << dendl; - - on_shutdown->complete(0); -} - -template -void ImageRequestWQ::fail_in_flight_io( - int r, ImageDispatchSpec *req) { - this->process_finish(); - req->fail(r); - - bool write_op = req->is_write_op(); - uint64_t tid = req->get_tid(); - const auto& extents = req->get_image_extents(); - uint64_t offset = extents.size() ? extents.front().first : 0; - uint64_t length = extents.size() ? extents.front().second : 0; - - finish_queued_io(write_op); - remove_in_flight_write_ios(offset, length, write_op, tid); - finish_in_flight_io(); -} - -template -bool ImageRequestWQ::is_lock_required(bool write_op) const { - ceph_assert(ceph_mutex_is_locked(m_lock)); - return ((write_op && m_require_lock_on_write) || - (!write_op && m_require_lock_on_read)); -} - -template -void ImageRequestWQ::queue(ImageDispatchSpec *req) { - ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); - - CephContext *cct = m_image_ctx.cct; - ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " - << "req=" << req << dendl; - - if (req->is_write_op()) { - m_queued_writes++; - } else { - m_queued_reads++; - } - - ThreadPool::PointerWQ >::queue(req); -} - -template -void ImageRequestWQ::handle_acquire_lock( - int r, ImageDispatchSpec *req) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl; - - if (r < 0) { - fail_in_flight_io(r, req); - } else { - // since IO was stalled for acquire -- original IO order is preserved - // if we requeue this op for work queue processing - this->requeue_front(req); - } - - ceph_assert(m_io_blockers.load() > 0); - --m_io_blockers; - this->signal(); -} - -template -void ImageRequestWQ::handle_refreshed( - int r, ImageDispatchSpec *req) { - CephContext *cct = m_image_ctx.cct; - ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", " - << "req=" << req << dendl; - if (r < 0) { - fail_in_flight_io(r, req); - } else { - // since IO was stalled for refresh -- original IO order is preserved - // if we requeue this op for work queue processing - this->requeue_front(req); - } - - ceph_assert(m_io_blockers.load() > 0); - --m_io_blockers; - this->signal(); -} - -template -void ImageRequestWQ::handle_blocked_writes(int r) { - Contexts contexts; - { - std::unique_lock locker{m_lock}; - contexts.swap(m_write_blocker_contexts); - } - - for (auto ctx : contexts) { - ctx->complete(0); - } -} - -template class librbd::io::ImageRequestWQ; - -} // namespace io -} // namespace librbd diff --git a/src/librbd/io/ImageRequestWQ.h b/src/librbd/io/ImageRequestWQ.h deleted file mode 100644 index 1783d3405491..000000000000 --- a/src/librbd/io/ImageRequestWQ.h +++ /dev/null @@ -1,149 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H -#define CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H - -#include "include/Context.h" -#include "common/ceph_mutex.h" -#include "common/WorkQueue.h" -#include "librbd/io/Types.h" -#include "include/interval_set.h" -#include -#include -#include - -namespace librbd { - -class ImageCtx; - -namespace io { - -class AioCompletion; -template class ImageDispatchSpec; -class ReadResult; - -template -class ImageRequestWQ - : public ThreadPool::PointerWQ > { -public: - ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti, - ThreadPool *tp); - - ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result, - int op_flags); - ssize_t write(uint64_t off, uint64_t len, bufferlist &&bl, int op_flags); - ssize_t discard(uint64_t off, uint64_t len, - uint32_t discard_granularity_bytes); - ssize_t writesame(uint64_t off, uint64_t len, bufferlist &&bl, int op_flags); - ssize_t compare_and_write(uint64_t off, uint64_t len, - bufferlist &&cmp_bl, bufferlist &&bl, - uint64_t *mismatch_off, int op_flags); - int flush(); - - void aio_read(AioCompletion *c, uint64_t off, uint64_t len, - ReadResult &&read_result, int op_flags, bool native_async=true); - void aio_write(AioCompletion *c, uint64_t off, uint64_t len, - bufferlist &&bl, int op_flags, bool native_async=true); - void aio_discard(AioCompletion *c, uint64_t off, uint64_t len, - uint32_t discard_granularity_bytes, bool native_async=true); - void aio_flush(AioCompletion *c, bool native_async=true); - void aio_writesame(AioCompletion *c, uint64_t off, uint64_t len, - bufferlist &&bl, int op_flags, bool native_async=true); - void aio_compare_and_write(AioCompletion *c, uint64_t off, - uint64_t len, bufferlist &&cmp_bl, - bufferlist &&bl, uint64_t *mismatch_off, - int op_flags, bool native_async=true); - - using ThreadPool::PointerWQ >::drain; - using ThreadPool::PointerWQ >::empty; - - void shut_down(Context *on_shutdown); - - inline bool writes_blocked() const { - std::shared_lock locker{m_lock}; - return (m_write_blockers > 0); - } - - void block_writes(Context *on_blocked); - void unblock_writes(); - - void wait_on_writes_unblocked(Context *on_unblocked); - - void set_require_lock(Direction direction, bool enabled); - -protected: - void *_void_dequeue() override; - void process(ImageDispatchSpec *req) override; - -private: - typedef std::list Contexts; - - struct C_AcquireLock; - struct C_BlockedWrites; - struct C_RefreshFinish; - - ImageCtxT &m_image_ctx; - mutable ceph::shared_mutex m_lock; - Contexts m_write_blocker_contexts; - uint32_t m_write_blockers = 0; - Contexts m_unblocked_write_waiter_contexts; - bool m_require_lock_on_read = false; - bool m_require_lock_on_write = false; - std::atomic m_queued_reads { 0 }; - std::atomic m_queued_writes { 0 }; - std::atomic m_in_flight_ios { 0 }; - std::atomic m_in_flight_writes { 0 }; - std::atomic m_io_blockers { 0 }; - - typedef interval_set ImageExtentIntervals; - ImageExtentIntervals m_in_flight_extents; - - std::vector*> m_blocked_ios; - std::atomic m_last_tid { 0 }; - std::set m_queued_or_blocked_io_tids; - std::map*> m_queued_flushes; - - bool m_shutdown = false; - Context *m_on_shutdown = nullptr; - - bool is_lock_required(bool write_op) const; - - inline bool require_lock_on_read() const { - std::shared_lock locker{m_lock}; - return m_require_lock_on_read; - } - inline bool writes_empty() const { - std::shared_lock locker{m_lock}; - return (m_queued_writes == 0); - } - - void finish_queued_io(bool write_op); - void remove_in_flight_write_ios(uint64_t offset, uint64_t length, - bool write_op, uint64_t tid); - void finish_in_flight_write(); - - void unblock_flushes(); - bool block_overlapping_io(ImageExtentIntervals* in_flight_image_extents, - uint64_t object_off, uint64_t object_len); - void unblock_overlapping_io(uint64_t offset, uint64_t length, uint64_t tid); - int start_in_flight_io(AioCompletion *c); - void finish_in_flight_io(); - void fail_in_flight_io(int r, ImageDispatchSpec *req); - void process_io(ImageDispatchSpec *req, bool non_blocking_io); - - void queue(ImageDispatchSpec *req); - void queue_unblocked_io(AioCompletion *comp, - ImageDispatchSpec *req); - - void handle_acquire_lock(int r, ImageDispatchSpec *req); - void handle_refreshed(int r, ImageDispatchSpec *req); - void handle_blocked_writes(int r); -}; - -} // namespace io -} // namespace librbd - -extern template class librbd::io::ImageRequestWQ; - -#endif // CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H diff --git a/src/librbd/io/QueueImageDispatch.cc b/src/librbd/io/QueueImageDispatch.cc index 1b370d08b150..93bb56a372c9 100644 --- a/src/librbd/io/QueueImageDispatch.cc +++ b/src/librbd/io/QueueImageDispatch.cc @@ -4,11 +4,11 @@ #include "librbd/io/QueueImageDispatch.h" #include "common/dout.h" #include "common/Cond.h" +#include "common/WorkQueue.h" #include "librbd/ImageCtx.h" #include "librbd/Utils.h" #include "librbd/io/AioCompletion.h" #include "librbd/io/ImageDispatchSpec.h" -#include "librbd/io/ImageRequestWQ.h" #define dout_subsys ceph_subsys_rbd #undef dout_prefix @@ -49,10 +49,6 @@ void QueueImageDispatch::block_writes(Context *on_blocked) { on_blocked = util::create_async_context_callback( *m_image_ctx, on_blocked); - // TODO temp - auto ctx = new C_Gather(cct, on_blocked); - m_image_ctx->io_work_queue->block_writes(ctx->new_sub()); - { std::unique_lock locker{m_lock}; ++m_write_blockers; @@ -61,15 +57,13 @@ void QueueImageDispatch::block_writes(Context *on_blocked) { if (!m_write_blocker_contexts.empty() || !m_in_flight_write_tids.empty()) { ldout(cct, 5) << "waiting for in-flight writes to complete: " << "write_tids=" << m_in_flight_write_tids << dendl; - m_write_blocker_contexts.push_back(ctx->new_sub()); - ctx->activate(); + m_write_blocker_contexts.push_back(on_blocked); return; } } // ensure that all in-flight IO is flushed - flush_image(ctx->new_sub()); - ctx->activate(); + flush_image(on_blocked); }; template @@ -98,9 +92,6 @@ void QueueImageDispatch::unblock_writes() { for (auto ctx : dispatch_contexts) { ctx->complete(0); } - - // TODO temp - m_image_ctx->io_work_queue->unblock_writes(); } template @@ -108,22 +99,17 @@ void QueueImageDispatch::wait_on_writes_unblocked(Context *on_unblocked) { ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock)); auto cct = m_image_ctx->cct; - // TODO temp - auto ctx = new C_Gather(cct, on_unblocked); - m_image_ctx->io_work_queue->wait_on_writes_unblocked(ctx->new_sub()); - { std::unique_lock locker{m_lock}; ldout(cct, 20) << m_image_ctx << ", " << "write_blockers=" << m_write_blockers << dendl; if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) { - m_unblocked_write_waiter_contexts.push_back(ctx->new_sub()); - ctx->activate(); + m_unblocked_write_waiter_contexts.push_back(on_unblocked); return; } } - ctx->activate(); + on_unblocked->complete(0); } template diff --git a/src/test/librbd/CMakeLists.txt b/src/test/librbd/CMakeLists.txt index c219660200df..6a2e76fd1e1d 100644 --- a/src/test/librbd/CMakeLists.txt +++ b/src/test/librbd/CMakeLists.txt @@ -73,7 +73,6 @@ set(unittest_librbd_srcs image/test_mock_ValidatePoolRequest.cc io/test_mock_CopyupRequest.cc io/test_mock_ImageRequest.cc - io/test_mock_ImageRequestWQ.cc io/test_mock_ObjectRequest.cc io/test_mock_SimpleSchedulerObjectDispatch.cc journal/test_mock_OpenRequest.cc diff --git a/src/test/librbd/io/test_mock_ImageRequestWQ.cc b/src/test/librbd/io/test_mock_ImageRequestWQ.cc deleted file mode 100644 index 67b98432bc5e..000000000000 --- a/src/test/librbd/io/test_mock_ImageRequestWQ.cc +++ /dev/null @@ -1,380 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "test/librbd/test_mock_fixture.h" -#include "test/librbd/test_support.h" -#include "test/librbd/mock/MockImageCtx.h" -#include "test/librbd/mock/exclusive_lock/MockPolicy.h" -#include "test/librbd/mock/io/MockQosImageDispatch.h" -#include "librbd/io/ImageDispatchSpec.h" -#include "librbd/io/ImageRequestWQ.h" -#include "librbd/io/ImageRequest.h" - -namespace librbd { -namespace { - -struct MockTestImageCtx : public MockImageCtx { - MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) { - } -}; - -} // anonymous namespace - -namespace io { - -template <> -struct ImageRequest { - static ImageRequest* s_instance; - AioCompletion *aio_comp = nullptr; - - static void aio_write(librbd::MockTestImageCtx *ictx, AioCompletion *c, - Extents &&image_extents, bufferlist &&bl, int op_flags, - const ZTracer::Trace &parent_trace) { - } - - ImageRequest() { - s_instance = this; - } -}; - -template <> -struct ImageDispatchSpec { - static ImageDispatchSpec* s_instance; - AioCompletion *aio_comp = nullptr; - bool blocked = false; - - std::atomic image_dispatch_flags = {0}; - - static ImageDispatchSpec* create_write( - librbd::MockTestImageCtx &image_ctx, ImageDispatchLayer dispatch_layer, - AioCompletion *aio_comp, Extents &&image_extents, bufferlist &&bl, - int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) { - ceph_assert(s_instance != nullptr); - s_instance->aio_comp = aio_comp; - return s_instance; - } - - static ImageDispatchSpec* create_flush( - librbd::MockTestImageCtx &image_ctx, ImageDispatchLayer dispatch_layer, - AioCompletion *aio_comp, FlushSource flush_source, - const ZTracer::Trace &parent_trace) { - ceph_assert(s_instance != nullptr); - s_instance->aio_comp = aio_comp; - return s_instance; - } - - MOCK_CONST_METHOD0(is_write_op, bool()); - MOCK_CONST_METHOD0(start_op, void()); - MOCK_CONST_METHOD0(send, void()); - MOCK_CONST_METHOD1(fail, void(int)); - MOCK_CONST_METHOD0(get_image_extents, Extents()); - MOCK_CONST_METHOD0(get_aio_completion, AioCompletion*()); - MOCK_CONST_METHOD0(get_tid, uint64_t()); - - ImageDispatchSpec() { - s_instance = this; - } -}; -} // namespace io - -namespace util { - -inline ImageCtx *get_image_ctx(MockTestImageCtx *image_ctx) { - return image_ctx->image_ctx; -} - -} // namespace util - -} // namespace librbd - -template <> -struct ThreadPool::PointerWQ> { - typedef librbd::io::ImageDispatchSpec ImageDispatchSpec; - static PointerWQ* s_instance; - - ceph::mutex m_lock; - - PointerWQ(const std::string &name, time_t, int, ThreadPool *) - : m_lock(ceph::make_mutex(name)) { - s_instance = this; - } - virtual ~PointerWQ() { - } - - MOCK_METHOD0(drain, void()); - MOCK_METHOD0(empty, bool()); - MOCK_METHOD0(mock_empty, bool()); - MOCK_METHOD0(signal, void()); - MOCK_METHOD0(process_finish, void()); - - MOCK_METHOD0(front, ImageDispatchSpec*()); - MOCK_METHOD1(requeue_front, void(ImageDispatchSpec*)); - MOCK_METHOD1(requeue_back, void(ImageDispatchSpec*)); - - MOCK_METHOD0(dequeue, void*()); - MOCK_METHOD1(queue, void(ImageDispatchSpec*)); - - void register_work_queue() { - // no-op - } - ceph::mutex &get_pool_lock() { - return m_lock; - } - - void* invoke_dequeue() { - std::lock_guard locker{m_lock}; - return _void_dequeue(); - } - void invoke_process(ImageDispatchSpec *image_request) { - process(image_request); - } - - virtual void *_void_dequeue() { - return dequeue(); - } - virtual void process(ImageDispatchSpec *req) = 0; - virtual bool _empty() { - return mock_empty(); - } - -}; - -ThreadPool::PointerWQ>* - ThreadPool::PointerWQ>::s_instance = nullptr; -librbd::io::ImageRequest* - librbd::io::ImageRequest::s_instance = nullptr; -librbd::io::ImageDispatchSpec* - librbd::io::ImageDispatchSpec::s_instance = nullptr; - -#include "librbd/io/ImageRequestWQ.cc" - -namespace librbd { -namespace io { - -using ::testing::_; -using ::testing::InSequence; -using ::testing::Invoke; -using ::testing::Return; -using ::testing::WithArg; - -struct TestMockIoImageRequestWQ : public TestMockFixture { - typedef ImageRequestWQ MockImageRequestWQ; - typedef ImageRequest MockImageRequest; - typedef ImageDispatchSpec MockImageDispatchSpec; - - void expect_is_write_op(MockImageDispatchSpec &image_request, - bool write_op) { - EXPECT_CALL(image_request, is_write_op()).WillOnce(Return(write_op)); - } - - void expect_signal(MockImageRequestWQ &image_request_wq) { - EXPECT_CALL(image_request_wq, signal()); - } - - void expect_queue(MockImageRequestWQ &image_request_wq) { - EXPECT_CALL(image_request_wq, queue(_)); - } - - void expect_requeue_back(MockImageRequestWQ &image_request_wq) { - EXPECT_CALL(image_request_wq, requeue_back(_)); - } - - void expect_front(MockImageRequestWQ &image_request_wq, - MockImageDispatchSpec& image_request) { - EXPECT_CALL(image_request_wq, front()).WillOnce(Return(&image_request)); - } - - void expect_is_refresh_request(MockTestImageCtx &mock_image_ctx, - bool required) { - EXPECT_CALL(*mock_image_ctx.state, is_refresh_required()).WillOnce( - Return(required)); - } - - void expect_dequeue(MockImageRequestWQ &image_request_wq, - MockImageDispatchSpec &image_request) { - EXPECT_CALL(image_request_wq, dequeue()).WillOnce(Return(&image_request)); - } - - void expect_get_exclusive_lock_policy(MockTestImageCtx &mock_image_ctx, - librbd::exclusive_lock::MockPolicy &policy) { - EXPECT_CALL(mock_image_ctx, - get_exclusive_lock_policy()).WillOnce(Return(&policy)); - } - - void expect_may_auto_request_lock(librbd::exclusive_lock::MockPolicy &policy, - bool value) { - EXPECT_CALL(policy, may_auto_request_lock()).WillOnce(Return(value)); - } - - void expect_acquire_lock(MockExclusiveLock &mock_exclusive_lock, - Context **on_finish) { - EXPECT_CALL(mock_exclusive_lock, acquire_lock(_)) - .WillOnce(Invoke([on_finish](Context *ctx) { - *on_finish = ctx; - })); - } - - void expect_process_finish(MockImageRequestWQ &mock_image_request_wq) { - EXPECT_CALL(mock_image_request_wq, process_finish()).Times(1); - } - - void expect_fail(MockImageDispatchSpec &mock_image_request, int r) { - EXPECT_CALL(mock_image_request, fail(r)) - .WillOnce(Invoke([&mock_image_request](int r) { - mock_image_request.aio_comp->fail(r); - })); - } - - void expect_refresh(MockTestImageCtx &mock_image_ctx, Context **on_finish) { - EXPECT_CALL(*mock_image_ctx.state, refresh(_)) - .WillOnce(Invoke([on_finish](Context *ctx) { - *on_finish = ctx; - })); - } - - void expect_start_op(MockImageDispatchSpec &mock_image_request) { - EXPECT_CALL(mock_image_request, start_op()).Times(1); - } - - void expect_get_image_extents(MockImageDispatchSpec &mock_image_request, - const Extents &extents) { - EXPECT_CALL(mock_image_request, get_image_extents()) - .WillRepeatedly(Return(extents)); - } - - void expect_get_tid(MockImageDispatchSpec &mock_image_request, uint64_t tid) { - EXPECT_CALL(mock_image_request, get_tid()).WillOnce(Return(tid)); - } -}; - -TEST_F(TestMockIoImageRequestWQ, AcquireLockError) { - REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK); - - librbd::ImageCtx *ictx; - ASSERT_EQ(0, open_image(m_image_name, &ictx)); - - MockTestImageCtx mock_image_ctx(*ictx); - MockExclusiveLock mock_exclusive_lock; - mock_image_ctx.exclusive_lock = &mock_exclusive_lock; - - MockImageDispatchSpec mock_queued_image_request; - expect_get_image_extents(mock_queued_image_request, {}); - expect_get_tid(mock_queued_image_request, 0); - - InSequence seq; - MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr); - expect_signal(mock_image_request_wq); - mock_image_request_wq.set_require_lock(DIRECTION_WRITE, true); - - expect_is_write_op(mock_queued_image_request, true); - expect_queue(mock_image_request_wq); - auto *aio_comp = new librbd::io::AioCompletion(); - mock_image_request_wq.aio_write(aio_comp, 0, 0, {}, 0); - - librbd::exclusive_lock::MockPolicy mock_exclusive_lock_policy; - expect_front(mock_image_request_wq, mock_queued_image_request); - expect_is_refresh_request(mock_image_ctx, false); - expect_is_write_op(mock_queued_image_request, true); - expect_dequeue(mock_image_request_wq, mock_queued_image_request); - expect_get_exclusive_lock_policy(mock_image_ctx, mock_exclusive_lock_policy); - expect_may_auto_request_lock(mock_exclusive_lock_policy, true); - Context *on_acquire = nullptr; - expect_acquire_lock(mock_exclusive_lock, &on_acquire); - ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr); - ASSERT_TRUE(on_acquire != nullptr); - - expect_process_finish(mock_image_request_wq); - expect_fail(mock_queued_image_request, -EPERM); - expect_is_write_op(mock_queued_image_request, true); - expect_signal(mock_image_request_wq); - on_acquire->complete(-EPERM); - - ASSERT_EQ(0, aio_comp->wait_for_complete()); - ASSERT_EQ(-EPERM, aio_comp->get_return_value()); - aio_comp->release(); -} - -TEST_F(TestMockIoImageRequestWQ, AcquireLockBlacklisted) { - REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK); - - librbd::ImageCtx *ictx; - ASSERT_EQ(0, open_image(m_image_name, &ictx)); - - MockTestImageCtx mock_image_ctx(*ictx); - MockExclusiveLock mock_exclusive_lock; - mock_image_ctx.exclusive_lock = &mock_exclusive_lock; - - MockImageDispatchSpec mock_queued_image_request; - expect_get_image_extents(mock_queued_image_request, {}); - expect_get_tid(mock_queued_image_request, 0); - - InSequence seq; - MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr); - expect_signal(mock_image_request_wq); - mock_image_request_wq.set_require_lock(DIRECTION_WRITE, true); - - expect_is_write_op(mock_queued_image_request, true); - expect_queue(mock_image_request_wq); - auto *aio_comp = new librbd::io::AioCompletion(); - mock_image_request_wq.aio_write(aio_comp, 0, 0, {}, 0); - - librbd::exclusive_lock::MockPolicy mock_exclusive_lock_policy; - expect_front(mock_image_request_wq, mock_queued_image_request); - expect_is_refresh_request(mock_image_ctx, false); - expect_is_write_op(mock_queued_image_request, true); - expect_dequeue(mock_image_request_wq, mock_queued_image_request); - expect_get_exclusive_lock_policy(mock_image_ctx, mock_exclusive_lock_policy); - expect_may_auto_request_lock(mock_exclusive_lock_policy, false); - EXPECT_CALL(*mock_image_ctx.exclusive_lock, get_unlocked_op_error()) - .WillOnce(Return(-EBLACKLISTED)); - expect_process_finish(mock_image_request_wq); - expect_fail(mock_queued_image_request, -EBLACKLISTED); - expect_is_write_op(mock_queued_image_request, true); - expect_signal(mock_image_request_wq); - ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr); - - ASSERT_EQ(0, aio_comp->wait_for_complete()); - ASSERT_EQ(-EBLACKLISTED, aio_comp->get_return_value()); - aio_comp->release(); -} - -TEST_F(TestMockIoImageRequestWQ, RefreshError) { - librbd::ImageCtx *ictx; - ASSERT_EQ(0, open_image(m_image_name, &ictx)); - - MockTestImageCtx mock_image_ctx(*ictx); - - MockImageDispatchSpec mock_queued_image_request; - expect_get_image_extents(mock_queued_image_request, {}); - expect_get_tid(mock_queued_image_request, 0); - - InSequence seq; - MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr); - - expect_is_write_op(mock_queued_image_request, true); - expect_queue(mock_image_request_wq); - auto *aio_comp = new librbd::io::AioCompletion(); - mock_image_request_wq.aio_write(aio_comp, 0, 0, {}, 0); - - expect_front(mock_image_request_wq, mock_queued_image_request); - expect_is_refresh_request(mock_image_ctx, true); - expect_is_write_op(mock_queued_image_request, true); - expect_dequeue(mock_image_request_wq, mock_queued_image_request); - Context *on_refresh = nullptr; - expect_refresh(mock_image_ctx, &on_refresh); - ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr); - ASSERT_TRUE(on_refresh != nullptr); - - expect_process_finish(mock_image_request_wq); - expect_fail(mock_queued_image_request, -EPERM); - expect_is_write_op(mock_queued_image_request, true); - expect_signal(mock_image_request_wq); - on_refresh->complete(-EPERM); - - ASSERT_EQ(0, aio_comp->wait_for_complete()); - ASSERT_EQ(-EPERM, aio_comp->get_return_value()); - aio_comp->release(); -} - -} // namespace io -} // namespace librbd diff --git a/src/test/librbd/mock/MockImageCtx.h b/src/test/librbd/mock/MockImageCtx.h index 1a35b2933f17..b86a87cb13ac 100644 --- a/src/test/librbd/mock/MockImageCtx.h +++ b/src/test/librbd/mock/MockImageCtx.h @@ -14,7 +14,6 @@ #include "test/librbd/mock/MockOperations.h" #include "test/librbd/mock/MockReadahead.h" #include "test/librbd/mock/io/MockImageDispatcher.h" -#include "test/librbd/mock/io/MockImageRequestWQ.h" #include "test/librbd/mock/io/MockObjectDispatcher.h" #include "common/RWLock.h" #include "common/WorkQueue.h" @@ -84,7 +83,6 @@ struct MockImageCtx { format_string(image_ctx.format_string), group_spec(image_ctx.group_spec), layout(image_ctx.layout), - io_work_queue(new io::MockImageRequestWQ()), io_image_dispatcher(new io::MockImageDispatcher()), io_object_dispatcher(new io::MockObjectDispatcher()), op_work_queue(new MockContextWQ()), @@ -127,7 +125,6 @@ struct MockImageCtx { delete operations; delete image_watcher; delete op_work_queue; - delete io_work_queue; delete io_image_dispatcher; delete io_object_dispatcher; } @@ -284,7 +281,6 @@ struct MockImageCtx { std::map*> copyup_list; - io::MockImageRequestWQ *io_work_queue; io::MockImageDispatcher *io_image_dispatcher; io::MockObjectDispatcher *io_object_dispatcher; MockContextWQ *op_work_queue; diff --git a/src/test/librbd/mock/io/MockImageRequestWQ.h b/src/test/librbd/mock/io/MockImageRequestWQ.h deleted file mode 100644 index ab0804523c12..000000000000 --- a/src/test/librbd/mock/io/MockImageRequestWQ.h +++ /dev/null @@ -1,25 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef CEPH_TEST_LIBRBD_MOCK_IO_IMAGE_REQUEST_WQ_H -#define CEPH_TEST_LIBRBD_MOCK_IO_IMAGE_REQUEST_WQ_H - -#include "gmock/gmock.h" -#include "librbd/io/Types.h" - -class Context; - -namespace librbd { -namespace io { - -struct MockImageRequestWQ { - MOCK_METHOD1(block_writes, void(Context *)); - MOCK_METHOD0(unblock_writes, void()); - - MOCK_METHOD2(set_require_lock, void(Direction, bool)); -}; - -} // namespace io -} // namespace librbd - -#endif // CEPH_TEST_LIBRBD_MOCK_IO_IMAGE_REQUEST_WQ_H diff --git a/src/test/librbd/test_ImageWatcher.cc b/src/test/librbd/test_ImageWatcher.cc index 96fdafef2e34..58a831ab7884 100644 --- a/src/test/librbd/test_ImageWatcher.cc +++ b/src/test/librbd/test_ImageWatcher.cc @@ -16,7 +16,6 @@ #include "librbd/ImageWatcher.h" #include "librbd/WatchNotifyTypes.h" #include "librbd/io/AioCompletion.h" -#include "librbd/io/ImageRequestWQ.h" #include "test/librados/test.h" #include "gtest/gtest.h" #include diff --git a/src/test/librbd/test_mirroring.cc b/src/test/librbd/test_mirroring.cc index a3eacd75747b..e3277a0a2518 100644 --- a/src/test/librbd/test_mirroring.cc +++ b/src/test/librbd/test_mirroring.cc @@ -23,7 +23,6 @@ #include "librbd/api/Namespace.h" #include "librbd/io/AioCompletion.h" #include "librbd/io/ImageRequest.h" -#include "librbd/io/ImageRequestWQ.h" #include "librbd/journal/Types.h" #include "librbd/mirror/snapshot/GetImageStateRequest.h" #include "librbd/mirror/snapshot/RemoveImageStateRequest.h"