io/ImageDispatchSpec.cc
io/ImageDispatcher.cc
io/ImageRequest.cc
- io/ImageRequestWQ.cc
io/ObjectDispatch.cc
io/ObjectDispatchSpec.cc
io/ObjectDispatcher.cc
#include "librbd/exclusive_lock/PostAcquireRequest.h"
#include "librbd/exclusive_lock/PreReleaseRequest.h"
#include "librbd/io/ImageDispatcherInterface.h"
-#include "librbd/io/ImageRequestWQ.h"
#include "librbd/Utils.h"
#include "common/ceph_mutex.h"
#include "common/dout.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/AsyncOperation.h"
#include "librbd/io/ImageDispatcher.h"
-#include "librbd/io/ImageRequestWQ.h"
#include "librbd/io/ObjectDispatcher.h"
#include "librbd/io/QosImageDispatch.h"
#include "librbd/journal/StandardPolicy.h"
state(new ImageState<>(this)),
operations(new Operations<>(*this)),
exclusive_lock(nullptr), object_map(nullptr),
- io_work_queue(nullptr), op_work_queue(nullptr),
+ op_work_queue(nullptr),
external_callback_completions(32),
event_socket_completions(32),
asok_hook(nullptr),
ThreadPool *thread_pool;
get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
- io_work_queue = new io::ImageRequestWQ<>(
- this, "librbd::io_work_queue",
- cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
- thread_pool);
io_image_dispatcher = new io::ImageDispatcher<ImageCtx>(this);
io_object_dispatcher = new io::ObjectDispatcher<ImageCtx>(this);
if (data_ctx.is_valid()) {
data_ctx.aio_flush();
}
- io_work_queue->drain();
delete io_object_dispatcher;
delete io_image_dispatcher;
delete journal_policy;
delete exclusive_lock_policy;
- delete io_work_queue;
delete operations;
delete state;
}
class AioCompletion;
class AsyncOperation;
template <typename> class CopyupRequest;
- template <typename> class ImageRequestWQ;
struct ImageDispatcherInterface;
struct ObjectDispatcherInterface;
}
xlist<operation::ResizeRequest<ImageCtx>*> resize_reqs;
- io::ImageRequestWQ<ImageCtx> *io_work_queue;
io::ImageDispatcherInterface *io_image_dispatcher = nullptr;
io::ObjectDispatcherInterface *io_object_dispatcher = nullptr;
#include "journal/Settings.h"
#include "journal/Utils.h"
#include "librbd/ImageCtx.h"
-#include "librbd/io/ImageRequestWQ.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/journal/CreateRequest.h"
#include "librbd/internal.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageDispatchSpec.h"
-#include "librbd/io/ImageRequestWQ.h"
#include "librbd/object_map/DiffRequest.h"
#include "include/rados/librados.hpp"
#include "include/interval_set.h"
#include "librbd/image/RemoveRequest.h"
#include "librbd/image/Types.h"
#include "librbd/internal.h"
-#include "librbd/io/ImageRequestWQ.h"
#include "librbd/mirror/DisableRequest.h"
#include "librbd/mirror/EnableRequest.h"
#include "librbd/io/FlushTracker.h"
#include "librbd/io/ImageDispatchSpec.h"
#include "librbd/io/ImageDispatcherInterface.h"
-#include "librbd/io/ImageRequestWQ.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
ldout(cct, 20) << "direction=" << direction << ", enabled=" << enabled
<< dendl;
- // TODO remove when ImageRequestWQ is removed
- m_image_ctx->io_work_queue->set_require_lock(direction, enabled);
-
std::unique_lock locker{m_lock};
auto prev_require_lock = (m_require_lock_on_read || m_require_lock_on_write);
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageDispatcher.h"
#include "librbd/io/ImageDispatchSpec.h"
-#include "librbd/io/ImageRequestWQ.h"
#include "librbd/io/ObjectDispatcherInterface.h"
#define dout_subsys ceph_subsys_rbd
<< dendl;
}
- send_shut_down_io_queue();
-}
-
-template <typename I>
-void CloseRequest<I>::send_shut_down_io_queue() {
- CephContext *cct = m_image_ctx->cct;
- ldout(cct, 10) << this << " " << __func__ << dendl;
-
- std::shared_lock owner_locker{m_image_ctx->owner_lock};
- m_image_ctx->io_work_queue->shut_down(create_context_callback<
- CloseRequest<I>, &CloseRequest<I>::handle_shut_down_io_queue>(this));
-}
-
-template <typename I>
-void CloseRequest<I>::handle_shut_down_io_queue(int r) {
- CephContext *cct = m_image_ctx->cct;
- ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
-
send_shut_down_exclusive_lock();
}
CephContext *cct = m_image_ctx->cct;
ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
- send_shut_down_object_dispatcher();
+ send_shut_down_image_dispatcher();
}
template <typename I>
-void CloseRequest<I>::send_shut_down_object_dispatcher() {
+void CloseRequest<I>::send_shut_down_image_dispatcher() {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 10) << this << " " << __func__ << dendl;
- m_image_ctx->io_object_dispatcher->shut_down(create_context_callback<
+ m_image_ctx->io_image_dispatcher->shut_down(create_context_callback<
CloseRequest<I>,
- &CloseRequest<I>::handle_shut_down_object_dispatcher>(this));
+ &CloseRequest<I>::handle_shut_down_image_dispatcher>(this));
}
template <typename I>
-void CloseRequest<I>::handle_shut_down_object_dispatcher(int r) {
+void CloseRequest<I>::handle_shut_down_image_dispatcher(int r) {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
save_result(r);
if (r < 0) {
- lderr(cct) << "failed to shut down object dispatcher: "
+ lderr(cct) << "failed to shut down image dispatcher: "
<< cpp_strerror(r) << dendl;
}
- send_shut_down_image_dispatcher();
+
+ send_shut_down_object_dispatcher();
}
template <typename I>
-void CloseRequest<I>::send_shut_down_image_dispatcher() {
+void CloseRequest<I>::send_shut_down_object_dispatcher() {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 10) << this << " " << __func__ << dendl;
- m_image_ctx->io_image_dispatcher->shut_down(create_context_callback<
+ m_image_ctx->io_object_dispatcher->shut_down(create_context_callback<
CloseRequest<I>,
- &CloseRequest<I>::handle_shut_down_image_dispatcher>(this));
+ &CloseRequest<I>::handle_shut_down_object_dispatcher>(this));
}
template <typename I>
-void CloseRequest<I>::handle_shut_down_image_dispatcher(int r) {
+void CloseRequest<I>::handle_shut_down_object_dispatcher(int r) {
CephContext *cct = m_image_ctx->cct;
ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
save_result(r);
if (r < 0) {
- lderr(cct) << "failed to shut down image dispatcher: "
+ lderr(cct) << "failed to shut down object dispatcher: "
<< cpp_strerror(r) << dendl;
}
+
send_flush_op_work_queue();
}
* BLOCK_IMAGE_WATCHER (skip if R/O)
* |
* v
- * SHUT_DOWN_UPDATE_WATCHERS
- * |
- * v
- * SHUT_DOWN_AIO_WORK_QUEUE . . .
+ * SHUT_DOWN_UPDATE_WATCHERS . .
* | . (exclusive lock disabled)
* v v
* SHUT_DOWN_EXCLUSIVE_LOCK FLUSH
* FLUSH_READAHEAD
* |
* v
- * SHUT_DOWN_OBJECT_DISPATCHER
+ * SHUT_DOWN_IMAGE_DISPATCHER
* |
* v
- * SHUT_DOWN_IMAGE_DISPATCHER
+ * SHUT_DOWN_OBJECT_DISPATCHER
* |
* v
* FLUSH_OP_WORK_QUEUE
void send_shut_down_update_watchers();
void handle_shut_down_update_watchers(int r);
- void send_shut_down_io_queue();
- void handle_shut_down_io_queue(int r);
-
void send_shut_down_exclusive_lock();
void handle_shut_down_exclusive_lock(int r);
void send_flush_readahead();
void handle_flush_readahead(int r);
- void send_shut_down_object_dispatcher();
- void handle_shut_down_object_dispatcher(int r);
-
void send_shut_down_image_dispatcher();
void handle_shut_down_image_dispatcher(int r);
+ void send_shut_down_object_dispatcher();
+ void handle_shut_down_object_dispatcher(int r);
+
void send_flush_op_work_queue();
void handle_flush_op_work_queue(int r);
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "librbd/io/ImageRequestWQ.h"
-#include "common/errno.h"
-#include "common/zipkin_trace.h"
-#include "common/Cond.h"
-#include "librbd/ExclusiveLock.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/ImageState.h"
-#include "librbd/ImageWatcher.h"
-#include "librbd/internal.h"
-#include "librbd/Utils.h"
-#include "librbd/exclusive_lock/Policy.h"
-#include "librbd/io/AioCompletion.h"
-#include "librbd/io/ImageRequest.h"
-#include "librbd/io/ImageDispatchSpec.h"
-#include "librbd/io/ImageDispatcher.h"
-#include "librbd/io/QosImageDispatch.h"
-#include "common/EventTrace.h"
-
-#define dout_subsys ceph_subsys_rbd
-#undef dout_prefix
-#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
- << " " << __func__ << ": "
-
-namespace librbd {
-
-using util::create_context_callback;
-
-namespace io {
-
-template <typename I>
-struct ImageRequestWQ<I>::C_AcquireLock : public Context {
- ImageRequestWQ *work_queue;
- ImageDispatchSpec<I> *image_request;
-
- C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request)
- : work_queue(work_queue), image_request(image_request) {
- }
-
- void finish(int r) override {
- work_queue->handle_acquire_lock(r, image_request);
- }
-};
-
-template <typename I>
-struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
- ImageRequestWQ *work_queue;
- explicit C_BlockedWrites(ImageRequestWQ *_work_queue)
- : work_queue(_work_queue) {
- }
-
- void finish(int r) override {
- work_queue->handle_blocked_writes(r);
- }
-};
-
-template <typename I>
-struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
- ImageRequestWQ *work_queue;
- ImageDispatchSpec<I> *image_request;
-
- C_RefreshFinish(ImageRequestWQ *work_queue,
- ImageDispatchSpec<I> *image_request)
- : work_queue(work_queue), image_request(image_request) {
- }
- void finish(int r) override {
- work_queue->handle_refreshed(r, image_request);
- }
-};
-
-template <typename I>
-ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
- time_t ti, ThreadPool *tp)
- : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp),
- m_image_ctx(*image_ctx),
- m_lock(ceph::make_shared_mutex(
- util::unique_lock_name("ImageRequestWQ<I>::m_lock", this))) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 5) << "ictx=" << image_ctx << dendl;
-
- this->register_work_queue();
-}
-
-template <typename I>
-ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
- ReadResult &&read_result, int op_flags) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
- << "len = " << len << dendl;
-
- C_SaferCond cond;
- AioCompletion *c = AioCompletion::create(&cond);
- aio_read(c, off, len, std::move(read_result), op_flags, false);
- return cond.wait();
-}
-
-template <typename I>
-ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
- bufferlist &&bl, int op_flags) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
- << "len = " << len << dendl;
-
- m_image_ctx.image_lock.lock_shared();
- int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
- m_image_ctx.image_lock.unlock_shared();
- if (r < 0) {
- lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- C_SaferCond cond;
- AioCompletion *c = AioCompletion::create(&cond);
- aio_write(c, off, len, std::move(bl), op_flags, false);
-
- r = cond.wait();
- if (r < 0) {
- return r;
- }
- return len;
-}
-
-template <typename I>
-ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
- uint32_t discard_granularity_bytes) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
- << "len = " << len << dendl;
-
- m_image_ctx.image_lock.lock_shared();
- int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
- m_image_ctx.image_lock.unlock_shared();
- if (r < 0) {
- lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- C_SaferCond cond;
- AioCompletion *c = AioCompletion::create(&cond);
- aio_discard(c, off, len, discard_granularity_bytes, false);
-
- r = cond.wait();
- if (r < 0) {
- return r;
- }
- return len;
-}
-
-template <typename I>
-ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
- bufferlist &&bl, int op_flags) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
- << "len = " << len << ", data_len " << bl.length() << dendl;
-
- m_image_ctx.image_lock.lock_shared();
- int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
- m_image_ctx.image_lock.unlock_shared();
- if (r < 0) {
- lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- C_SaferCond cond;
- AioCompletion *c = AioCompletion::create(&cond);
- aio_writesame(c, off, len, std::move(bl), op_flags, false);
-
- r = cond.wait();
- if (r < 0) {
- return r;
- }
- return len;
-}
-
-template <typename I>
-ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len,
- bufferlist &&cmp_bl,
- bufferlist &&bl,
- uint64_t *mismatch_off,
- int op_flags){
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off="
- << off << ", " << "len = " << len << dendl;
-
- m_image_ctx.image_lock.lock_shared();
- int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
- m_image_ctx.image_lock.unlock_shared();
- if (r < 0) {
- lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- C_SaferCond cond;
- AioCompletion *c = AioCompletion::create(&cond);
- aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl),
- mismatch_off, op_flags, false);
-
- r = cond.wait();
- if (r < 0) {
- return r;
- }
-
- return len;
-}
-
-template <typename I>
-int ImageRequestWQ<I>::flush() {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
-
- C_SaferCond cond;
- AioCompletion *c = AioCompletion::create(&cond);
- aio_flush(c, false);
-
- int r = cond.wait();
- if (r < 0) {
- return r;
- }
-
- return 0;
-}
-
-template <typename I>
-void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
- ReadResult &&read_result, int op_flags,
- bool native_async) {
- CephContext *cct = m_image_ctx.cct;
- FUNCTRACE(cct);
- ZTracer::Trace trace;
- if (m_image_ctx.blkin_trace_all) {
- trace.init("wq: read", &m_image_ctx.trace_endpoint);
- trace.event("start");
- }
-
- c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
- << "completion=" << c << ", off=" << off << ", "
- << "len=" << len << ", " << "flags=" << op_flags << dendl;
-
- if (native_async && m_image_ctx.event_socket.is_valid()) {
- c->set_event_notify(true);
- }
-
- if (!start_in_flight_io(c)) {
- return;
- }
-
- // if journaling is enabled -- we need to replay the journal because
- // it might contain an uncommitted write
- std::shared_lock owner_locker{m_image_ctx.owner_lock};
- if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
- require_lock_on_read()) {
- queue(ImageDispatchSpec<I>::create_read(
- m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, {{off, len}},
- std::move(read_result), op_flags, trace));
- } else {
- c->start_op();
- ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
- std::move(read_result), op_flags, trace);
- finish_in_flight_io();
- }
- trace.event("finish");
-}
-
-template <typename I>
-void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
- bufferlist &&bl, int op_flags,
- bool native_async) {
- CephContext *cct = m_image_ctx.cct;
- FUNCTRACE(cct);
- ZTracer::Trace trace;
- if (m_image_ctx.blkin_trace_all) {
- trace.init("wq: write", &m_image_ctx.trace_endpoint);
- trace.event("init");
- }
-
- c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
- << "completion=" << c << ", off=" << off << ", "
- << "len=" << len << ", flags=" << op_flags << dendl;
-
- if (native_async && m_image_ctx.event_socket.is_valid()) {
- c->set_event_notify(true);
- }
-
- if (!start_in_flight_io(c)) {
- return;
- }
-
- auto tid = ++m_last_tid;
-
- {
- std::lock_guard locker{m_lock};
- m_queued_or_blocked_io_tids.insert(tid);
- }
-
- ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write(
- m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, {{off, len}}, std::move(bl),
- op_flags, trace, tid);
-
- std::shared_lock owner_locker{m_image_ctx.owner_lock};
- queue(req);
- trace.event("finish");
-}
-
-template <typename I>
-void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
- uint64_t len,
- uint32_t discard_granularity_bytes,
- bool native_async) {
- CephContext *cct = m_image_ctx.cct;
- FUNCTRACE(cct);
- ZTracer::Trace trace;
- if (m_image_ctx.blkin_trace_all) {
- trace.init("wq: discard", &m_image_ctx.trace_endpoint);
- trace.event("init");
- }
-
- c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
- << "completion=" << c << ", off=" << off << ", len=" << len
- << dendl;
-
- if (native_async && m_image_ctx.event_socket.is_valid()) {
- c->set_event_notify(true);
- }
-
- if (!start_in_flight_io(c)) {
- return;
- }
-
- auto tid = ++m_last_tid;
-
- {
- std::lock_guard locker{m_lock};
- m_queued_or_blocked_io_tids.insert(tid);
- }
-
- ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard(
- m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, off, len,
- discard_granularity_bytes, trace, tid);
-
- std::shared_lock owner_locker{m_image_ctx.owner_lock};
- queue(req);
- trace.event("finish");
-}
-
-template <typename I>
-void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
- CephContext *cct = m_image_ctx.cct;
- FUNCTRACE(cct);
- ZTracer::Trace trace;
- if (m_image_ctx.blkin_trace_all) {
- trace.init("wq: flush", &m_image_ctx.trace_endpoint);
- trace.event("init");
- }
-
- c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
- << "completion=" << c << dendl;
-
- if (native_async && m_image_ctx.event_socket.is_valid()) {
- c->set_event_notify(true);
- }
-
- if (!start_in_flight_io(c)) {
- return;
- }
-
- auto tid = ++m_last_tid;
-
- ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush(
- m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, FLUSH_SOURCE_USER, trace);
-
- {
- std::lock_guard locker{m_lock};
- if(!m_queued_or_blocked_io_tids.empty()) {
- ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
- m_queued_flushes.emplace(tid, req);
- --m_in_flight_ios;
- return;
- }
- }
-
- std::shared_lock owner_locker{m_image_ctx.owner_lock};
- queue(req);
- trace.event("finish");
-}
-
-template <typename I>
-void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
- uint64_t len, bufferlist &&bl,
- int op_flags, bool native_async) {
- CephContext *cct = m_image_ctx.cct;
- FUNCTRACE(cct);
- ZTracer::Trace trace;
- if (m_image_ctx.blkin_trace_all) {
- trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
- trace.event("init");
- }
-
- c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
- << "completion=" << c << ", off=" << off << ", "
- << "len=" << len << ", data_len = " << bl.length() << ", "
- << "flags=" << op_flags << dendl;
-
- if (native_async && m_image_ctx.event_socket.is_valid()) {
- c->set_event_notify(true);
- }
-
- if (!start_in_flight_io(c)) {
- return;
- }
-
- auto tid = ++m_last_tid;
-
- {
- std::lock_guard locker{m_lock};
- m_queued_or_blocked_io_tids.insert(tid);
- }
-
- ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same(
- m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, off, len, std::move(bl),
- op_flags, trace, tid);
-
- std::shared_lock owner_locker{m_image_ctx.owner_lock};
- queue(req);
- trace.event("finish");
-}
-
-template <typename I>
-void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
- uint64_t off, uint64_t len,
- bufferlist &&cmp_bl,
- bufferlist &&bl,
- uint64_t *mismatch_off,
- int op_flags, bool native_async) {
- CephContext *cct = m_image_ctx.cct;
- FUNCTRACE(cct);
- ZTracer::Trace trace;
- if (m_image_ctx.blkin_trace_all) {
- trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
- trace.event("init");
- }
-
- c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
- << "completion=" << c << ", off=" << off << ", "
- << "len=" << len << dendl;
-
- if (native_async && m_image_ctx.event_socket.is_valid()) {
- c->set_event_notify(true);
- }
-
- if (!start_in_flight_io(c)) {
- return;
- }
-
- auto tid = ++m_last_tid;
-
- {
- std::lock_guard locker{m_lock};
- m_queued_or_blocked_io_tids.insert(tid);
- }
-
- ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write(
- m_image_ctx, IMAGE_DISPATCH_LAYER_API_START, c, {{off, len}},
- std::move(cmp_bl), std::move(bl), mismatch_off, op_flags, trace, tid);
-
- std::shared_lock owner_locker{m_image_ctx.owner_lock};
- queue(req);
- trace.event("finish");
-}
-
-template <typename I>
-bool ImageRequestWQ<I>::block_overlapping_io(
- ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx
- << "off: " << off << " len: " << len <<dendl;
-
- if(len == 0) {
- return false;
- }
-
- if (in_flight_image_extents->empty() ||
- !in_flight_image_extents->intersects(off, len)) {
- in_flight_image_extents->insert(off, len);
- return false;
- }
-
- return true;
-}
-
-template <typename I>
-void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
- uint64_t tid) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
-
- remove_in_flight_write_ios(offset, length, true, tid);
-
- std::unique_lock locker{m_lock};
- if (!m_blocked_ios.empty()) {
- auto it = m_blocked_ios.begin();
- while (it != m_blocked_ios.end()) {
- auto blocked_io = *it;
-
- const auto& extents = blocked_io->get_image_extents();
- uint64_t off = extents.size() ? extents.front().first : 0;
- uint64_t len = extents.size() ? extents.front().second : 0;
-
- if (block_overlapping_io(&m_in_flight_extents, off, len)) {
- break;
- }
- ldout(cct, 20) << "unblocking off: " << off << ", "
- << "len: " << len << dendl;
- AioCompletion *aio_comp = blocked_io->get_aio_completion();
-
- m_blocked_ios.erase(it);
- locker.unlock();
- queue_unblocked_io(aio_comp, blocked_io);
- locker.lock();
- }
- }
-}
-
-template <typename I>
-void ImageRequestWQ<I>::unblock_flushes() {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
- std::unique_lock locker{m_lock};
- auto io_tid_it = m_queued_or_blocked_io_tids.begin();
- while (true) {
- auto it = m_queued_flushes.begin();
- if (it == m_queued_flushes.end() ||
- (io_tid_it != m_queued_or_blocked_io_tids.end() &&
- *io_tid_it < it->first)) {
- break;
- }
-
- auto blocked_flush = *it;
- ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl;
-
- AioCompletion *aio_comp = blocked_flush.second->get_aio_completion();
-
- m_queued_flushes.erase(it);
- locker.unlock();
- queue_unblocked_io(aio_comp, blocked_flush.second);
- locker.lock();
- }
-}
-
-template <typename I>
-void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp,
- ImageDispatchSpec<I> *req) {
- if (!start_in_flight_io(comp)) {
- return;
- }
-
- std::shared_lock owner_locker{m_image_ctx.owner_lock};
- queue(req);
-}
-
-template <typename I>
-void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
- ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
-
- {
- std::unique_lock locker{m_lock};
- ceph_assert(!m_shutdown);
- m_shutdown = true;
-
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
- << dendl;
- if (m_in_flight_ios > 0) {
- m_on_shutdown = on_shutdown;
- return;
- }
- }
-
- m_image_ctx.op_work_queue->queue(on_shutdown, 0);
-}
-
-template <typename I>
-void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
- ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
- CephContext *cct = m_image_ctx.cct;
-
- {
- std::unique_lock locker{m_lock};
- ++m_write_blockers;
- ldout(cct, 5) << &m_image_ctx << ", " << "num="
- << m_write_blockers << dendl;
- if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
- m_write_blocker_contexts.push_back(on_blocked);
- return;
- }
- }
-
- m_image_ctx.op_work_queue->queue(on_blocked, 0);
-}
-
-template <typename I>
-void ImageRequestWQ<I>::unblock_writes() {
- CephContext *cct = m_image_ctx.cct;
-
- bool wake_up = false;
- Contexts waiter_contexts;
- {
- std::unique_lock locker{m_lock};
- ceph_assert(m_write_blockers > 0);
- --m_write_blockers;
-
- ldout(cct, 5) << &m_image_ctx << ", " << "num="
- << m_write_blockers << dendl;
- if (m_write_blockers == 0) {
- wake_up = true;
- std::swap(waiter_contexts, m_unblocked_write_waiter_contexts);
- }
- }
-
- if (wake_up) {
- for (auto ctx : waiter_contexts) {
- ctx->complete(0);
- }
- this->signal();
- }
-}
-
-template <typename I>
-void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) {
- ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
- CephContext *cct = m_image_ctx.cct;
-
- {
- std::unique_lock locker{m_lock};
- ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers="
- << m_write_blockers << dendl;
- if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
- m_unblocked_write_waiter_contexts.push_back(on_unblocked);
- return;
- }
- }
-
- on_unblocked->complete(0);
-}
-
-template <typename I>
-void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << dendl;
-
- bool wake_up = false;
- {
- std::unique_lock locker{m_lock};
- switch (direction) {
- case DIRECTION_READ:
- wake_up = (enabled != m_require_lock_on_read);
- m_require_lock_on_read = enabled;
- break;
- case DIRECTION_WRITE:
- wake_up = (enabled != m_require_lock_on_write);
- m_require_lock_on_write = enabled;
- break;
- case DIRECTION_BOTH:
- wake_up = (enabled != m_require_lock_on_read ||
- enabled != m_require_lock_on_write);
- m_require_lock_on_read = enabled;
- m_require_lock_on_write = enabled;
- break;
- }
- }
-
- // wake up the thread pool whenever the state changes so that
- // we can re-request the lock if required
- if (wake_up) {
- this->signal();
- }
-}
-
-template <typename I>
-void *ImageRequestWQ<I>::_void_dequeue() {
- CephContext *cct = m_image_ctx.cct;
- ImageDispatchSpec<I> *peek_item = this->front();
-
- // no queued IO requests or all IO is blocked/stalled
- if (peek_item == nullptr || m_io_blockers.load() > 0) {
- return nullptr;
- }
-
- bool lock_required;
- bool refresh_required = m_image_ctx.state->is_refresh_required();
- {
- std::shared_lock locker{m_lock};
- bool write_op = peek_item->is_write_op();
- lock_required = is_lock_required(write_op);
- if (write_op) {
- if (!lock_required && m_write_blockers > 0) {
- // missing lock is not the write blocker
- return nullptr;
- }
-
- if (!lock_required && !refresh_required && !peek_item->blocked) {
- // completed ops will requeue the IO -- don't count it as in-progress
- m_in_flight_writes++;
- }
- }
- }
-
- auto item = reinterpret_cast<ImageDispatchSpec<I> *>(
- ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue());
- ceph_assert(peek_item == item);
-
- if (lock_required) {
- this->get_pool_lock().unlock();
- m_image_ctx.owner_lock.lock_shared();
- if (m_image_ctx.exclusive_lock != nullptr) {
- ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
- if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
- lderr(cct) << "op requires exclusive lock" << dendl;
- fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(),
- item);
-
- // wake up the IO since we won't be returning a request to process
- this->signal();
- } else {
- // stall IO until the acquire completes
- ++m_io_blockers;
- Context *ctx = new C_AcquireLock(this, item);
- ctx = create_context_callback<
- Context, &Context::complete>(
- ctx, m_image_ctx.exclusive_lock);
- m_image_ctx.exclusive_lock->acquire_lock(ctx);
- }
- } else {
- // raced with the exclusive lock being disabled
- lock_required = false;
- }
- m_image_ctx.owner_lock.unlock_shared();
- this->get_pool_lock().lock();
-
- if (lock_required) {
- return nullptr;
- }
- }
-
- if (refresh_required) {
- ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;
-
- // stall IO until the refresh completes
- ++m_io_blockers;
-
- this->get_pool_lock().unlock();
- m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
- this->get_pool_lock().lock();
- return nullptr;
- }
-
- return item;
-}
-
-template <typename I>
-void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
- bool non_blocking_io) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
- << "req=" << req << dendl;
-
- //extents are invalidated after the request is sent
- //so gather them ahead of that
- const auto& extents = req->get_image_extents();
- bool write_op = req->is_write_op();
- uint64_t tid = req->get_tid();
- uint64_t offset = extents.size() ? extents.front().first : 0;
- uint64_t length = extents.size() ? extents.front().second : 0;
-
- if (write_op && !req->blocked) {
- std::lock_guard locker{m_lock};
- bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
- if (blocked) {
- ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
- << &m_image_ctx << ", "
- << "off=" << offset << ", len=" << length << dendl;
- req->blocked = true;
- m_blocked_ios.push_back(req);
- return;
- }
- }
-
- req->start_op();
- req->send();
-
- if (write_op) {
- if (non_blocking_io) {
- finish_in_flight_write();
- }
- unblock_overlapping_io(offset, length, tid);
- unblock_flushes();
- }
-}
-
-template <typename I>
-void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
- << "req=" << req << dendl;
-
- bool write_op = req->is_write_op();
-
- process_io(req, true);
-
- finish_queued_io(write_op);
- finish_in_flight_io();
-}
-
-template <typename I>
-void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length,
- bool write_op, uint64_t tid) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
- {
- std::lock_guard locker{m_lock};
- if (write_op) {
- if (length > 0) {
- if(!m_in_flight_extents.empty()) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "erasing in flight extents with tid:"
- << tid << ", offset: " << offset << dendl;
- ImageExtentIntervals extents;
- extents.insert(offset, length);
- ImageExtentIntervals intersect;
- intersect.intersection_of(extents, m_in_flight_extents);
- m_in_flight_extents.subtract(intersect);
- }
- }
- m_queued_or_blocked_io_tids.erase(tid);
- }
- }
-}
-
-template <typename I>
-void ImageRequestWQ<I>::finish_queued_io(bool write_op) {
- std::shared_lock locker{m_lock};
- if (write_op) {
- ceph_assert(m_queued_writes > 0);
- m_queued_writes--;
- } else {
- ceph_assert(m_queued_reads > 0);
- m_queued_reads--;
- }
-}
-
-template <typename I>
-void ImageRequestWQ<I>::finish_in_flight_write() {
- bool writes_blocked = false;
- {
- std::shared_lock locker{m_lock};
- ceph_assert(m_in_flight_writes > 0);
- if (--m_in_flight_writes == 0 &&
- !m_write_blocker_contexts.empty()) {
- writes_blocked = true;
- }
- }
-
- if (writes_blocked) {
- m_image_ctx.op_work_queue->queue(create_context_callback<
- ImageRequestWQ<I>, &ImageRequestWQ<I>::handle_blocked_writes>(this), 0);
- }
-}
-
-template <typename I>
-int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
- std::shared_lock locker{m_lock};
-
- if (m_shutdown) {
- CephContext *cct = m_image_ctx.cct;
- lderr(cct) << "IO received on closed image" << dendl;
-
- c->fail(-ESHUTDOWN);
- return false;
- }
-
- if (!m_image_ctx.data_ctx.is_valid()) {
- CephContext *cct = m_image_ctx.cct;
- lderr(cct) << "missing data pool" << dendl;
-
- c->fail(-ENODEV);
- return false;
- }
-
- m_in_flight_ios++;
- return true;
-}
-
-template <typename I>
-void ImageRequestWQ<I>::finish_in_flight_io() {
- Context *on_shutdown;
- {
- std::shared_lock locker{m_lock};
- if (--m_in_flight_ios > 0 || !m_shutdown) {
- return;
- }
- on_shutdown = m_on_shutdown;
- }
-
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 5) << "completing shut down" << dendl;
-
- on_shutdown->complete(0);
-}
-
-template <typename I>
-void ImageRequestWQ<I>::fail_in_flight_io(
- int r, ImageDispatchSpec<I> *req) {
- this->process_finish();
- req->fail(r);
-
- bool write_op = req->is_write_op();
- uint64_t tid = req->get_tid();
- const auto& extents = req->get_image_extents();
- uint64_t offset = extents.size() ? extents.front().first : 0;
- uint64_t length = extents.size() ? extents.front().second : 0;
-
- finish_queued_io(write_op);
- remove_in_flight_write_ios(offset, length, write_op, tid);
- finish_in_flight_io();
-}
-
-template <typename I>
-bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
- ceph_assert(ceph_mutex_is_locked(m_lock));
- return ((write_op && m_require_lock_on_write) ||
- (!write_op && m_require_lock_on_read));
-}
-
-template <typename I>
-void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) {
- ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
-
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
- << "req=" << req << dendl;
-
- if (req->is_write_op()) {
- m_queued_writes++;
- } else {
- m_queued_reads++;
- }
-
- ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req);
-}
-
-template <typename I>
-void ImageRequestWQ<I>::handle_acquire_lock(
- int r, ImageDispatchSpec<I> *req) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;
-
- if (r < 0) {
- fail_in_flight_io(r, req);
- } else {
- // since IO was stalled for acquire -- original IO order is preserved
- // if we requeue this op for work queue processing
- this->requeue_front(req);
- }
-
- ceph_assert(m_io_blockers.load() > 0);
- --m_io_blockers;
- this->signal();
-}
-
-template <typename I>
-void ImageRequestWQ<I>::handle_refreshed(
- int r, ImageDispatchSpec<I> *req) {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
- << "req=" << req << dendl;
- if (r < 0) {
- fail_in_flight_io(r, req);
- } else {
- // since IO was stalled for refresh -- original IO order is preserved
- // if we requeue this op for work queue processing
- this->requeue_front(req);
- }
-
- ceph_assert(m_io_blockers.load() > 0);
- --m_io_blockers;
- this->signal();
-}
-
-template <typename I>
-void ImageRequestWQ<I>::handle_blocked_writes(int r) {
- Contexts contexts;
- {
- std::unique_lock locker{m_lock};
- contexts.swap(m_write_blocker_contexts);
- }
-
- for (auto ctx : contexts) {
- ctx->complete(0);
- }
-}
-
-template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;
-
-} // namespace io
-} // namespace librbd
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H
-#define CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H
-
-#include "include/Context.h"
-#include "common/ceph_mutex.h"
-#include "common/WorkQueue.h"
-#include "librbd/io/Types.h"
-#include "include/interval_set.h"
-#include <list>
-#include <atomic>
-#include <vector>
-
-namespace librbd {
-
-class ImageCtx;
-
-namespace io {
-
-class AioCompletion;
-template <typename> class ImageDispatchSpec;
-class ReadResult;
-
-template <typename ImageCtxT = librbd::ImageCtx>
-class ImageRequestWQ
- : public ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT> > {
-public:
- ImageRequestWQ(ImageCtxT *image_ctx, const string &name, time_t ti,
- ThreadPool *tp);
-
- ssize_t read(uint64_t off, uint64_t len, ReadResult &&read_result,
- int op_flags);
- ssize_t write(uint64_t off, uint64_t len, bufferlist &&bl, int op_flags);
- ssize_t discard(uint64_t off, uint64_t len,
- uint32_t discard_granularity_bytes);
- ssize_t writesame(uint64_t off, uint64_t len, bufferlist &&bl, int op_flags);
- ssize_t compare_and_write(uint64_t off, uint64_t len,
- bufferlist &&cmp_bl, bufferlist &&bl,
- uint64_t *mismatch_off, int op_flags);
- int flush();
-
- void aio_read(AioCompletion *c, uint64_t off, uint64_t len,
- ReadResult &&read_result, int op_flags, bool native_async=true);
- void aio_write(AioCompletion *c, uint64_t off, uint64_t len,
- bufferlist &&bl, int op_flags, bool native_async=true);
- void aio_discard(AioCompletion *c, uint64_t off, uint64_t len,
- uint32_t discard_granularity_bytes, bool native_async=true);
- void aio_flush(AioCompletion *c, bool native_async=true);
- void aio_writesame(AioCompletion *c, uint64_t off, uint64_t len,
- bufferlist &&bl, int op_flags, bool native_async=true);
- void aio_compare_and_write(AioCompletion *c, uint64_t off,
- uint64_t len, bufferlist &&cmp_bl,
- bufferlist &&bl, uint64_t *mismatch_off,
- int op_flags, bool native_async=true);
-
- using ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT> >::drain;
- using ThreadPool::PointerWQ<ImageDispatchSpec<ImageCtxT> >::empty;
-
- void shut_down(Context *on_shutdown);
-
- inline bool writes_blocked() const {
- std::shared_lock locker{m_lock};
- return (m_write_blockers > 0);
- }
-
- void block_writes(Context *on_blocked);
- void unblock_writes();
-
- void wait_on_writes_unblocked(Context *on_unblocked);
-
- void set_require_lock(Direction direction, bool enabled);
-
-protected:
- void *_void_dequeue() override;
- void process(ImageDispatchSpec<ImageCtxT> *req) override;
-
-private:
- typedef std::list<Context *> Contexts;
-
- struct C_AcquireLock;
- struct C_BlockedWrites;
- struct C_RefreshFinish;
-
- ImageCtxT &m_image_ctx;
- mutable ceph::shared_mutex m_lock;
- Contexts m_write_blocker_contexts;
- uint32_t m_write_blockers = 0;
- Contexts m_unblocked_write_waiter_contexts;
- bool m_require_lock_on_read = false;
- bool m_require_lock_on_write = false;
- std::atomic<unsigned> m_queued_reads { 0 };
- std::atomic<unsigned> m_queued_writes { 0 };
- std::atomic<unsigned> m_in_flight_ios { 0 };
- std::atomic<unsigned> m_in_flight_writes { 0 };
- std::atomic<unsigned> m_io_blockers { 0 };
-
- typedef interval_set<uint64_t> ImageExtentIntervals;
- ImageExtentIntervals m_in_flight_extents;
-
- std::vector<ImageDispatchSpec<ImageCtxT>*> m_blocked_ios;
- std::atomic<unsigned> m_last_tid { 0 };
- std::set<uint64_t> m_queued_or_blocked_io_tids;
- std::map<uint64_t, ImageDispatchSpec<ImageCtxT>*> m_queued_flushes;
-
- bool m_shutdown = false;
- Context *m_on_shutdown = nullptr;
-
- bool is_lock_required(bool write_op) const;
-
- inline bool require_lock_on_read() const {
- std::shared_lock locker{m_lock};
- return m_require_lock_on_read;
- }
- inline bool writes_empty() const {
- std::shared_lock locker{m_lock};
- return (m_queued_writes == 0);
- }
-
- void finish_queued_io(bool write_op);
- void remove_in_flight_write_ios(uint64_t offset, uint64_t length,
- bool write_op, uint64_t tid);
- void finish_in_flight_write();
-
- void unblock_flushes();
- bool block_overlapping_io(ImageExtentIntervals* in_flight_image_extents,
- uint64_t object_off, uint64_t object_len);
- void unblock_overlapping_io(uint64_t offset, uint64_t length, uint64_t tid);
- int start_in_flight_io(AioCompletion *c);
- void finish_in_flight_io();
- void fail_in_flight_io(int r, ImageDispatchSpec<ImageCtxT> *req);
- void process_io(ImageDispatchSpec<ImageCtxT> *req, bool non_blocking_io);
-
- void queue(ImageDispatchSpec<ImageCtxT> *req);
- void queue_unblocked_io(AioCompletion *comp,
- ImageDispatchSpec<ImageCtxT> *req);
-
- void handle_acquire_lock(int r, ImageDispatchSpec<ImageCtxT> *req);
- void handle_refreshed(int r, ImageDispatchSpec<ImageCtxT> *req);
- void handle_blocked_writes(int r);
-};
-
-} // namespace io
-} // namespace librbd
-
-extern template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;
-
-#endif // CEPH_LIBRBD_IO_IMAGE_REQUEST_WQ_H
#include "librbd/io/QueueImageDispatch.h"
#include "common/dout.h"
#include "common/Cond.h"
+#include "common/WorkQueue.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageDispatchSpec.h"
-#include "librbd/io/ImageRequestWQ.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
on_blocked = util::create_async_context_callback(
*m_image_ctx, on_blocked);
- // TODO temp
- auto ctx = new C_Gather(cct, on_blocked);
- m_image_ctx->io_work_queue->block_writes(ctx->new_sub());
-
{
std::unique_lock locker{m_lock};
++m_write_blockers;
if (!m_write_blocker_contexts.empty() || !m_in_flight_write_tids.empty()) {
ldout(cct, 5) << "waiting for in-flight writes to complete: "
<< "write_tids=" << m_in_flight_write_tids << dendl;
- m_write_blocker_contexts.push_back(ctx->new_sub());
- ctx->activate();
+ m_write_blocker_contexts.push_back(on_blocked);
return;
}
}
// ensure that all in-flight IO is flushed
- flush_image(ctx->new_sub());
- ctx->activate();
+ flush_image(on_blocked);
};
template <typename I>
for (auto ctx : dispatch_contexts) {
ctx->complete(0);
}
-
- // TODO temp
- m_image_ctx->io_work_queue->unblock_writes();
}
template <typename I>
ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock));
auto cct = m_image_ctx->cct;
- // TODO temp
- auto ctx = new C_Gather(cct, on_unblocked);
- m_image_ctx->io_work_queue->wait_on_writes_unblocked(ctx->new_sub());
-
{
std::unique_lock locker{m_lock};
ldout(cct, 20) << m_image_ctx << ", "
<< "write_blockers=" << m_write_blockers << dendl;
if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
- m_unblocked_write_waiter_contexts.push_back(ctx->new_sub());
- ctx->activate();
+ m_unblocked_write_waiter_contexts.push_back(on_unblocked);
return;
}
}
- ctx->activate();
+ on_unblocked->complete(0);
}
template <typename I>
image/test_mock_ValidatePoolRequest.cc
io/test_mock_CopyupRequest.cc
io/test_mock_ImageRequest.cc
- io/test_mock_ImageRequestWQ.cc
io/test_mock_ObjectRequest.cc
io/test_mock_SimpleSchedulerObjectDispatch.cc
journal/test_mock_OpenRequest.cc
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "test/librbd/test_mock_fixture.h"
-#include "test/librbd/test_support.h"
-#include "test/librbd/mock/MockImageCtx.h"
-#include "test/librbd/mock/exclusive_lock/MockPolicy.h"
-#include "test/librbd/mock/io/MockQosImageDispatch.h"
-#include "librbd/io/ImageDispatchSpec.h"
-#include "librbd/io/ImageRequestWQ.h"
-#include "librbd/io/ImageRequest.h"
-
-namespace librbd {
-namespace {
-
-struct MockTestImageCtx : public MockImageCtx {
- MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) {
- }
-};
-
-} // anonymous namespace
-
-namespace io {
-
-template <>
-struct ImageRequest<librbd::MockTestImageCtx> {
- static ImageRequest* s_instance;
- AioCompletion *aio_comp = nullptr;
-
- static void aio_write(librbd::MockTestImageCtx *ictx, AioCompletion *c,
- Extents &&image_extents, bufferlist &&bl, int op_flags,
- const ZTracer::Trace &parent_trace) {
- }
-
- ImageRequest() {
- s_instance = this;
- }
-};
-
-template <>
-struct ImageDispatchSpec<librbd::MockTestImageCtx> {
- static ImageDispatchSpec* s_instance;
- AioCompletion *aio_comp = nullptr;
- bool blocked = false;
-
- std::atomic<uint32_t> image_dispatch_flags = {0};
-
- static ImageDispatchSpec* create_write(
- librbd::MockTestImageCtx &image_ctx, ImageDispatchLayer dispatch_layer,
- AioCompletion *aio_comp, Extents &&image_extents, bufferlist &&bl,
- int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
- ceph_assert(s_instance != nullptr);
- s_instance->aio_comp = aio_comp;
- return s_instance;
- }
-
- static ImageDispatchSpec* create_flush(
- librbd::MockTestImageCtx &image_ctx, ImageDispatchLayer dispatch_layer,
- AioCompletion *aio_comp, FlushSource flush_source,
- const ZTracer::Trace &parent_trace) {
- ceph_assert(s_instance != nullptr);
- s_instance->aio_comp = aio_comp;
- return s_instance;
- }
-
- MOCK_CONST_METHOD0(is_write_op, bool());
- MOCK_CONST_METHOD0(start_op, void());
- MOCK_CONST_METHOD0(send, void());
- MOCK_CONST_METHOD1(fail, void(int));
- MOCK_CONST_METHOD0(get_image_extents, Extents());
- MOCK_CONST_METHOD0(get_aio_completion, AioCompletion*());
- MOCK_CONST_METHOD0(get_tid, uint64_t());
-
- ImageDispatchSpec() {
- s_instance = this;
- }
-};
-} // namespace io
-
-namespace util {
-
-inline ImageCtx *get_image_ctx(MockTestImageCtx *image_ctx) {
- return image_ctx->image_ctx;
-}
-
-} // namespace util
-
-} // namespace librbd
-
-template <>
-struct ThreadPool::PointerWQ<librbd::io::ImageDispatchSpec<librbd::MockTestImageCtx>> {
- typedef librbd::io::ImageDispatchSpec<librbd::MockTestImageCtx> ImageDispatchSpec;
- static PointerWQ* s_instance;
-
- ceph::mutex m_lock;
-
- PointerWQ(const std::string &name, time_t, int, ThreadPool *)
- : m_lock(ceph::make_mutex(name)) {
- s_instance = this;
- }
- virtual ~PointerWQ() {
- }
-
- MOCK_METHOD0(drain, void());
- MOCK_METHOD0(empty, bool());
- MOCK_METHOD0(mock_empty, bool());
- MOCK_METHOD0(signal, void());
- MOCK_METHOD0(process_finish, void());
-
- MOCK_METHOD0(front, ImageDispatchSpec*());
- MOCK_METHOD1(requeue_front, void(ImageDispatchSpec*));
- MOCK_METHOD1(requeue_back, void(ImageDispatchSpec*));
-
- MOCK_METHOD0(dequeue, void*());
- MOCK_METHOD1(queue, void(ImageDispatchSpec*));
-
- void register_work_queue() {
- // no-op
- }
- ceph::mutex &get_pool_lock() {
- return m_lock;
- }
-
- void* invoke_dequeue() {
- std::lock_guard locker{m_lock};
- return _void_dequeue();
- }
- void invoke_process(ImageDispatchSpec *image_request) {
- process(image_request);
- }
-
- virtual void *_void_dequeue() {
- return dequeue();
- }
- virtual void process(ImageDispatchSpec *req) = 0;
- virtual bool _empty() {
- return mock_empty();
- }
-
-};
-
-ThreadPool::PointerWQ<librbd::io::ImageDispatchSpec<librbd::MockTestImageCtx>>*
- ThreadPool::PointerWQ<librbd::io::ImageDispatchSpec<librbd::MockTestImageCtx>>::s_instance = nullptr;
-librbd::io::ImageRequest<librbd::MockTestImageCtx>*
- librbd::io::ImageRequest<librbd::MockTestImageCtx>::s_instance = nullptr;
-librbd::io::ImageDispatchSpec<librbd::MockTestImageCtx>*
- librbd::io::ImageDispatchSpec<librbd::MockTestImageCtx>::s_instance = nullptr;
-
-#include "librbd/io/ImageRequestWQ.cc"
-
-namespace librbd {
-namespace io {
-
-using ::testing::_;
-using ::testing::InSequence;
-using ::testing::Invoke;
-using ::testing::Return;
-using ::testing::WithArg;
-
-struct TestMockIoImageRequestWQ : public TestMockFixture {
- typedef ImageRequestWQ<librbd::MockTestImageCtx> MockImageRequestWQ;
- typedef ImageRequest<librbd::MockTestImageCtx> MockImageRequest;
- typedef ImageDispatchSpec<librbd::MockTestImageCtx> MockImageDispatchSpec;
-
- void expect_is_write_op(MockImageDispatchSpec &image_request,
- bool write_op) {
- EXPECT_CALL(image_request, is_write_op()).WillOnce(Return(write_op));
- }
-
- void expect_signal(MockImageRequestWQ &image_request_wq) {
- EXPECT_CALL(image_request_wq, signal());
- }
-
- void expect_queue(MockImageRequestWQ &image_request_wq) {
- EXPECT_CALL(image_request_wq, queue(_));
- }
-
- void expect_requeue_back(MockImageRequestWQ &image_request_wq) {
- EXPECT_CALL(image_request_wq, requeue_back(_));
- }
-
- void expect_front(MockImageRequestWQ &image_request_wq,
- MockImageDispatchSpec& image_request) {
- EXPECT_CALL(image_request_wq, front()).WillOnce(Return(&image_request));
- }
-
- void expect_is_refresh_request(MockTestImageCtx &mock_image_ctx,
- bool required) {
- EXPECT_CALL(*mock_image_ctx.state, is_refresh_required()).WillOnce(
- Return(required));
- }
-
- void expect_dequeue(MockImageRequestWQ &image_request_wq,
- MockImageDispatchSpec &image_request) {
- EXPECT_CALL(image_request_wq, dequeue()).WillOnce(Return(&image_request));
- }
-
- void expect_get_exclusive_lock_policy(MockTestImageCtx &mock_image_ctx,
- librbd::exclusive_lock::MockPolicy &policy) {
- EXPECT_CALL(mock_image_ctx,
- get_exclusive_lock_policy()).WillOnce(Return(&policy));
- }
-
- void expect_may_auto_request_lock(librbd::exclusive_lock::MockPolicy &policy,
- bool value) {
- EXPECT_CALL(policy, may_auto_request_lock()).WillOnce(Return(value));
- }
-
- void expect_acquire_lock(MockExclusiveLock &mock_exclusive_lock,
- Context **on_finish) {
- EXPECT_CALL(mock_exclusive_lock, acquire_lock(_))
- .WillOnce(Invoke([on_finish](Context *ctx) {
- *on_finish = ctx;
- }));
- }
-
- void expect_process_finish(MockImageRequestWQ &mock_image_request_wq) {
- EXPECT_CALL(mock_image_request_wq, process_finish()).Times(1);
- }
-
- void expect_fail(MockImageDispatchSpec &mock_image_request, int r) {
- EXPECT_CALL(mock_image_request, fail(r))
- .WillOnce(Invoke([&mock_image_request](int r) {
- mock_image_request.aio_comp->fail(r);
- }));
- }
-
- void expect_refresh(MockTestImageCtx &mock_image_ctx, Context **on_finish) {
- EXPECT_CALL(*mock_image_ctx.state, refresh(_))
- .WillOnce(Invoke([on_finish](Context *ctx) {
- *on_finish = ctx;
- }));
- }
-
- void expect_start_op(MockImageDispatchSpec &mock_image_request) {
- EXPECT_CALL(mock_image_request, start_op()).Times(1);
- }
-
- void expect_get_image_extents(MockImageDispatchSpec &mock_image_request,
- const Extents &extents) {
- EXPECT_CALL(mock_image_request, get_image_extents())
- .WillRepeatedly(Return(extents));
- }
-
- void expect_get_tid(MockImageDispatchSpec &mock_image_request, uint64_t tid) {
- EXPECT_CALL(mock_image_request, get_tid()).WillOnce(Return(tid));
- }
-};
-
-TEST_F(TestMockIoImageRequestWQ, AcquireLockError) {
- REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
-
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
- MockExclusiveLock mock_exclusive_lock;
- mock_image_ctx.exclusive_lock = &mock_exclusive_lock;
-
- MockImageDispatchSpec mock_queued_image_request;
- expect_get_image_extents(mock_queued_image_request, {});
- expect_get_tid(mock_queued_image_request, 0);
-
- InSequence seq;
- MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
- expect_signal(mock_image_request_wq);
- mock_image_request_wq.set_require_lock(DIRECTION_WRITE, true);
-
- expect_is_write_op(mock_queued_image_request, true);
- expect_queue(mock_image_request_wq);
- auto *aio_comp = new librbd::io::AioCompletion();
- mock_image_request_wq.aio_write(aio_comp, 0, 0, {}, 0);
-
- librbd::exclusive_lock::MockPolicy mock_exclusive_lock_policy;
- expect_front(mock_image_request_wq, mock_queued_image_request);
- expect_is_refresh_request(mock_image_ctx, false);
- expect_is_write_op(mock_queued_image_request, true);
- expect_dequeue(mock_image_request_wq, mock_queued_image_request);
- expect_get_exclusive_lock_policy(mock_image_ctx, mock_exclusive_lock_policy);
- expect_may_auto_request_lock(mock_exclusive_lock_policy, true);
- Context *on_acquire = nullptr;
- expect_acquire_lock(mock_exclusive_lock, &on_acquire);
- ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr);
- ASSERT_TRUE(on_acquire != nullptr);
-
- expect_process_finish(mock_image_request_wq);
- expect_fail(mock_queued_image_request, -EPERM);
- expect_is_write_op(mock_queued_image_request, true);
- expect_signal(mock_image_request_wq);
- on_acquire->complete(-EPERM);
-
- ASSERT_EQ(0, aio_comp->wait_for_complete());
- ASSERT_EQ(-EPERM, aio_comp->get_return_value());
- aio_comp->release();
-}
-
-TEST_F(TestMockIoImageRequestWQ, AcquireLockBlacklisted) {
- REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
-
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
- MockExclusiveLock mock_exclusive_lock;
- mock_image_ctx.exclusive_lock = &mock_exclusive_lock;
-
- MockImageDispatchSpec mock_queued_image_request;
- expect_get_image_extents(mock_queued_image_request, {});
- expect_get_tid(mock_queued_image_request, 0);
-
- InSequence seq;
- MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
- expect_signal(mock_image_request_wq);
- mock_image_request_wq.set_require_lock(DIRECTION_WRITE, true);
-
- expect_is_write_op(mock_queued_image_request, true);
- expect_queue(mock_image_request_wq);
- auto *aio_comp = new librbd::io::AioCompletion();
- mock_image_request_wq.aio_write(aio_comp, 0, 0, {}, 0);
-
- librbd::exclusive_lock::MockPolicy mock_exclusive_lock_policy;
- expect_front(mock_image_request_wq, mock_queued_image_request);
- expect_is_refresh_request(mock_image_ctx, false);
- expect_is_write_op(mock_queued_image_request, true);
- expect_dequeue(mock_image_request_wq, mock_queued_image_request);
- expect_get_exclusive_lock_policy(mock_image_ctx, mock_exclusive_lock_policy);
- expect_may_auto_request_lock(mock_exclusive_lock_policy, false);
- EXPECT_CALL(*mock_image_ctx.exclusive_lock, get_unlocked_op_error())
- .WillOnce(Return(-EBLACKLISTED));
- expect_process_finish(mock_image_request_wq);
- expect_fail(mock_queued_image_request, -EBLACKLISTED);
- expect_is_write_op(mock_queued_image_request, true);
- expect_signal(mock_image_request_wq);
- ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr);
-
- ASSERT_EQ(0, aio_comp->wait_for_complete());
- ASSERT_EQ(-EBLACKLISTED, aio_comp->get_return_value());
- aio_comp->release();
-}
-
-TEST_F(TestMockIoImageRequestWQ, RefreshError) {
- librbd::ImageCtx *ictx;
- ASSERT_EQ(0, open_image(m_image_name, &ictx));
-
- MockTestImageCtx mock_image_ctx(*ictx);
-
- MockImageDispatchSpec mock_queued_image_request;
- expect_get_image_extents(mock_queued_image_request, {});
- expect_get_tid(mock_queued_image_request, 0);
-
- InSequence seq;
- MockImageRequestWQ mock_image_request_wq(&mock_image_ctx, "io", 60, nullptr);
-
- expect_is_write_op(mock_queued_image_request, true);
- expect_queue(mock_image_request_wq);
- auto *aio_comp = new librbd::io::AioCompletion();
- mock_image_request_wq.aio_write(aio_comp, 0, 0, {}, 0);
-
- expect_front(mock_image_request_wq, mock_queued_image_request);
- expect_is_refresh_request(mock_image_ctx, true);
- expect_is_write_op(mock_queued_image_request, true);
- expect_dequeue(mock_image_request_wq, mock_queued_image_request);
- Context *on_refresh = nullptr;
- expect_refresh(mock_image_ctx, &on_refresh);
- ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == nullptr);
- ASSERT_TRUE(on_refresh != nullptr);
-
- expect_process_finish(mock_image_request_wq);
- expect_fail(mock_queued_image_request, -EPERM);
- expect_is_write_op(mock_queued_image_request, true);
- expect_signal(mock_image_request_wq);
- on_refresh->complete(-EPERM);
-
- ASSERT_EQ(0, aio_comp->wait_for_complete());
- ASSERT_EQ(-EPERM, aio_comp->get_return_value());
- aio_comp->release();
-}
-
-} // namespace io
-} // namespace librbd
#include "test/librbd/mock/MockOperations.h"
#include "test/librbd/mock/MockReadahead.h"
#include "test/librbd/mock/io/MockImageDispatcher.h"
-#include "test/librbd/mock/io/MockImageRequestWQ.h"
#include "test/librbd/mock/io/MockObjectDispatcher.h"
#include "common/RWLock.h"
#include "common/WorkQueue.h"
format_string(image_ctx.format_string),
group_spec(image_ctx.group_spec),
layout(image_ctx.layout),
- io_work_queue(new io::MockImageRequestWQ()),
io_image_dispatcher(new io::MockImageDispatcher()),
io_object_dispatcher(new io::MockObjectDispatcher()),
op_work_queue(new MockContextWQ()),
delete operations;
delete image_watcher;
delete op_work_queue;
- delete io_work_queue;
delete io_image_dispatcher;
delete io_object_dispatcher;
}
std::map<uint64_t, io::CopyupRequest<MockImageCtx>*> copyup_list;
- io::MockImageRequestWQ *io_work_queue;
io::MockImageDispatcher *io_image_dispatcher;
io::MockObjectDispatcher *io_object_dispatcher;
MockContextWQ *op_work_queue;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_TEST_LIBRBD_MOCK_IO_IMAGE_REQUEST_WQ_H
-#define CEPH_TEST_LIBRBD_MOCK_IO_IMAGE_REQUEST_WQ_H
-
-#include "gmock/gmock.h"
-#include "librbd/io/Types.h"
-
-class Context;
-
-namespace librbd {
-namespace io {
-
-struct MockImageRequestWQ {
- MOCK_METHOD1(block_writes, void(Context *));
- MOCK_METHOD0(unblock_writes, void());
-
- MOCK_METHOD2(set_require_lock, void(Direction, bool));
-};
-
-} // namespace io
-} // namespace librbd
-
-#endif // CEPH_TEST_LIBRBD_MOCK_IO_IMAGE_REQUEST_WQ_H
#include "librbd/ImageWatcher.h"
#include "librbd/WatchNotifyTypes.h"
#include "librbd/io/AioCompletion.h"
-#include "librbd/io/ImageRequestWQ.h"
#include "test/librados/test.h"
#include "gtest/gtest.h"
#include <boost/assign/std/set.hpp>
#include "librbd/api/Namespace.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"
-#include "librbd/io/ImageRequestWQ.h"
#include "librbd/journal/Types.h"
#include "librbd/mirror/snapshot/GetImageStateRequest.h"
#include "librbd/mirror/snapshot/RemoveImageStateRequest.h"