From: Jianpeng Ma Date: Mon, 29 Nov 2021 07:16:21 +0000 (+0800) Subject: librbd/cache/pwl: Using BlockGuard control overlap ops order when flush to osd. X-Git-Tag: v17.1.0~310^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8e8f3ef516e98da011f3086f8e78a2fa261293ed;p=ceph.git librbd/cache/pwl: Using BlockGuard control overlap ops order when flush to osd. During testing we encountered data-inconsistency problems. The test case mainly uses a write followed by a discard to detect data consistency. Without pwl, write/discard are synchronous ops: after the write returns, the data is already located on the OSD. But with pwl, we use an asynchronous API to send ops to the OSD. Although we make sure of the send order, send order does not guarantee completion order. This means pwl keeps the order of write/discard, but it does not keep the same semantics as the synchronous API — with pwl, synchronous behavior becomes asynchronous. For non-overlapping ops this is not a problem, but when consecutive commands overlap it can make the data inconsistent. So we use BlockGuard to solve this issue. 
Fixes: https://tracker.ceph.com/issues/49876 Fixes: https://tracker.ceph.com/issues/53108 Signed-off-by: Jianpeng Ma --- diff --git a/src/librbd/cache/pwl/AbstractWriteLog.cc b/src/librbd/cache/pwl/AbstractWriteLog.cc index 7f61ce47ab2f..981571bbec9c 100644 --- a/src/librbd/cache/pwl/AbstractWriteLog.cc +++ b/src/librbd/cache/pwl/AbstractWriteLog.cc @@ -44,6 +44,9 @@ AbstractWriteLog::AbstractWriteLog( plugin::Api& plugin_api) : m_builder(builder), m_write_log_guard(image_ctx.cct), + m_flush_guard(image_ctx.cct), + m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name( + "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))), m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))), m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name( @@ -1627,21 +1630,34 @@ bool AbstractWriteLog::can_flush_entry(std::shared_ptr log_e } template -Context* AbstractWriteLog::construct_flush_entry(std::shared_ptr log_entry, - bool invalidating) { - CephContext *cct = m_image_ctx.cct; +void AbstractWriteLog::detain_flush_guard_request(std::shared_ptr log_entry, + GuardedRequestFunctionContext *guarded_ctx) { + ldout(m_image_ctx.cct, 20) << dendl; - ldout(cct, 20) << "" << dendl; - ceph_assert(m_entry_reader_lock.is_locked()); - ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); - if (!m_flush_ops_in_flight || - (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) { - m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number; + BlockExtent extent; + if (log_entry->is_sync_point()) { + extent = block_extent(whole_volume_extent()); + } else { + extent = log_entry->ram_entry.block_extent(); } - m_flush_ops_in_flight += 1; - m_flush_ops_will_send += 1; - /* For write same this is the bytes affected by the flush op, not the bytes transferred */ - m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes; + + auto req = GuardedRequest(extent, guarded_ctx, false); + 
BlockGuardCell *cell = nullptr; + + { + std::lock_guard locker(m_flush_guard_lock); + m_flush_guard.detain(req.block_extent, &req, &cell); + } + if (cell) { + req.guard_ctx->cell = cell; + m_image_ctx.op_work_queue->queue(req.guard_ctx, 0); + } +} + +template +Context* AbstractWriteLog::construct_flush_entry(std::shared_ptr log_entry, + bool invalidating) { + ldout(m_image_ctx.cct, 20) << "" << dendl; /* Flush write completion action */ utime_t writeback_start_time = ceph_clock_now(); @@ -1672,7 +1688,24 @@ Context* AbstractWriteLog::construct_flush_entry(std::shared_ptrm_cell, &block_reqs); + + for (auto &req : block_reqs) { + m_flush_guard.detain(req.block_extent, &req, &detained_cell); + if (detained_cell) { + req.guard_ctx->cell = detained_cell; + m_image_ctx.op_work_queue->queue(req.guard_ctx, 0); + } + } + } + if (r < 0) { lderr(m_image_ctx.cct) << "failed to flush log entry" << cpp_strerror(r) << dendl; @@ -1724,6 +1757,18 @@ void AbstractWriteLog::process_writeback_dirty_entries() { if (!has_write_entry) has_write_entry = candidate->is_write_entry(); m_dirty_log_entries.pop_front(); + + // To track candidate, we should add m_flush_ops_in_flight in here + { + if (!m_flush_ops_in_flight || + (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) { + m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number; + } + m_flush_ops_in_flight += 1; + m_flush_ops_will_send += 1; + /* For write same this is the bytes affected by the flush op, not the bytes transferred */ + m_flush_bytes_in_flight += candidate->ram_entry.write_bytes; + } } else { ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl; break; diff --git a/src/librbd/cache/pwl/AbstractWriteLog.h b/src/librbd/cache/pwl/AbstractWriteLog.h index 864f1e8e6313..e66562234026 100644 --- a/src/librbd/cache/pwl/AbstractWriteLog.h +++ b/src/librbd/cache/pwl/AbstractWriteLog.h @@ -178,6 +178,9 @@ private: bool m_persist_on_write_until_flush = true; + pwl::WriteLogGuard m_flush_guard; 
+ mutable ceph::mutex m_flush_guard_lock; + /* Debug counters for the places m_async_op_tracker is used */ std::atomic m_async_complete_ops = {0}; std::atomic m_async_null_flush_finish = {0}; @@ -225,7 +228,6 @@ private: void detain_guarded_request(C_BlockIORequestT *request, pwl::GuardedRequestFunctionContext *guarded_ctx, bool is_barrier); - void perf_start(const std::string name); void perf_stop(); void log_perf(); @@ -348,6 +350,8 @@ protected: std::shared_ptr log_entry) = 0; Context *construct_flush_entry( const std::shared_ptr log_entry, bool invalidating); + void detain_flush_guard_request(std::shared_ptr log_entry, + GuardedRequestFunctionContext *guarded_ctx); void process_writeback_dirty_entries(); bool can_retire_entry(const std::shared_ptr log_entry); diff --git a/src/librbd/cache/pwl/LogEntry.h b/src/librbd/cache/pwl/LogEntry.h index a5889a13bc79..78eb4a6de6ac 100644 --- a/src/librbd/cache/pwl/LogEntry.h +++ b/src/librbd/cache/pwl/LogEntry.h @@ -27,6 +27,7 @@ public: WriteLogCacheEntry *cache_entry = nullptr; uint64_t log_entry_index = 0; bool completed = false; + BlockGuardCell* m_cell = nullptr; GenericLogEntry(uint64_t image_offset_bytes = 0, uint64_t write_bytes = 0) : ram_entry(image_offset_bytes, write_bytes) { }; diff --git a/src/librbd/cache/pwl/rwl/WriteLog.cc b/src/librbd/cache/pwl/rwl/WriteLog.cc index 22c8e8ee668e..d5b5f712bb19 100644 --- a/src/librbd/cache/pwl/rwl/WriteLog.cc +++ b/src/librbd/cache/pwl/rwl/WriteLog.cc @@ -582,21 +582,28 @@ void WriteLog::construct_flush_entries(pwl::GenericLogEntries entries_to_flus bool invalidating = this->m_invalidating; // snapshot so we behave consistently for (auto &log_entry : entries_to_flush) { - Context *ctx = this->construct_flush_entry(log_entry, invalidating); - - if (!invalidating) { - ctx = new LambdaContext( - [this, log_entry, ctx](int r) { - m_image_ctx.op_work_queue->queue(new LambdaContext( - [this, log_entry, ctx](int r) { - ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry - << " 
" << *log_entry << dendl; - log_entry->writeback(this->m_image_writeback, ctx); - this->m_flush_ops_will_send -= 1; - }), 0); - }); - } - post_unlock.add(ctx); + GuardedRequestFunctionContext *guarded_ctx = + new GuardedRequestFunctionContext([this, log_entry, invalidating] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = this->construct_flush_entry(log_entry, invalidating); + + if (!invalidating) { + ctx = new LambdaContext( + [this, log_entry, ctx](int r) { + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback(this->m_image_writeback, ctx); + this->m_flush_ops_will_send -= 1; + }), 0); + }); + } + + ctx->complete(0); + }); + this->detain_flush_guard_request(log_entry, guarded_ctx); } } diff --git a/src/librbd/cache/pwl/ssd/WriteLog.cc b/src/librbd/cache/pwl/ssd/WriteLog.cc index 18ac2adffddb..3046a7edf613 100644 --- a/src/librbd/cache/pwl/ssd/WriteLog.cc +++ b/src/librbd/cache/pwl/ssd/WriteLog.cc @@ -552,82 +552,90 @@ void WriteLog::construct_flush_entries(pwl::GenericLogEntries entries_to_flus if (invalidating || !has_write_entry) { for (auto &log_entry : entries_to_flush) { - Context *ctx = this->construct_flush_entry(log_entry, invalidating); - - if (!invalidating) { - ctx = new LambdaContext( - [this, log_entry, ctx](int r) { - m_image_ctx.op_work_queue->queue(new LambdaContext( - [this, log_entry, ctx](int r) { - ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry - << " " << *log_entry << dendl; - log_entry->writeback(this->m_image_writeback, ctx); - this->m_flush_ops_will_send -= 1; - }), 0); - }); - } - post_unlock.add(ctx); + GuardedRequestFunctionContext *guarded_ctx = + new GuardedRequestFunctionContext([this, log_entry, invalidating] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = 
this->construct_flush_entry(log_entry, invalidating); + + if (!invalidating) { + ctx = new LambdaContext([this, log_entry, ctx](int r) { + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback(this->m_image_writeback, ctx); + this->m_flush_ops_will_send -= 1; + }), 0); + }); + } + ctx->complete(0); + }); + this->detain_flush_guard_request(log_entry, guarded_ctx); } } else { int count = entries_to_flush.size(); - std::vector> log_entries; + std::vector> write_entries; std::vector read_bls; - std::vector contexts; - log_entries.reserve(count); + write_entries.reserve(count); read_bls.reserve(count); - contexts.reserve(count); for (auto &log_entry : entries_to_flush) { - // log_entry already removed from m_dirty_log_entries and - // in construct_flush_entry() it will inc(m_flush_ops_in_flight). - // We call this func here to make ops can track. - Context *ctx = this->construct_flush_entry(log_entry, invalidating); if (log_entry->is_write_entry()) { bufferlist *bl = new bufferlist; auto write_entry = static_pointer_cast(log_entry); write_entry->inc_bl_refs(); - log_entries.push_back(write_entry); + write_entries.push_back(write_entry); read_bls.push_back(bl); } - contexts.push_back(ctx); } Context *ctx = new LambdaContext( - [this, entries_to_flush, read_bls, contexts](int r) { - int i = 0, j = 0; + [this, entries_to_flush, read_bls](int r) { + int i = 0; + GuardedRequestFunctionContext *guarded_ctx = nullptr; for (auto &log_entry : entries_to_flush) { - Context *ctx = contexts[j++]; - if (log_entry->is_write_entry()) { bufferlist captured_entry_bl; - captured_entry_bl.claim_append(*read_bls[i]); delete read_bls[i++]; - m_image_ctx.op_work_queue->queue(new LambdaContext( - [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) { - auto captured_entry_bl = std::move(entry_bl); - ldout(m_image_ctx.cct, 15) << 
"flushing:" << log_entry - << " " << *log_entry << dendl; - log_entry->writeback_bl(this->m_image_writeback, ctx, - std::move(captured_entry_bl)); - this->m_flush_ops_will_send -= 1; - }), 0); + guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = this->construct_flush_entry(log_entry, false); + + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) { + auto captured_entry_bl = std::move(entry_bl); + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback_bl(this->m_image_writeback, ctx, + std::move(captured_entry_bl)); + this->m_flush_ops_will_send -= 1; + }), 0); + }); } else { - m_image_ctx.op_work_queue->queue(new LambdaContext( - [this, log_entry, ctx](int r) { - ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry - << " " << *log_entry << dendl; - log_entry->writeback(this->m_image_writeback, ctx); - this->m_flush_ops_will_send -= 1; - }), 0); + guarded_ctx = new GuardedRequestFunctionContext([this, log_entry] + (GuardedRequestFunctionContext &guard_ctx) { + log_entry->m_cell = guard_ctx.cell; + Context *ctx = this->construct_flush_entry(log_entry, false); + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback(this->m_image_writeback, ctx); + this->m_flush_ops_will_send -= 1; + }), 0); + }); } + this->detain_flush_guard_request(log_entry, guarded_ctx); } }); - aio_read_data_blocks(log_entries, read_bls, ctx); + aio_read_data_blocks(write_entries, read_bls, ctx); } }