--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/SimpleSchedulerObjectDispatch.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
+#include "common/errno.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/Utils.h"
+
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/rolling_count.hpp>
+#include <boost/accumulators/statistics/rolling_sum.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::SimpleSchedulerObjectDispatch: " \
+ << this << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+using namespace boost::accumulators;
+
+static const int LATENCY_STATS_WINDOW_SIZE = 10;
+
+class LatencyStats {
+private:
+ accumulator_set<uint64_t, stats<tag::rolling_count, tag::rolling_sum>> m_acc;
+
+public:
+ LatencyStats()
+ : m_acc(tag::rolling_window::window_size = LATENCY_STATS_WINDOW_SIZE) {
+ }
+
+ bool is_ready() const {
+ return rolling_count(m_acc) == LATENCY_STATS_WINDOW_SIZE;
+ }
+
+ void add(uint64_t latency) {
+ m_acc(latency);
+ }
+
+ uint64_t avg() const {
+ auto count = rolling_count(m_acc);
+
+ if (count > 0) {
+ return rolling_sum(m_acc);
+ }
+ return 0;
+ }
+};
+
+template <typename I>
+bool SimpleSchedulerObjectDispatch<I>::ObjectRequests::try_delay_request(
+ uint64_t object_off, ceph::bufferlist&& data, const ::SnapContext &snapc,
+ int op_flags, int object_dispatch_flags, Context* on_dispatched) {
+ if (!m_delayed_requests.empty()) {
+ if (snapc.seq != m_snapc.seq || op_flags != m_op_flags ||
+ data.length() == 0 ||
+ m_delayed_request_extents.intersects(object_off, data.length())) {
+ return false;
+ }
+ } else {
+ m_snapc = snapc;
+ m_op_flags = op_flags;
+ }
+
+ if (data.length() == 0) {
+ // a zero length write is usually a special case,
+ // and we don't want it to be merged with others
+ ceph_assert(m_delayed_requests.empty());
+ m_delayed_request_extents.insert(0, UINT64_MAX);
+ } else {
+ m_delayed_request_extents.insert(object_off, data.length());
+ }
+ m_object_dispatch_flags |= object_dispatch_flags;
+
+ if (!m_delayed_requests.empty()) {
+ // try to merge front to an existing request
+ auto iter = m_delayed_requests.find(object_off + data.length());
+ if (iter != m_delayed_requests.end()) {
+ auto new_iter = m_delayed_requests.insert({object_off, {}}).first;
+ new_iter->second.data = std::move(data);
+ new_iter->second.data.append(std::move(iter->second.data));
+ new_iter->second.requests = std::move(iter->second.requests);
+ new_iter->second.requests.push_back(on_dispatched);
+ m_delayed_requests.erase(iter);
+
+ if (new_iter != m_delayed_requests.begin()) {
+ auto prev = new_iter;
+ try_merge_delayed_requests(--prev, new_iter);
+ }
+ return true;
+ }
+
+ // try to merge back to an existing request
+ iter = m_delayed_requests.lower_bound(object_off);
+ if (iter == m_delayed_requests.end() || iter->first > object_off) {
+ iter--;
+ }
+ if (iter != m_delayed_requests.end() &&
+ iter->first + iter->second.data.length() == object_off) {
+ iter->second.data.append(std::move(data));
+ iter->second.requests.push_back(on_dispatched);
+
+ auto next = iter;
+ if (++next != m_delayed_requests.end()) {
+ try_merge_delayed_requests(iter, next);
+ }
+ return true;
+ }
+ }
+
+ // create a new request
+ auto iter = m_delayed_requests.insert({object_off, {}}).first;
+ iter->second.data = std::move(data);
+ iter->second.requests.push_back(on_dispatched);
+ return true;
+}
+
+template <typename I>
+void SimpleSchedulerObjectDispatch<I>::ObjectRequests::try_merge_delayed_requests(
+ typename std::map<uint64_t, MergedRequests>::iterator &iter1,
+ typename std::map<uint64_t, MergedRequests>::iterator &iter2) {
+ if (iter1->first + iter1->second.data.length() != iter2->first) {
+ return;
+ }
+
+ iter1->second.data.append(std::move(iter2->second.data));
+ iter1->second.requests.insert(iter1->second.requests.end(),
+ iter2->second.requests.begin(),
+ iter2->second.requests.end());
+ m_delayed_requests.erase(iter2);
+}
+
+template <typename I>
+void SimpleSchedulerObjectDispatch<I>::ObjectRequests::dispatch_delayed_requests(
+ I *image_ctx, LatencyStats *latency_stats, Mutex *latency_stats_lock) {
+ for (auto &it : m_delayed_requests) {
+ auto offset = it.first;
+ auto &merged_requests = it.second;
+
+ auto ctx = new FunctionContext(
+ [this, requests=std::move(merged_requests.requests), latency_stats,
+ latency_stats_lock, start_time=ceph_clock_now()](int r) {
+ if (latency_stats) {
+ Mutex::Locker locker(*latency_stats_lock);
+ auto latency = ceph_clock_now() - start_time;
+ latency_stats->add(latency.to_nsec());
+ }
+ for (auto on_dispatched : requests) {
+ on_dispatched->complete(r);
+ }
+ });
+
+ auto req = io::ObjectDispatchSpec::create_write(
+ image_ctx, io::OBJECT_DISPATCH_LAYER_SCHEDULER,
+ image_ctx->get_object_name(m_object_no), m_object_no, offset,
+ std::move(merged_requests.data), m_snapc, m_op_flags, 0, {}, ctx);
+
+ req->object_dispatch_flags = m_object_dispatch_flags;
+ req->send();
+ }
+
+ m_dispatch_time = utime_t();
+}
+
+template <typename I>
+SimpleSchedulerObjectDispatch<I>::SimpleSchedulerObjectDispatch(
+ I* image_ctx)
+ : m_image_ctx(image_ctx),
+ m_lock(librbd::util::unique_lock_name(
+ "librbd::io::SimpleSchedulerObjectDispatch::lock", this)),
+ m_max_delay(image_ctx->config.template get_val<uint64_t>(
+ "rbd_io_scheduler_simple_max_delay")) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 5) << "ictx=" << image_ctx << dendl;
+
+ I::get_timer_instance(cct, &m_timer, &m_timer_lock);
+
+ if (m_max_delay == 0) {
+ m_latency_stats = std::make_unique<LatencyStats>();
+ }
+}
+
+template <typename I>
+SimpleSchedulerObjectDispatch<I>::~SimpleSchedulerObjectDispatch() {
+}
+
+template <typename I>
+void SimpleSchedulerObjectDispatch<I>::init() {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 5) << dendl;
+
+ // add ourself to the IO object dispatcher chain
+ m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+}
+
+template <typename I>
+void SimpleSchedulerObjectDispatch<I>::shut_down(Context* on_finish) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 5) << dendl;
+
+ on_finish->complete(0);
+}
+
+template <typename I>
+bool SimpleSchedulerObjectDispatch<I>::read(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, librados::snap_t snap_id, int op_flags,
+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+ io::ExtentMap* extent_map, int* object_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+ << object_len << dendl;
+
+ Mutex::Locker locker(m_lock);
+ dispatch_delayed_requests(object_no);
+
+ return false;
+}
+
+template <typename I>
+bool SimpleSchedulerObjectDispatch<I>::discard(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+ << object_len << dendl;
+
+ Mutex::Locker locker(m_lock);
+ dispatch_delayed_requests(object_no);
+ register_in_flight_request(object_no, {}, on_finish);
+
+ return false;
+}
+
+template <typename I>
+bool SimpleSchedulerObjectDispatch<I>::write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+ << data.length() << dendl;
+
+ Mutex::Locker locker(m_lock);
+ if (try_delay_write(object_no, object_off, std::move(data), snapc, op_flags,
+ *object_dispatch_flags, on_dispatched)) {
+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+ return true;
+ }
+
+ dispatch_delayed_requests(object_no);
+ register_in_flight_request(object_no, ceph_clock_now(), on_finish);
+
+ return false;
+}
+
+template <typename I>
+bool SimpleSchedulerObjectDispatch<I>::write_same(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ uint64_t object_len, io::Extents&& buffer_extents, ceph::bufferlist&& data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+ Context** on_finish, Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+ << object_len << dendl;
+
+ Mutex::Locker locker(m_lock);
+ dispatch_delayed_requests(object_no);
+ register_in_flight_request(object_no, {}, on_finish);
+
+ return false;
+}
+
+template <typename I>
+bool SimpleSchedulerObjectDispatch<I>::compare_and_write(
+ const std::string &oid, uint64_t object_no, uint64_t object_off,
+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+ const ::SnapContext &snapc, int op_flags,
+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+ int* object_dispatch_flags, uint64_t* journal_tid,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+ << cmp_data.length() << dendl;
+
+ Mutex::Locker locker(m_lock);
+ dispatch_delayed_requests(object_no);
+ register_in_flight_request(object_no, {}, on_finish);
+
+ return false;
+}
+
+template <typename I>
+bool SimpleSchedulerObjectDispatch<I>::flush(
+ io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ Mutex::Locker locker(m_lock);
+ dispatch_all_delayed_requests();
+
+ return false;
+}
+
+template <typename I>
+bool SimpleSchedulerObjectDispatch<I>::try_delay_write(
+ uint64_t object_no, uint64_t object_off, ceph::bufferlist&& data,
+ const ::SnapContext &snapc, int op_flags, int object_dispatch_flags,
+ Context* on_dispatched) {
+ ceph_assert(m_lock.is_locked());
+ auto cct = m_image_ctx->cct;
+
+ if (m_latency_stats && !m_latency_stats->is_ready()) {
+ ldout(cct, 20) << "latency stats not collected yet" << dendl;
+ return false;
+ }
+
+ auto it = m_requests.find(object_no);
+ if (it == m_requests.end()) {
+ ldout(cct, 20) << "no pending requests" << dendl;
+ return false;
+ }
+
+ auto &object_requests = it->second;
+ bool delayed = object_requests->try_delay_request(
+ object_off, std::move(data), snapc, op_flags, object_dispatch_flags,
+ on_dispatched);
+
+ ldout(cct, 20) << "delayed: " << delayed << dendl;
+
+ // schedule dispatch on the first request added
+ if (delayed && !object_requests->is_scheduled_dispatch()) {
+ auto dispatch_time = ceph_clock_now();
+ if (m_latency_stats) {
+ dispatch_time += utime_t(0, m_latency_stats->avg() / 2);
+ } else {
+ dispatch_time += utime_t(0, m_max_delay * 1000000);
+ }
+ object_requests->set_scheduled_dispatch(dispatch_time);
+ m_dispatch_queue.push_back(object_requests);
+ if (m_dispatch_queue.front() == object_requests) {
+ schedule_dispatch_delayed_requests();
+ }
+ }
+
+ return delayed;
+}
+
+template <typename I>
+void SimpleSchedulerObjectDispatch<I>::dispatch_all_delayed_requests() {
+ ceph_assert(m_lock.is_locked());
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+
+ while (!m_requests.empty()) {
+ auto it = m_requests.begin();
+ dispatch_delayed_requests(it->second);
+ m_requests.erase(it);
+ }
+}
+
+template <typename I>
+void SimpleSchedulerObjectDispatch<I>::register_in_flight_request(
+ uint64_t object_no, const utime_t &start_time, Context **on_finish) {
+ auto res = m_requests.insert(
+ {object_no, std::make_shared<ObjectRequests>(object_no)});
+ ceph_assert(res.second);
+ auto it = res.first;
+
+ auto dispatch_seq = ++m_dispatch_seq;
+ it->second->set_dispatch_seq(dispatch_seq);
+ *on_finish = new FunctionContext(
+ [this, object_no, dispatch_seq, start_time, ctx=*on_finish](int r) {
+ ctx->complete(r);
+
+ Mutex::Locker locker(m_lock);
+ if (m_latency_stats && start_time != utime_t()) {
+ auto latency = ceph_clock_now() - start_time;
+ m_latency_stats->add(latency.to_nsec());
+ }
+ auto it = m_requests.find(object_no);
+ if (it == m_requests.end() ||
+ it->second->get_dispatch_seq() != dispatch_seq) {
+ ldout(m_image_ctx->cct, 20) << "already dispatched" << dendl;
+ return;
+ }
+ dispatch_delayed_requests(it->second);
+ m_requests.erase(it);
+ });
+}
+
+template <typename I>
+void SimpleSchedulerObjectDispatch<I>::dispatch_delayed_requests(
+ uint64_t object_no) {
+ ceph_assert(m_lock.is_locked());
+ auto cct = m_image_ctx->cct;
+
+ auto it = m_requests.find(object_no);
+ if (it == m_requests.end()) {
+ ldout(cct, 20) << "object_no=" << object_no << ": not found" << dendl;
+ return;
+ }
+
+ dispatch_delayed_requests(it->second);
+ m_requests.erase(it);
+}
+
+template <typename I>
+void SimpleSchedulerObjectDispatch<I>::dispatch_delayed_requests(
+ ObjectRequestsRef object_requests) {
+ ceph_assert(m_lock.is_locked());
+ auto cct = m_image_ctx->cct;
+
+ ldout(cct, 20) << "object_no=" << object_requests->get_object_no() << ", "
+ << object_requests->delayed_requests_size() << " requests, "
+ << "dispatch_time=" << object_requests->get_dispatch_time()
+ << dendl;
+
+ if (!object_requests->is_scheduled_dispatch()) {
+ return;
+ }
+
+ object_requests->dispatch_delayed_requests(m_image_ctx, m_latency_stats.get(),
+ &m_lock);
+
+ ceph_assert(!m_dispatch_queue.empty());
+ if (m_dispatch_queue.front() == object_requests) {
+ m_dispatch_queue.pop_front();
+ schedule_dispatch_delayed_requests();
+ }
+}
+
+template <typename I>
+void SimpleSchedulerObjectDispatch<I>::schedule_dispatch_delayed_requests() {
+ ceph_assert(m_lock.is_locked());
+ auto cct = m_image_ctx->cct;
+
+ Mutex::Locker timer_locker(*m_timer_lock);
+
+ if (m_timer_task != nullptr) {
+ ldout(cct, 20) << "canceling task " << m_timer_task << dendl;
+
+ bool canceled = m_timer->cancel_event(m_timer_task);
+ ceph_assert(canceled);
+ m_timer_task = nullptr;
+ }
+
+ if (m_dispatch_queue.empty()) {
+ ldout(cct, 20) << "nothing to schedule" << dendl;
+ return;
+ }
+
+ auto object_requests = m_dispatch_queue.front().get();
+
+ while (!object_requests->is_scheduled_dispatch()) {
+ ldout(cct, 20) << "garbage collecting " << object_requests << dendl;
+ m_dispatch_queue.pop_front();
+
+ if (m_dispatch_queue.empty()) {
+ ldout(cct, 20) << "nothing to schedule" << dendl;
+ return;
+ }
+ object_requests = m_dispatch_queue.front().get();
+ }
+
+ auto ctx = new FunctionContext(
+ [this, object_no=object_requests->get_object_no()](int r) {
+ Mutex::Locker locker(m_lock);
+ dispatch_delayed_requests(object_no);
+ });
+
+ m_timer_task = new FunctionContext(
+ [this, ctx](int r) {
+ ceph_assert(m_timer_lock->is_locked());
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "running timer task " << m_timer_task << dendl;
+
+ m_timer_task = nullptr;
+ m_image_ctx->op_work_queue->queue(ctx, 0);
+ });
+
+ ldout(cct, 20) << "scheduling task " << m_timer_task << " at "
+ << object_requests->get_dispatch_time() << dendl;
+
+ m_timer->add_event_at(object_requests->get_dispatch_time(), m_timer_task);
+}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::SimpleSchedulerObjectDispatch<librbd::ImageCtx>;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librbd/test_mock_fixture.h"
+#include "test/librbd/test_support.h"
+#include "test/librbd/mock/MockImageCtx.h"
+#include "test/librbd/mock/MockSafeTimer.h"
+#include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
+#include "test/librados_test_stub/MockTestMemRadosClient.h"
+#include "include/rbd/librbd.hpp"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/SimpleSchedulerObjectDispatch.h"
+
+namespace librbd {
+namespace {
+
+struct MockTestImageCtx : public MockImageCtx {
+ MockTestImageCtx(ImageCtx &image_ctx) : MockImageCtx(image_ctx) {
+ }
+};
+
+} // anonymous namespace
+
+namespace io {
+
+template <>
+struct TypeTraits<MockTestImageCtx> {
+ typedef ::MockSafeTimer SafeTimer;
+};
+
+} // namespace io
+} // namespace librbd
+
+#include "librbd/io/SimpleSchedulerObjectDispatch.cc"
+
+namespace librbd {
+namespace io {
+
+using ::testing::_;
+using ::testing::InSequence;
+using ::testing::Invoke;
+using ::testing::Return;
+
+struct TestMockIoSimpleSchedulerObjectDispatch : public TestMockFixture {
+ typedef SimpleSchedulerObjectDispatch<librbd::MockTestImageCtx> MockSimpleSchedulerObjectDispatch;
+
+ MockSafeTimer m_mock_timer;
+ Mutex m_mock_timer_lock;
+
+ TestMockIoSimpleSchedulerObjectDispatch()
+ : m_mock_timer_lock("TestMockIoSimpleSchedulerObjectDispatch::Mutex") {
+ MockTestImageCtx::set_timer_instance(&m_mock_timer, &m_mock_timer_lock);
+ EXPECT_EQ(0, _rados.conf_set("rbd_io_scheduler_simple_max_delay", "1"));
+ }
+
+ void expect_get_object_name(MockTestImageCtx &mock_image_ctx,
+ uint64_t object_no) {
+ EXPECT_CALL(mock_image_ctx, get_object_name(object_no))
+ .WillRepeatedly(Return(
+ mock_image_ctx.image_ctx->get_object_name(object_no)));
+ }
+
+ void expect_dispatch_delayed_requests(MockTestImageCtx &mock_image_ctx,
+ int r) {
+ EXPECT_CALL(*mock_image_ctx.io_object_dispatcher, send(_))
+ .WillOnce(Invoke([&mock_image_ctx, r](ObjectDispatchSpec* spec) {
+ spec->dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+ mock_image_ctx.image_ctx->op_work_queue->queue(
+ &spec->dispatcher_ctx, r);
+ }));
+ }
+
+ void expect_cancel_timer_task(Context *timer_task) {
+ EXPECT_CALL(m_mock_timer, cancel_event(timer_task))
+ .WillOnce(Invoke([](Context *timer_task) {
+ delete timer_task;
+ return true;
+ }));
+ }
+
+ void expect_add_timer_task(Context **timer_task) {
+ EXPECT_CALL(m_mock_timer, add_event_at(_, _))
+ .WillOnce(Invoke([timer_task](utime_t, Context *task) {
+ *timer_task = task;
+ return task;
+ }));
+ }
+
+ void expect_schedule_dispatch_delayed_requests(Context *current_task,
+ Context **new_task) {
+ if (current_task != nullptr) {
+ expect_cancel_timer_task(current_task);
+ }
+ if (new_task != nullptr) {
+ expect_add_timer_task(new_task);
+ }
+ }
+
+ void run_timer_task(Context *timer_task) {
+ Mutex::Locker timer_locker(m_mock_timer_lock);
+ timer_task->complete(0);
+ }
+};
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, Read) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ C_SaferCond cond;
+ Context *on_finish = &cond;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.read(
+ ictx->get_object_name(0), 0, 0, 4096, CEPH_NOSNAP, 0, {}, nullptr,
+ nullptr, nullptr, nullptr, &on_finish, nullptr));
+ ASSERT_EQ(on_finish, &cond); // not modified
+ on_finish->complete(0);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, Discard) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ C_SaferCond cond;
+ Context *on_finish = &cond;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.discard(
+ ictx->get_object_name(0), 0, 0, 4096, mock_image_ctx.snapc, 0, {},
+ nullptr, nullptr, nullptr, &on_finish, nullptr));
+ ASSERT_NE(on_finish, &cond);
+ on_finish->complete(0);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, Write) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ ceph::bufferlist data;
+ data.append("X");
+ int object_dispatch_flags = 0;
+ C_SaferCond cond;
+ Context *on_finish = &cond;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, nullptr, &on_finish, nullptr));
+ ASSERT_NE(on_finish, &cond);
+ on_finish->complete(0);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, WriteSame) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ io::Extents buffer_extents;
+ ceph::bufferlist data;
+ C_SaferCond cond;
+ Context *on_finish = &cond;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write_same(
+ ictx->get_object_name(0), 0, 0, 4096, std::move(buffer_extents),
+ std::move(data), mock_image_ctx.snapc, 0, {}, nullptr, nullptr, nullptr,
+ &on_finish, nullptr));
+ ASSERT_NE(on_finish, &cond);
+ on_finish->complete(0);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, CompareAndWrite) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ ceph::bufferlist cmp_data;
+ ceph::bufferlist write_data;
+ C_SaferCond cond;
+ Context *on_finish = &cond;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.compare_and_write(
+ ictx->get_object_name(0), 0, 0, std::move(cmp_data),
+ std::move(write_data), mock_image_ctx.snapc, 0, {}, nullptr, nullptr,
+ nullptr, nullptr, &on_finish, nullptr));
+ ASSERT_NE(on_finish, &cond);
+ on_finish->complete(0);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, Flush) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ C_SaferCond cond;
+ Context *on_finish = &cond;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.flush(
+ FLUSH_SOURCE_USER, {}, nullptr, &on_finish, nullptr));
+ ASSERT_EQ(on_finish, &cond); // not modified
+ on_finish->complete(0);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, WriteDelayed) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ expect_get_object_name(mock_image_ctx, 0);
+
+ InSequence seq;
+
+ ceph::bufferlist data;
+ data.append("X");
+ int object_dispatch_flags = 0;
+ C_SaferCond cond1;
+ Context *on_finish1 = &cond1;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, nullptr, &on_finish1, nullptr));
+ ASSERT_NE(on_finish1, &cond1);
+
+ Context *timer_task = nullptr;
+ expect_schedule_dispatch_delayed_requests(nullptr, &timer_task);
+
+ io::DispatchResult dispatch_result;
+ C_SaferCond cond2;
+ Context *on_finish2 = &cond2;
+ C_SaferCond on_dispatched;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
+ &on_dispatched));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(timer_task, nullptr);
+
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_schedule_dispatch_delayed_requests(timer_task, nullptr);
+
+ on_finish1->complete(0);
+ ASSERT_EQ(0, cond1.wait());
+ ASSERT_EQ(0, on_dispatched.wait());
+ on_finish2->complete(0);
+ ASSERT_EQ(0, cond2.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, WriteDelayedFlush) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ expect_get_object_name(mock_image_ctx, 0);
+
+ InSequence seq;
+
+ ceph::bufferlist data;
+ data.append("X");
+ int object_dispatch_flags = 0;
+ C_SaferCond cond1;
+ Context *on_finish1 = &cond1;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, nullptr, &on_finish1, nullptr));
+ ASSERT_NE(on_finish1, &cond1);
+
+ Context *timer_task = nullptr;
+ expect_schedule_dispatch_delayed_requests(nullptr, &timer_task);
+
+ io::DispatchResult dispatch_result;
+ C_SaferCond cond2;
+ Context *on_finish2 = &cond2;
+ C_SaferCond on_dispatched;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
+ &on_dispatched));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(timer_task, nullptr);
+
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_schedule_dispatch_delayed_requests(timer_task, nullptr);
+
+ C_SaferCond cond3;
+ Context *on_finish3 = &cond3;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.flush(
+ FLUSH_SOURCE_USER, {}, nullptr, &on_finish3, nullptr));
+ ASSERT_EQ(on_finish3, &cond3);
+
+ on_finish1->complete(0);
+ ASSERT_EQ(0, cond1.wait());
+ ASSERT_EQ(0, on_dispatched.wait());
+ on_finish2->complete(0);
+ ASSERT_EQ(0, cond2.wait());
+ on_finish3->complete(0);
+ ASSERT_EQ(0, cond3.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, WriteMerged) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ expect_get_object_name(mock_image_ctx, 0);
+
+ InSequence seq;
+
+ ceph::bufferlist data;
+ data.append("X");
+ int object_dispatch_flags = 0;
+ C_SaferCond cond1;
+ Context *on_finish1 = &cond1;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, nullptr, &on_finish1, nullptr));
+ ASSERT_NE(on_finish1, &cond1);
+
+ Context *timer_task = nullptr;
+ expect_schedule_dispatch_delayed_requests(nullptr, &timer_task);
+
+ uint64_t object_off = 20;
+ data.clear();
+ data.append(std::string(10, 'A'));
+ io::DispatchResult dispatch_result;
+ C_SaferCond cond2;
+ Context *on_finish2 = &cond2;
+ C_SaferCond on_dispatched2;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish2, &on_dispatched2));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(timer_task, nullptr);
+
+ object_off = 0;
+ data.clear();
+ data.append(std::string(10, 'B'));
+ C_SaferCond cond3;
+ Context *on_finish3 = &cond3;
+ C_SaferCond on_dispatched3;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish3, &on_dispatched3));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish3, &cond3);
+
+ object_off = 10;
+ data.clear();
+ data.append(std::string(10, 'C'));
+ C_SaferCond cond4;
+ Context *on_finish4 = &cond4;
+ C_SaferCond on_dispatched4;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish4, &on_dispatched4));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish4, &cond4);
+
+ object_off = 30;
+ data.clear();
+ data.append(std::string(10, 'D'));
+ C_SaferCond cond5;
+ Context *on_finish5 = &cond5;
+ C_SaferCond on_dispatched5;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish5, &on_dispatched5));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish5, &cond5);
+
+ object_off = 50;
+ data.clear();
+ data.append(std::string(10, 'E'));
+ C_SaferCond cond6;
+ Context *on_finish6 = &cond6;
+ C_SaferCond on_dispatched6;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish6, &on_dispatched6));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish6, &cond6);
+
+ // expect two requests dispatched:
+ // 0~40 (merged 0~10, 10~10, 20~10, 30~10) and 50~10
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_schedule_dispatch_delayed_requests(timer_task, nullptr);
+
+ on_finish1->complete(0);
+ ASSERT_EQ(0, cond1.wait());
+ ASSERT_EQ(0, on_dispatched2.wait());
+ ASSERT_EQ(0, on_dispatched3.wait());
+ ASSERT_EQ(0, on_dispatched4.wait());
+ ASSERT_EQ(0, on_dispatched5.wait());
+ ASSERT_EQ(0, on_dispatched6.wait());
+ on_finish2->complete(0);
+ on_finish3->complete(0);
+ on_finish4->complete(0);
+ on_finish5->complete(0);
+ on_finish6->complete(0);
+ ASSERT_EQ(0, cond2.wait());
+ ASSERT_EQ(0, cond3.wait());
+ ASSERT_EQ(0, cond4.wait());
+ ASSERT_EQ(0, cond5.wait());
+ ASSERT_EQ(0, cond6.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, WriteNonSequential) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ expect_get_object_name(mock_image_ctx, 0);
+
+ InSequence seq;
+
+ ceph::bufferlist data;
+ int object_dispatch_flags = 0;
+ C_SaferCond cond1;
+ Context *on_finish1 = &cond1;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, nullptr, &on_finish1, nullptr));
+ ASSERT_NE(on_finish1, &cond1);
+
+ Context *timer_task = nullptr;
+ expect_schedule_dispatch_delayed_requests(nullptr, &timer_task);
+
+ uint64_t object_off = 0;
+ data.clear();
+ data.append(std::string(10, 'X'));
+ io::DispatchResult dispatch_result;
+ C_SaferCond cond2;
+ Context *on_finish2 = &cond2;
+ C_SaferCond on_dispatched2;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish2, &on_dispatched2));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(timer_task, nullptr);
+
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_schedule_dispatch_delayed_requests(timer_task, nullptr);
+
+ object_off = 5;
+ data.clear();
+ data.append(std::string(10, 'Y'));
+ C_SaferCond cond3;
+ Context *on_finish3 = &cond3;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish3, nullptr));
+ ASSERT_NE(on_finish3, &cond3);
+
+ on_finish1->complete(0);
+ ASSERT_EQ(0, cond1.wait());
+ ASSERT_EQ(0, on_dispatched2.wait());
+ on_finish2->complete(0);
+ ASSERT_EQ(0, cond2.wait());
+ on_finish3->complete(0);
+ ASSERT_EQ(0, cond3.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, Mixed) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ expect_get_object_name(mock_image_ctx, 0);
+
+ InSequence seq;
+
+ // write (1) 0~0 (in-flight)
+ // will wrap on_finish with dispatch_seq=1 to dispatch future delayed writes
+ ceph::bufferlist data;
+ int object_dispatch_flags = 0;
+ C_SaferCond cond1;
+ Context *on_finish1 = &cond1;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, nullptr, &on_finish1, nullptr));
+ ASSERT_NE(on_finish1, &cond1);
+
+ // write (2) 0~10 (delayed)
+ // will wait for write (1) to finish or a non-seq io comes
+ Context *timer_task = nullptr;
+ expect_schedule_dispatch_delayed_requests(nullptr, &timer_task);
+ uint64_t object_off = 0;
+ data.clear();
+ data.append(std::string(10, 'A'));
+ io::DispatchResult dispatch_result;
+ C_SaferCond cond2;
+ Context *on_finish2 = &cond2;
+ C_SaferCond on_dispatched2;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish2, &on_dispatched2));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(timer_task, nullptr);
+
+ // write (3) 10~10 (delayed)
+ // will be merged with write (2)
+ object_off = 10;
+ data.clear();
+ data.append(std::string(10, 'B'));
+ C_SaferCond cond3;
+ Context *on_finish3 = &cond3;
+ C_SaferCond on_dispatched3;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish3, &on_dispatched3));
+ ASSERT_EQ(on_finish3, &cond3);
+
+ // discard (1) (non-seq io)
+ // will dispatch the delayed writes (2) and (3) and wrap on_finish
+ // with dispatch_seq=2 to dispatch future delayed writes
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_schedule_dispatch_delayed_requests(timer_task, nullptr);
+ C_SaferCond cond4;
+ Context *on_finish4 = &cond4;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.discard(
+ ictx->get_object_name(0), 0, 4096, 4096, mock_image_ctx.snapc, 0, {},
+ nullptr, nullptr, nullptr, &on_finish4, nullptr));
+ ASSERT_NE(on_finish4, &cond4);
+ ASSERT_EQ(0, on_dispatched2.wait());
+ ASSERT_EQ(0, on_dispatched3.wait());
+
+ // write (4) 20~10 (delayed)
+ // will wait for discard (1) to finish or a non-seq io comes
+ timer_task = nullptr;
+ expect_schedule_dispatch_delayed_requests(nullptr, &timer_task);
+ object_off = 20;
+ data.clear();
+ data.append(std::string(10, 'C'));
+ C_SaferCond cond5;
+ Context *on_finish5 = &cond5;
+ C_SaferCond on_dispatched5;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish5, &on_dispatched5));
+ ASSERT_EQ(on_finish5, &cond5);
+ ASSERT_NE(timer_task, nullptr);
+
+ // discard (2) (non-seq io)
+ // will dispatch the delayed write (4) and wrap on_finish with dispatch_seq=3
+ // to dispatch future delayed writes
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_schedule_dispatch_delayed_requests(timer_task, nullptr);
+ C_SaferCond cond6;
+ Context *on_finish6 = &cond6;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.discard(
+ ictx->get_object_name(0), 0, 4096, 4096, mock_image_ctx.snapc, 0, {},
+ nullptr, nullptr, nullptr, &on_finish6, nullptr));
+ ASSERT_NE(on_finish6, &cond6);
+ ASSERT_EQ(0, on_dispatched5.wait());
+
+ // write (5) 30~10 (delayed)
+ // will wait for discard (2) to finish or a non-seq io comes
+ timer_task = nullptr;
+ expect_schedule_dispatch_delayed_requests(nullptr, &timer_task);
+ object_off = 30;
+ data.clear();
+ data.append(std::string(10, 'D'));
+ C_SaferCond cond7;
+ Context *on_finish7 = &cond7;
+ C_SaferCond on_dispatched7;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, object_off, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish7, &on_dispatched7));
+ ASSERT_EQ(on_finish7, &cond7);
+ ASSERT_NE(timer_task, nullptr);
+
+ // write (1) finishes
+ // on_finish wrapper will skip dispatch delayed write (5)
+ // due to dispatch_seq(1) < m_dispatch_seq(3)
+ on_finish1->complete(0);
+ ASSERT_EQ(0, cond1.wait());
+
+ // writes (2) and (3) finish ("dispatch delayed" is not called)
+ on_finish2->complete(0);
+ on_finish3->complete(0);
+ ASSERT_EQ(0, cond2.wait());
+ ASSERT_EQ(0, cond3.wait());
+
+ // discard (1) finishes
+ // on_finish wrapper will skip dispatch delayed write (5)
+ // due to dispatch_seq(2) < m_dispatch_seq(3)
+ on_finish4->complete(0);
+ ASSERT_EQ(0, cond4.wait());
+
+ // writes (4) finishes ("dispatch delayed" is not called)
+ on_finish5->complete(0);
+ ASSERT_EQ(0, cond5.wait());
+
+ // discard (2) finishes
+ // on_finish wrapper will dispatch the delayed write (5)
+ // due to dispatch_seq(3) == m_dispatch_seq(3)
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_schedule_dispatch_delayed_requests(timer_task, nullptr);
+ on_finish6->complete(0);
+ ASSERT_EQ(0, cond6.wait());
+ ASSERT_EQ(0, on_dispatched7.wait());
+
+ // write (5) finishes ("dispatch delayed" is not called)
+ on_finish7->complete(0);
+ ASSERT_EQ(0, cond7.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, DispatchQueue) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ expect_get_object_name(mock_image_ctx, 0);
+ expect_get_object_name(mock_image_ctx, 1);
+
+ InSequence seq;
+
+ // send 2 writes to object 0
+
+ uint64_t object_no = 0;
+ ceph::bufferlist data;
+ int object_dispatch_flags = 0;
+ C_SaferCond cond1;
+ Context *on_finish1 = &cond1;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(object_no), object_no, 0, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr, nullptr,
+ &on_finish1, nullptr));
+ ASSERT_NE(on_finish1, &cond1);
+
+ Context *timer_task = nullptr;
+ expect_schedule_dispatch_delayed_requests(nullptr, &timer_task);
+
+ data.clear();
+ io::DispatchResult dispatch_result;
+ C_SaferCond cond2;
+ Context *on_finish2 = &cond2;
+ C_SaferCond on_dispatched2;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(object_no), object_no, 0, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish2, &on_dispatched2));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(timer_task, nullptr);
+
+ // send 2 writes to object 1
+
+ object_no = 1;
+ data.clear();
+ C_SaferCond cond3;
+ Context *on_finish3 = &cond3;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(object_no), object_no, 0, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr, nullptr,
+ &on_finish3, nullptr));
+ ASSERT_NE(on_finish3, &cond3);
+
+ data.clear();
+ C_SaferCond cond4;
+ Context *on_finish4 = &cond4;
+ C_SaferCond on_dispatched4;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(object_no), object_no, 0, std::move(data),
+ mock_image_ctx.snapc, 0, {}, &object_dispatch_flags, nullptr,
+ &dispatch_result, &on_finish4, &on_dispatched4));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish4, &cond4);
+
+ // finish write (1) to object 0
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_schedule_dispatch_delayed_requests(timer_task, &timer_task);
+ on_finish1->complete(0);
+ ASSERT_EQ(0, cond1.wait());
+ ASSERT_EQ(0, on_dispatched2.wait());
+
+ // finish write (2) to object 0
+ on_finish2->complete(0);
+ ASSERT_EQ(0, cond2.wait());
+
+ // finish write (1) to object 1
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+ expect_schedule_dispatch_delayed_requests(timer_task, nullptr);
+ on_finish3->complete(0);
+ ASSERT_EQ(0, cond3.wait());
+ ASSERT_EQ(0, on_dispatched4.wait());
+
+ // finish write (2) to object 1
+ on_finish4->complete(0);
+ ASSERT_EQ(0, cond4.wait());
+}
+
+TEST_F(TestMockIoSimpleSchedulerObjectDispatch, Timer) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockTestImageCtx mock_image_ctx(*ictx);
+ MockSimpleSchedulerObjectDispatch
+ mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+
+ expect_get_object_name(mock_image_ctx, 0);
+ expect_op_work_queue(mock_image_ctx);
+
+ InSequence seq;
+
+ ceph::bufferlist data;
+ int object_dispatch_flags = 0;
+ C_SaferCond cond1;
+ Context *on_finish1 = &cond1;
+ ASSERT_FALSE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, nullptr, &on_finish1, nullptr));
+ ASSERT_NE(on_finish1, &cond1);
+
+ Context *timer_task = nullptr;
+ expect_schedule_dispatch_delayed_requests(nullptr, &timer_task);
+
+ data.clear();
+ io::DispatchResult dispatch_result;
+ C_SaferCond cond2;
+ Context *on_finish2 = &cond2;
+ C_SaferCond on_dispatched;
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.write(
+ ictx->get_object_name(0), 0, 0, std::move(data), mock_image_ctx.snapc, 0,
+ {}, &object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
+ &on_dispatched));
+ ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
+ ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(timer_task, nullptr);
+
+ expect_dispatch_delayed_requests(mock_image_ctx, 0);
+
+ run_timer_task(timer_task);
+ ASSERT_EQ(0, on_dispatched.wait());
+
+ on_finish1->complete(0);
+ ASSERT_EQ(0, cond1.wait());
+ on_finish2->complete(0);
+ ASSERT_EQ(0, cond2.wait());
+}
+
+} // namespace io
+} // namespace librbd