#include "librbd/AsioEngine.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
+#include "librbd/io/FlushTracker.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcher.h"
#include "librbd/io/Utils.h"
SimpleSchedulerObjectDispatch<I>::SimpleSchedulerObjectDispatch(
I* image_ctx)
: m_image_ctx(image_ctx),
+ m_flush_tracker(new FlushTracker<I>(image_ctx)),
m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
"librbd::io::SimpleSchedulerObjectDispatch::lock", this))),
m_max_delay(image_ctx->config.template get_val<uint64_t>(
template <typename I>
SimpleSchedulerObjectDispatch<I>::~SimpleSchedulerObjectDispatch() {
+ // The flush tracker is allocated with raw `new` in the constructor and
+ // freed here. NOTE(review): std::unique_ptr<FlushTracker<I>> in the header
+ // would make this ownership explicit and remove the manual delete --
+ // confirm against the class declaration (outside this hunk).
+ delete m_flush_tracker;
}
template <typename I>
auto cct = m_image_ctx->cct;
ldout(cct, 5) << dendl;
+ // Quiesce the flush tracker before reporting shutdown complete, so no
+ // tracked IO callbacks fire after this object is torn down.
+ // NOTE(review): the enclosing function signature is outside this hunk;
+ // presumably this is shut_down(Context*) -- confirm against full source.
+ m_flush_tracker->shut_down();
on_finish->complete(0);
}
std::lock_guard locker{m_lock};
if (try_delay_write(object_no, object_off, std::move(data), snapc, op_flags,
*object_dispatch_flags, on_dispatched)) {
+
+ // The write was queued for delayed dispatch: register it with the flush
+ // tracker under a fresh sequence number so a later flush() will wait for
+ // it to complete.
+ auto dispatch_seq = ++m_dispatch_seq;
+ m_flush_tracker->start_io(dispatch_seq);
+ // Wrap the caller's completion so finish_io() runs only after the
+ // original context has completed; flush waiters are therefore not
+ // released before the caller observes the result.
+ *on_finish = new LambdaContext(
+ [this, dispatch_seq, ctx=*on_finish](int r) {
+ ctx->complete(r);
+ m_flush_tracker->finish_io(dispatch_seq);
+ });
+
*dispatch_result = DISPATCH_RESULT_COMPLETE;
return true;
}
auto cct = m_image_ctx->cct;
ldout(cct, 20) << dendl;
- std::lock_guard locker{m_lock};
- dispatch_all_delayed_requests();
+ // Narrow the critical section: m_lock must not be held while calling into
+ // the flush tracker below, since tracked-IO completions also take m_lock.
+ {
+ std::lock_guard locker{m_lock};
+ dispatch_all_delayed_requests();
+ }
- return false;
+ // Behavior change: flush is now handled by this layer. Report CONTINUE so
+ // the dispatcher keeps flushing deeper layers, and let the flush tracker
+ // invoke on_dispatched once all tracked in-flight IOs have finished.
+ *dispatch_result = DISPATCH_RESULT_CONTINUE;
+ m_flush_tracker->flush(on_dispatched);
+
+ return true;
}
template <typename I>
auto it = res.first;
auto dispatch_seq = ++m_dispatch_seq;
+ // Track this dispatch so flush() waits for it; matched by finish_io() in
+ // the completion lambda below.
+ m_flush_tracker->start_io(dispatch_seq);
+
it->second->set_dispatch_seq(dispatch_seq);
*on_finish = new LambdaContext(
[this, object_no, dispatch_seq, start_time, ctx=*on_finish](int r) {
ctx->complete(r);
- std::lock_guard locker{m_lock};
+ // unique_lock (not lock_guard) so the lock can be dropped manually
+ // before calling into the flush tracker at the end.
+ std::unique_lock locker{m_lock};
if (m_latency_stats && start_time != utime_t()) {
auto latency = ceph_clock_now() - start_time;
m_latency_stats->add(latency.to_nsec());
}
+
auto it = m_requests.find(object_no);
if (it == m_requests.end() ||
it->second->get_dispatch_seq() != dispatch_seq) {
ldout(m_image_ctx->cct, 20) << "already dispatched" << dendl;
- return;
+ // Restructured from an early return so control always reaches the
+ // unlock + finish_io below; the early return previously skipped
+ // nothing, but would now skip finish_io and hang flush.
+ } else {
+ dispatch_delayed_requests(it->second);
+ m_requests.erase(it);
}
- dispatch_delayed_requests(it->second);
- m_requests.erase(it);
+ // Drop m_lock before finish_io(): the tracker may run flush callbacks
+ // inline, and those must not execute under this lock.
+ locker.unlock();
+
+ m_flush_tracker->finish_io(dispatch_seq);
});
}
typedef ::MockSafeTimer SafeTimer;
};
+// Test-only specialization: a no-op stand-in for io::FlushTracker so the
+// SimpleSchedulerObjectDispatch unit tests can instantiate the dispatcher
+// with MockTestImageCtx without pulling in the real flush-tracking
+// machinery. Every operation intentionally does nothing.
+template <>
+struct FlushTracker<MockTestImageCtx> {
+  FlushTracker(MockTestImageCtx*) {
+  }
+
+  void shut_down() {
+  }
+
+  // Real implementation would complete the context once tracked IOs drain;
+  // tests drive the context themselves, so this is a no-op.
+  void flush(Context*) {
+  }
+
+  void start_io(uint64_t) {
+  }
+
+  void finish_io(uint64_t) {
+  }
+
+};
+
+
} // namespace io
} // namespace librbd
MockSimpleSchedulerObjectDispatch
mock_simple_scheduler_object_dispatch(&mock_image_ctx);
+ io::DispatchResult dispatch_result;
C_SaferCond cond;
Context *on_finish = &cond;
- ASSERT_FALSE(mock_simple_scheduler_object_dispatch.flush(
- FLUSH_SOURCE_USER, {}, nullptr, nullptr, &on_finish, nullptr));
+ // flush() is now handled by this dispatch layer: it returns true with
+ // DISPATCH_RESULT_CONTINUE and hands on_dispatched to the flush tracker
+ // (a no-op in the mock), rather than returning false (unhandled).
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.flush(
+ FLUSH_SOURCE_USER, {}, nullptr, &dispatch_result, &on_finish, nullptr));
+ ASSERT_EQ(io::DISPATCH_RESULT_CONTINUE, dispatch_result);
ASSERT_EQ(on_finish, &cond); // not modified
on_finish->complete(0);
ASSERT_EQ(0, cond.wait());
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
&on_dispatched));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(on_finish2, &cond2);
ASSERT_NE(timer_task, nullptr);
expect_dispatch_delayed_requests(mock_image_ctx, 0);
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
&on_dispatched));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(on_finish2, &cond2);
ASSERT_NE(timer_task, nullptr);
expect_dispatch_delayed_requests(mock_image_ctx, 0);
C_SaferCond cond3;
Context *on_finish3 = &cond3;
- ASSERT_FALSE(mock_simple_scheduler_object_dispatch.flush(
- FLUSH_SOURCE_USER, {}, nullptr, nullptr, &on_finish3, nullptr));
+ ASSERT_TRUE(mock_simple_scheduler_object_dispatch.flush(
+ FLUSH_SOURCE_USER, {}, nullptr, &dispatch_result, &on_finish3, nullptr));
+ ASSERT_EQ(io::DISPATCH_RESULT_CONTINUE, dispatch_result);
ASSERT_EQ(on_finish3, &cond3);
on_finish1->complete(0);
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
&on_dispatched2));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(on_finish2, &cond2);
ASSERT_NE(timer_task, nullptr);
object_off = 0;
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish3,
&on_dispatched3));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish3, &cond3);
+ ASSERT_NE(on_finish3, &cond3);
object_off = 10;
data.clear();
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish4,
&on_dispatched4));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish4, &cond4);
+ ASSERT_NE(on_finish4, &cond4);
object_off = 30;
data.clear();
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish5,
&on_dispatched5));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish5, &cond5);
+ ASSERT_NE(on_finish5, &cond5);
object_off = 50;
data.clear();
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish6,
&on_dispatched6));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish6, &cond6);
+ ASSERT_NE(on_finish6, &cond6);
// expect two requests dispatched:
// 0~40 (merged 0~10, 10~10, 20~10, 30~10) and 50~10
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
&on_dispatched2));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(on_finish2, &cond2);
ASSERT_NE(timer_task, nullptr);
expect_dispatch_delayed_requests(mock_image_ctx, 0);
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
&on_dispatched2));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(on_finish2, &cond2);
ASSERT_NE(timer_task, nullptr);
// write (3) 10~10 (delayed)
0, object_off, std::move(data), mock_image_ctx.snapc, 0, {},
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish3,
&on_dispatched3));
- ASSERT_EQ(on_finish3, &cond3);
+ ASSERT_NE(on_finish3, &cond3);
// discard (1) (non-seq io)
// will dispatch the delayed writes (2) and (3) and wrap on_finish
0, object_off, std::move(data), mock_image_ctx.snapc, 0, {},
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish5,
&on_dispatched5));
- ASSERT_EQ(on_finish5, &cond5);
+ ASSERT_NE(on_finish5, &cond5);
ASSERT_NE(timer_task, nullptr);
// discard (2) (non-seq io)
0, object_off, std::move(data), mock_image_ctx.snapc, 0, {},
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish7,
&on_dispatched7));
- ASSERT_EQ(on_finish7, &cond7);
+ ASSERT_NE(on_finish7, &cond7);
ASSERT_NE(timer_task, nullptr);
// write (1) finishes
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
&on_dispatched2));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(on_finish2, &cond2);
ASSERT_NE(timer_task, nullptr);
// send 2 writes to object 1
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish4,
&on_dispatched4));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish4, &cond4);
+ ASSERT_NE(on_finish4, &cond4);
// finish write (1) to object 0
expect_dispatch_delayed_requests(mock_image_ctx, 0);
&object_dispatch_flags, nullptr, &dispatch_result, &on_finish2,
&on_dispatched));
ASSERT_EQ(dispatch_result, io::DISPATCH_RESULT_COMPLETE);
- ASSERT_EQ(on_finish2, &cond2);
+ ASSERT_NE(on_finish2, &cond2);
ASSERT_NE(timer_task, nullptr);
expect_dispatch_delayed_requests(mock_image_ctx, 0);