From d860e909fbfeea8988c1f32474e7e15bf3f530d8 Mon Sep 17 00:00:00 2001
From: Yuan Lu <yuan.lu@intel.com>
Date: Mon, 17 Feb 2020 17:35:35 +0800
Subject: [PATCH] librbd: add internal flush

Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
---
 src/librbd/cache/ReplicatedWriteLog.cc | 195 ++++++++++++++++++++++---
 src/librbd/cache/ReplicatedWriteLog.h  |   5 +
 src/librbd/cache/rwl/Types.cc          |   4 +
 src/librbd/cache/rwl/Types.h           |   2 +
 4 files changed, 186 insertions(+), 20 deletions(-)

diff --git a/src/librbd/cache/ReplicatedWriteLog.cc b/src/librbd/cache/ReplicatedWriteLog.cc
index ca0fd2751f3de..e12579d8d968f 100644
--- a/src/librbd/cache/ReplicatedWriteLog.cc
+++ b/src/librbd/cache/ReplicatedWriteLog.cc
@@ -479,7 +479,7 @@ void ReplicatedWriteLog<I>::shut_down(Context *on_finish) {
                                << m_log_pool_name << dendl;
       if (remove(m_log_pool_name.c_str()) != 0) {
         lderr(m_image_ctx.cct) << "failed to remove empty pool \""
-                               << m_log_pool_name << "\": "
+                               << m_log_pool_name << "\": "
                                << pmemobj_errormsg() << dendl;
       }
     }
@@ -525,7 +525,7 @@ void ReplicatedWriteLog<I>::aio_write(Extents &&image_extents,
 template <typename I>
 void ReplicatedWriteLog<I>::aio_discard(uint64_t offset, uint64_t length,
                                         uint32_t discard_granularity_bytes,
-                                        Context *on_finish) {
+                                        Context *on_finish) {
 }
 
 template <typename I>
@@ -549,6 +549,7 @@ void ReplicatedWriteLog<I>::aio_compare_and_write(Extents &&image_extents,
 
 template <typename I>
 void ReplicatedWriteLog<I>::flush(Context *on_finish) {
+  internal_flush(on_finish);
 }
 
 template <typename I>
@@ -613,8 +614,15 @@ template <typename I>
 void ReplicatedWriteLog<I>::detain_guarded_request(
   C_BlockIORequestT *request,
   GuardedRequestFunctionContext *guarded_ctx) {
-  //TODO: add is_barrier for flush request in later PRs
-  auto req = GuardedRequest(request->image_extents_summary.block_extent(), guarded_ctx);
+  BlockExtent extent;
+  bool is_barrier = false;
+  if (request) {
+    extent = request->image_extents_summary.block_extent();
+  } else {
+    extent = block_extent(whole_volume_extent());
+    is_barrier = true;
+  }
+  auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
   BlockGuardCell *cell = nullptr;
 
   ldout(m_image_ctx.cct, 20) << dendl;
@@ -710,7 +718,7 @@ void ReplicatedWriteLog<I>::append_scheduled_ops(void)
       ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
       ops_remain = true; /* Always check again before leaving */
       ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
-                                 << m_ops_to_append.size() << " remain" << dendl;
+                                 << m_ops_to_append.size() << " remain" << dendl;
     } else {
       ops_remain = false;
       if (appending) {
@@ -817,7 +825,7 @@ void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
       ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
       ops_remain = !m_ops_to_flush.empty();
       ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
-                                 << m_ops_to_flush.size() << " remain" << dendl;
+                                 << m_ops_to_flush.size() << " remain" << dendl;
     } else {
       ops_remain = false;
     }
@@ -946,9 +954,9 @@ void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
   ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
                              << "start address="
-                             << ops.front()->get_log_entry()->pmem_entry << " "
+                             << ops.front()->get_log_entry()->pmem_entry << " "
                              << "bytes="
-                             << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
+                             << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
                              << dendl;
   pmemobj_flush(m_log_pool,
                 ops.front()->get_log_entry()->pmem_entry,
@@ -1004,7 +1012,7 @@ int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
     ldout(m_image_ctx.cct, 20) << "APPENDING: index="
                                << operation->get_log_entry()->log_entry_index << " "
                                << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
-                               << "]" << dendl;
+                               << "]" << dendl;
     entries_to_flush.push_back(operation);
   }
   flush_op_log_entries(entries_to_flush);
@@ -1068,10 +1076,10 @@ void ReplicatedWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
     }
     op->complete(result);
     m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t,
-                        op->log_append_time - op->dispatch_time);
+                        op->log_append_time - op->dispatch_time);
     m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
     m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist,
-                        utime_t(now - op->dispatch_time).to_nsec(),
+                        utime_t(now - op->dispatch_time).to_nsec(),
                         log_entry->ram_entry.write_bytes);
     utime_t app_lat = op->log_append_comp_time - op->log_append_time;
     m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_appc_t, app_lat);
@@ -1263,7 +1271,7 @@ bool ReplicatedWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
       if (!req->has_io_waited_for_buffers()) {
         req->set_io_waited_for_entries(true);
         ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap="
-                                  << m_bytes_allocated_cap
+                                  << m_bytes_allocated_cap
                                   << ", allocated=" << m_bytes_allocated
                                   << ") in write [" << *req << "]" << dendl;
       }
@@ -1424,8 +1432,6 @@ bool ReplicatedWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log
   ldout(cct, 20) << "" << dendl;
   ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
 
-  //TODO handle invalidate
-
   /* For OWB we can flush entries with the same sync gen number (write between
    * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
    * its sync gen number is <= the lowest sync gen number carried by all the
@@ -1553,11 +1559,19 @@ void ReplicatedWriteLog<I>::process_writeback_dirty_entries() {
   }
 
   if (all_clean) {
-    // TODO: all flusing complete
+    /* All flushing complete, drain outside lock */
+    Contexts flush_contexts;
+    {
+      std::lock_guard locker(m_lock);
+      flush_contexts.swap(m_flush_complete_contexts);
+    }
+    finish_contexts(m_image_ctx.cct, flush_contexts, 0);
   }
 }
 
-/* Update/persist the last flushed sync point in the log */
+/**
+ * Update/persist the last flushed sync point in the log
+ */
 template <typename I>
 void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
 {
@@ -1638,7 +1652,7 @@ void ReplicatedWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPoint
 template <typename I>
 void ReplicatedWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
   ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
@@ -1653,8 +1667,8 @@ void ReplicatedWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
 }
 
 /**
-* Begin a new sync point
-*/
+ * Begin a new sync point
+ */
 template <typename I>
 void ReplicatedWriteLog<I>::new_sync_point(DeferredContexts &later) {
   CephContext *cct = m_image_ctx.cct;
@@ -1698,7 +1712,8 @@ void ReplicatedWriteLog<I>::new_sync_point(DeferredContexts &later) {
 }
 
 template <typename I>
-void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req, DeferredContexts &later) {
+void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
+                                                 DeferredContexts &later) {
   ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
 
   if (!flush_req) {
@@ -1745,6 +1760,146 @@ void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req, Def
   to_append->add_in_on_persisted_ctxs(flush_req);
 }
 
+template <typename I>
+void ReplicatedWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
+                                                           DeferredContexts &later) {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  /* If there have been writes since the last sync point ... */
+  if (m_current_sync_point->log_entry->writes) {
+    flush_new_sync_point(flush_req, later);
+  } else {
+    /* There have been no writes to the current sync point. */
+    if (m_current_sync_point->earlier_sync_point) {
+      /* If the previous sync point hasn't completed, complete this flush
+       * with the earlier sync point. No alloc or dispatch needed. */
+      m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req);
+    } else {
+      /* The previous sync point has already completed and been
+       * appended. The current sync point has no writes, so this flush
+       * has nothing to wait for. This flush completes now. */
+      later.add(flush_req);
+    }
+  }
+}
+
+/*
+ * RWL internal flush - will actually flush the RWL.
+ *
+ * User flushes should arrive at aio_flush(), and only flush prior
+ * writes to all log replicas.
+ *
+ * Librbd internal flushes will arrive at flush(invalidate=false,
+ * discard=false), and traverse the block guard to ensure in-flight writes are
+ * flushed.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_dirty_entries(Context *on_finish) {
+  CephContext *cct = m_image_ctx.cct;
+  bool all_clean;
+  bool flushing;
+  bool stop_flushing;
+
+  {
+    std::lock_guard locker(m_lock);
+    flushing = (0 != m_flush_ops_in_flight);
+    all_clean = m_dirty_log_entries.empty();
+    stop_flushing = (m_shutting_down);
+  }
+
+  if (!flushing && (all_clean || stop_flushing)) {
+    /* Complete without holding m_lock */
+    if (all_clean) {
+      ldout(cct, 20) << "no dirty entries" << dendl;
+    } else {
+      ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
+    }
+    on_finish->complete(0);
+  } else {
+    if (all_clean) {
+      ldout(cct, 5) << "flush ops still in progress" << dendl;
+    } else {
+      ldout(cct, 20) << "dirty entries remain" << dendl;
+    }
+    std::lock_guard locker(m_lock);
+    /* on_finish can't be completed yet */
+    m_flush_complete_contexts.push_back(new LambdaContext(
+      [this, on_finish](int r) {
+        flush_dirty_entries(on_finish);
+      }));
+    wake_up();
+  }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::internal_flush(Context *on_finish) {
+
+  /* May be called even if initialization fails */
+  if (!m_initialized) {
+    ldout(m_image_ctx.cct, 5) << "never initialized" << dendl;
+    /* Deadlock if completed here */
+    m_image_ctx.op_work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  /* Flush/invalidate must pass through block guard to ensure all layers of
+   * cache are consistently flush/invalidated. This ensures no in-flight write leaves
+   * some layers with valid regions, which may later produce inconsistent read
+   * results.
+   */
+  GuardedRequestFunctionContext *guarded_ctx =
+    new GuardedRequestFunctionContext(
+      [this, on_finish](GuardedRequestFunctionContext &guard_ctx) {
+        DeferredContexts on_exit;
+        ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
+        ceph_assert(guard_ctx.cell);
+
+        Context *ctx = new LambdaContext(
+          [this, cell=guard_ctx.cell, on_finish](int r) {
+            std::lock_guard locker(m_lock);
+            ldout(m_image_ctx.cct, 6) << "Done flush" << dendl;
+            if (m_log_entries.size()) {
+              ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
+                                        << m_log_entries.size() << ", "
+                                        << "front()=" << *m_log_entries.front()
+                                        << dendl;
+            }
+            ceph_assert(m_dirty_log_entries.size() == 0);
+            m_image_ctx.op_work_queue->queue(on_finish, r);
+            release_guarded_request(cell);
+          });
+        ctx = new LambdaContext(
+          [this, ctx](int r) {
+            Context *next_ctx = ctx;
+            if (r < 0) {
+              /* Override on_finish status with this error */
+              next_ctx = new LambdaContext([r, ctx](int _r) {
+                ctx->complete(r);
+              });
+            }
+            {
+              std::lock_guard locker(m_lock);
+              ceph_assert(m_dirty_log_entries.size() == 0);
+            }
+            m_image_writeback.aio_flush(next_ctx);
+          });
+        ctx = new LambdaContext(
+          [this, ctx](int r) {
+            flush_dirty_entries(ctx);
+          });
+        std::lock_guard locker(m_lock);
+        /* Even if we're throwing everything away, we want the last entry to
+         * be a sync point so we can cleanly resume.
+         *
+         * Also, the blockguard only guarantees the replication of this op
+         * can't overlap with prior ops. It doesn't guarantee those are all
+         * completed and eligible for flush & retire, which we require here.
+         */
+        auto flush_req = make_flush_req(ctx);
+        flush_new_sync_point_if_needed(flush_req, on_exit);
+      });
+  detain_guarded_request(nullptr, guarded_ctx);
+}
+
 } // namespace cache
 } // namespace librbd
diff --git a/src/librbd/cache/ReplicatedWriteLog.h b/src/librbd/cache/ReplicatedWriteLog.h
index 482d2f9af5183..1f51028691921 100644
--- a/src/librbd/cache/ReplicatedWriteLog.h
+++ b/src/librbd/cache/ReplicatedWriteLog.h
@@ -214,6 +214,8 @@
 private:
   bool m_appending = false;
   bool m_dispatching_deferred_ops = false;
 
+  Contexts m_flush_complete_contexts;
+
   rwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */
   rwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */
@@ -256,6 +258,7 @@
   void wake_up();
   void process_work();
 
+  void flush_dirty_entries(Context *on_finish);
   bool can_flush_entry(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
   Context *construct_flush_entry_ctx(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
   void persist_last_flushed_sync_gen();
@@ -266,6 +269,7 @@
   void init_flush_new_sync_point(rwl::DeferredContexts &later);
   void new_sync_point(rwl::DeferredContexts &later);
   rwl::C_FlushRequest<ReplicatedWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
+  void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, rwl::DeferredContexts &later);
 
   void dispatch_deferred_writes(void);
   void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
@@ -279,6 +283,7 @@
   int append_op_log_entries(rwl::GenericLogOperations &ops);
   void complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
   void schedule_complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
+  void internal_flush(Context *on_finish);
 };
 
 } // namespace cache
diff --git a/src/librbd/cache/rwl/Types.cc b/src/librbd/cache/rwl/Types.cc
index 67188e0536d95..e6f10de2114a2 100644
--- a/src/librbd/cache/rwl/Types.cc
+++ b/src/librbd/cache/rwl/Types.cc
@@ -107,6 +107,10 @@ io::Extent whole_volume_extent() {
   return io::Extent({0, std::numeric_limits<uint64_t>::max()});
 }
 
+BlockExtent block_extent(const io::Extent& image_extent) {
+  return convert_to_block_extent(image_extent.first, image_extent.second);
+}
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd
diff --git a/src/librbd/cache/rwl/Types.h b/src/librbd/cache/rwl/Types.h
index bd2ece22e2187..768902044fa78 100644
--- a/src/librbd/cache/rwl/Types.h
+++ b/src/librbd/cache/rwl/Types.h
@@ -261,6 +261,8 @@ public:
 
 io::Extent whole_volume_extent();
 
+BlockExtent block_extent(const io::Extent& image_extent);
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd
-- 
2.39.5
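
Two short standalone sketches follow for readers tracing the new internal_flush() path. They are illustrative only and not part of the patch; the minimal Context struct below merely stands in for ceph's Context/LambdaContext, and the printed step names refer to the functions used in the patch.

internal_flush() builds its completion chain inside-out: the LambdaContext constructed first runs last. This sketch models the resulting execution order: drain the RWL's dirty log entries, then flush the cache layer below via m_image_writeback.aio_flush(), then release the barrier cell and complete the caller.

// sketch1.cc -- minimal model (not librbd code) of internal_flush()'s
// reverse-built completion chain; compile with: g++ -std=c++14 sketch1.cc
#include <functional>
#include <iostream>

// Stand-in for ceph's Context: a one-shot, self-deleting completion.
struct Context {
  std::function<void(int)> fn;
  explicit Context(std::function<void(int)> f) : fn(std::move(f)) {}
  void complete(int r) { fn(r); delete this; }
};

int main() {
  // Built first, runs last: release the guard cell, complete on_finish.
  Context *ctx = new Context([](int r) {
    std::cout << "3) release_guarded_request + on_finish, r=" << r << "\n";
  });
  // Built second, runs second: flush the cache layer below the RWL.
  ctx = new Context([next = ctx](int r) {
    std::cout << "2) m_image_writeback.aio_flush\n";
    next->complete(r);
  });
  // Built last, runs first: drain the RWL's own dirty log entries.
  ctx = new Context([next = ctx](int r) {
    std::cout << "1) flush_dirty_entries\n";
    next->complete(r);
  });
  ctx->complete(0);  // prints steps 1), 2), 3) in order
}

In the patch the head of this chain is additionally wrapped in a C_FlushRequest from make_flush_req(), and the whole sequence runs under the whole-volume barrier taken by detain_guarded_request(nullptr, guarded_ctx), so no overlapping write can slip between the steps.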
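The second sketch models the error-override idiom in the middle LambdaContext of internal_flush(): if flush_dirty_entries() fails, m_image_writeback.aio_flush() is still issued, but the chain completes with the earlier error rather than the later flush's result.

// sketch2.cc -- minimal model (not librbd code) of the error-override
// wrapper; compile with: g++ -std=c++14 sketch2.cc
#include <functional>
#include <iostream>

struct Context {
  std::function<void(int)> fn;
  explicit Context(std::function<void(int)> f) : fn(std::move(f)) {}
  void complete(int r) { fn(r); delete this; }
};

int main() {
  Context *ctx = new Context([](int r) {
    std::cout << "final completion r=" << r << "\n";  // prints -5, not 0
  });
  int r = -5;  // pretend the earlier step failed (e.g. -EIO)
  Context *next_ctx = ctx;
  if (r < 0) {
    // Override the final status with the earlier error; the later
    // step's own result (_r) is deliberately ignored.
    next_ctx = new Context([r, ctx](int _r) { ctx->complete(r); });
  }
  next_ctx->complete(0);  // the later step itself succeeded
}

Note also that flush_dirty_entries() retries itself by re-queuing a LambdaContext on m_flush_complete_contexts until the dirty-entry list drains (or shutdown suppresses the flush), so the chain above only advances once the log is clean.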