From 1fc3be248097eb6087560c22374c1f924bfe0735 Mon Sep 17 00:00:00 2001
From: Jianpeng Ma
Date: Mon, 1 Nov 2021 09:25:52 +0800
Subject: [PATCH] librbd/cache/pwl: fix reordering when flushing cache data to
 osd

Consider the following workload:
  writeA(0, 4096)
  writeB(0, 512)

PWL makes sure writeA is persisted to the cache before writeB. But when
flushing to the OSD, it uses async reads to fetch the data from the cache
and issues the writes to the OSD from the read callbacks. So although we
issue aio-read(4096) and aio-read(512) in order, we cannot guarantee the
order in which they complete. If aio-read(512) returns first, the write
order to the next layer becomes writeB(0, 512), writeA(0, 4096), which is
wrong from the user's point of view.

To avoid this, first read all the data from the cache, then send the
writes in order.

Fixes: https://tracker.ceph.com/issues/52511
Tested-by: Feng Hualong
Signed-off-by: Jianpeng Ma
---
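
Illustration only (not part of the patch; hypothetical names and standard C++
rather than the librbd API): a minimal sketch of the hazard described above.
Two overlapping writes are flushed; the racy variant issues each write from
its own read callback, so the smaller, faster read lets writeB overtake
writeA, while the fixed variant completes all reads first and then writes in
the original order.

#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct PendingFlush {
  std::string name;     // e.g. "writeA(0, 4096)"
  int read_latency_ms;  // simulated cache-read time (bigger read = slower)
};

int main() {
  // writeA was persisted to the cache before writeB, so it must also reach
  // the next layer (the OSD) before writeB.
  std::vector<PendingFlush> dirty = {
    {"writeA(0, 4096)", 20},
    {"writeB(0, 512)",   5},
  };

  // Racy pattern: issue each write as soon as its own read completes.
  {
    std::mutex mtx;
    std::vector<std::string> issue_order;
    std::vector<std::thread> reads;
    for (const auto &p : dirty) {
      reads.emplace_back([&, p] {
        std::this_thread::sleep_for(std::chrono::milliseconds(p.read_latency_ms));
        std::lock_guard<std::mutex> l(mtx);
        issue_order.push_back(p.name);  // writeB usually lands here first
      });
    }
    for (auto &t : reads) t.join();
    std::cout << "racy order:    ";
    for (const auto &n : issue_order) std::cout << n << "  ";
    std::cout << "\n";
  }

  // Fixed pattern: finish all reads first, then issue writes in list order.
  {
    std::vector<std::thread> reads;
    for (const auto &p : dirty) {
      reads.emplace_back([p] {
        std::this_thread::sleep_for(std::chrono::milliseconds(p.read_latency_ms));
      });
    }
    for (auto &t : reads) t.join();  // gather all cache data before writing
    std::cout << "ordered flush: ";
    for (const auto &p : dirty) std::cout << p.name << "  ";
    std::cout << "\n";
  }
  return 0;
}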
 src/librbd/cache/pwl/AbstractWriteLog.cc |  11 ++-
 src/librbd/cache/pwl/AbstractWriteLog.h  |   7 +-
 src/librbd/cache/pwl/rwl/WriteLog.cc     |  32 ++++---
 src/librbd/cache/pwl/rwl/WriteLog.h      |   5 +-
 src/librbd/cache/pwl/ssd/WriteLog.cc     | 114 +++++++++++++++--------
 src/librbd/cache/pwl/ssd/WriteLog.h      |   5 +-
 6 files changed, 112 insertions(+), 62 deletions(-)

diff --git a/src/librbd/cache/pwl/AbstractWriteLog.cc b/src/librbd/cache/pwl/AbstractWriteLog.cc
index c01d2d411f9da..421d5dd8bc49e 100644
--- a/src/librbd/cache/pwl/AbstractWriteLog.cc
+++ b/src/librbd/cache/pwl/AbstractWriteLog.cc
@@ -1687,13 +1687,16 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
   CephContext *cct = m_image_ctx.cct;
   bool all_clean = false;
   int flushed = 0;
+  bool has_write_entry = false;
 
   ldout(cct, 20) << "Look for dirty entries" << dendl;
   {
     DeferredContexts post_unlock;
+    GenericLogEntries entries_to_flush;
+    std::shared_lock entry_reader_locker(m_entry_reader_lock);
+    std::lock_guard locker(m_lock);
     while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
-      std::lock_guard locker(m_lock);
       if (m_shutting_down) {
         ldout(cct, 5) << "Flush during shutdown supressed" << dendl;
         /* Do flush complete only when all flush ops are finished */
@@ -1709,14 +1712,18 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
       auto candidate = m_dirty_log_entries.front();
       bool flushable = can_flush_entry(candidate);
       if (flushable) {
-        post_unlock.add(construct_flush_entry_ctx(candidate));
+        entries_to_flush.push_back(candidate);
         flushed++;
+        if (!has_write_entry)
+          has_write_entry = candidate->is_write_entry();
         m_dirty_log_entries.pop_front();
       } else {
         ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
         break;
       }
     }
+
+    construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
   }
 
   if (all_clean) {
diff --git a/src/librbd/cache/pwl/AbstractWriteLog.h b/src/librbd/cache/pwl/AbstractWriteLog.h
index c982d2631fe3f..128c00d99d82a 100644
--- a/src/librbd/cache/pwl/AbstractWriteLog.h
+++ b/src/librbd/cache/pwl/AbstractWriteLog.h
@@ -388,10 +388,9 @@ protected:
   virtual void persist_last_flushed_sync_gen() {}
   virtual void reserve_cache(C_BlockIORequestT *req,
                              bool &alloc_succeeds, bool &no_space) {}
-  virtual Context *construct_flush_entry_ctx(
-      const std::shared_ptr<pwl::GenericLogEntry> log_entry) {
-    return nullptr;
-  }
+  virtual void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                                       DeferredContexts &post_unlock,
+                                       bool has_write_entry) = 0;
   virtual uint64_t get_max_extent() {
     return 0;
   }
diff --git a/src/librbd/cache/pwl/rwl/WriteLog.cc b/src/librbd/cache/pwl/rwl/WriteLog.cc
index bf4bd1557b031..f635015329a6c 100644
--- a/src/librbd/cache/pwl/rwl/WriteLog.cc
+++ b/src/librbd/cache/pwl/rwl/WriteLog.cc
@@ -576,23 +576,27 @@ bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
 }
 
 template <typename I>
-Context* WriteLog<I>::construct_flush_entry_ctx(
-    std::shared_ptr<GenericLogEntry> log_entry) {
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                                          DeferredContexts &post_unlock,
+                                          bool has_write_entry) {
   bool invalidating = this->m_invalidating; // snapshot so we behave consistently
-  Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-  if (invalidating) {
-    return ctx;
-  }
-  return new LambdaContext(
-    [this, log_entry, ctx](int r) {
-      m_image_ctx.op_work_queue->queue(new LambdaContext(
+  for (auto &log_entry : entries_to_flush) {
+    Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+    if (!invalidating) {
+      ctx = new LambdaContext(
         [this, log_entry, ctx](int r) {
-          ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                     << " " << *log_entry << dendl;
-          log_entry->writeback(this->m_image_writeback, ctx);
-        }), 0);
-    });
+          m_image_ctx.op_work_queue->queue(new LambdaContext(
+            [this, log_entry, ctx](int r) {
+              ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                         << " " << *log_entry << dendl;
+              log_entry->writeback(this->m_image_writeback, ctx);
+            }), 0);
+        });
+    }
+    post_unlock.add(ctx);
+  }
 }
 
 const unsigned long int ops_flushed_together = 4;
diff --git a/src/librbd/cache/pwl/rwl/WriteLog.h b/src/librbd/cache/pwl/rwl/WriteLog.h
index dabee07d742aa..5083a2568d497 100644
--- a/src/librbd/cache/pwl/rwl/WriteLog.h
+++ b/src/librbd/cache/pwl/rwl/WriteLog.h
@@ -102,8 +102,9 @@ protected:
   void setup_schedule_append(
       pwl::GenericLogOperationsVector &ops, bool do_early_flush,
       C_BlockIORequestT *req) override;
-  Context *construct_flush_entry_ctx(
-      const std::shared_ptr<pwl::GenericLogEntry> log_entry) override;
+  void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                               DeferredContexts &post_unlock,
+                               bool has_write_entry) override;
   bool initialize_pool(Context *on_finish, pwl::DeferredContexts &later) override;
   void write_data_to_buffer(
       std::shared_ptr<pwl::WriteLogEntry> ws_entry,
diff --git a/src/librbd/cache/pwl/ssd/WriteLog.cc b/src/librbd/cache/pwl/ssd/WriteLog.cc
index 550123924de0f..9d887227e96d6 100644
--- a/src/librbd/cache/pwl/ssd/WriteLog.cc
+++ b/src/librbd/cache/pwl/ssd/WriteLog.cc
@@ -521,49 +521,87 @@ void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops) {
 }
 
 template <typename I>
-Context* WriteLog<I>::construct_flush_entry_ctx(
-    std::shared_ptr<GenericLogEntry> log_entry) {
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                                          DeferredContexts &post_unlock,
+                                          bool has_write_entry) {
   // snapshot so we behave consistently
   bool invalidating = this->m_invalidating;
-  Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
-  if (invalidating) {
-    return ctx;
-  }
-  if (log_entry->is_write_entry()) {
-    bufferlist *read_bl_ptr = new bufferlist;
-    ctx = new LambdaContext(
-      [this, log_entry, read_bl_ptr, ctx](int r) {
-        bufferlist captured_entry_bl;
-        captured_entry_bl.claim_append(*read_bl_ptr);
-        delete read_bl_ptr;
-        m_image_ctx.op_work_queue->queue(new LambdaContext(
-          [this, log_entry, entry_bl=move(captured_entry_bl), ctx](int r) {
-            auto captured_entry_bl = std::move(entry_bl);
-            ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                       << " " << *log_entry << dendl;
-            log_entry->writeback_bl(this->m_image_writeback, ctx,
-                                    std::move(captured_entry_bl));
-          }), 0);
-      });
-    ctx = new LambdaContext(
-      [this, log_entry, read_bl_ptr, ctx](int r) {
-        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
-        write_entry->inc_bl_refs();
-        aio_read_data_block(std::move(write_entry), read_bl_ptr, ctx);
-      });
-    return ctx;
+  if (invalidating || !has_write_entry) {
+    for (auto &log_entry : entries_to_flush) {
+      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+      if (!invalidating) {
+        ctx = new LambdaContext(
+          [this, log_entry, ctx](int r) {
+            m_image_ctx.op_work_queue->queue(new LambdaContext(
+              [this, log_entry, ctx](int r) {
+                ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                           << " " << *log_entry << dendl;
+                log_entry->writeback(this->m_image_writeback, ctx);
+              }), 0);
+          });
+      }
+      post_unlock.add(ctx);
+    }
   } else {
-    return new LambdaContext(
-      [this, log_entry, ctx](int r) {
-        m_image_ctx.op_work_queue->queue(new LambdaContext(
-          [this, log_entry, ctx](int r) {
-            ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                       << " " << *log_entry << dendl;
-            log_entry->writeback(this->m_image_writeback, ctx);
-          }), 0);
+    int count = entries_to_flush.size();
+    std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries;
+    std::vector<bufferlist *> read_bls;
+    std::vector<Context *> contexts;
+
+    log_entries.reserve(count);
+    read_bls.reserve(count);
+    contexts.reserve(count);
+
+    for (auto &log_entry : entries_to_flush) {
+      // log_entry has already been removed from m_dirty_log_entries, and
+      // construct_flush_entry() increments m_flush_ops_in_flight, so call
+      // it here to keep the in-flight flush ops tracked.
+      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+      if (log_entry->is_write_entry()) {
+        bufferlist *bl = new bufferlist;
+        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
+        write_entry->inc_bl_refs();
+        log_entries.push_back(write_entry);
+        read_bls.push_back(bl);
+      }
+      contexts.push_back(ctx);
+    }
+
+    Context *ctx = new LambdaContext(
+      [this, entries_to_flush, read_bls, contexts](int r) {
+        int i = 0, j = 0;
+
+        for (auto &log_entry : entries_to_flush) {
+          Context *ctx = contexts[j++];
+
+          if (log_entry->is_write_entry()) {
+            bufferlist captured_entry_bl;
+
+            captured_entry_bl.claim_append(*read_bls[i]);
+            delete read_bls[i++];
+
+            m_image_ctx.op_work_queue->queue(new LambdaContext(
+              [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+                auto captured_entry_bl = std::move(entry_bl);
+                ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                           << " " << *log_entry << dendl;
+                log_entry->writeback_bl(this->m_image_writeback, ctx,
+                                        std::move(captured_entry_bl));
+              }), 0);
+          } else {
+            m_image_ctx.op_work_queue->queue(new LambdaContext(
+              [this, log_entry, ctx](int r) {
+                ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                           << " " << *log_entry << dendl;
+                log_entry->writeback(this->m_image_writeback, ctx);
+              }), 0);
+          }
+        }
       });
+
+    aio_read_data_blocks(log_entries, read_bls, ctx);
   }
 }
diff --git a/src/librbd/cache/pwl/ssd/WriteLog.h b/src/librbd/cache/pwl/ssd/WriteLog.h
index 8df2057b6d4ee..69cc366628f24 100644
--- a/src/librbd/cache/pwl/ssd/WriteLog.h
+++ b/src/librbd/cache/pwl/ssd/WriteLog.h
@@ -122,8 +122,9 @@ private:
   bool has_sync_point_logs(GenericLogOperations &ops);
   void append_op_log_entries(GenericLogOperations &ops);
   void alloc_op_log_entries(GenericLogOperations &ops);
-  Context* construct_flush_entry_ctx(
-      std::shared_ptr<GenericLogEntry> log_entry);
+  void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                               DeferredContexts &post_unlock,
+                               bool has_write_entry) override;
   void append_ops(GenericLogOperations &ops, Context *ctx,
                   uint64_t* new_first_free_entry);
   void write_log_entries(GenericLogEntriesVector log_entries,
-- 
2.39.5