From: Yuan Lu Date: Tue, 28 Apr 2020 07:34:46 +0000 (+0800) Subject: librbd: add invalidate X-Git-Tag: wip-pdonnell-testing-20200918.022351~1341^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=8388f1e817843152f4f0477290332ea5515d114f;p=ceph-ci.git librbd: add invalidate Signed-off-by: Peterson, Scott Signed-off-by: Li, Xiaoyan Signed-off-by: Lu, Yuan Signed-off-by: Chamarthy, Mahati --- diff --git a/src/librbd/cache/ReplicatedWriteLog.cc b/src/librbd/cache/ReplicatedWriteLog.cc index 81257398b03..2c4b9f945ce 100644 --- a/src/librbd/cache/ReplicatedWriteLog.cc +++ b/src/librbd/cache/ReplicatedWriteLog.cc @@ -565,7 +565,7 @@ void ReplicatedWriteLog::shut_down(Context *on_finish) { }); /* Complete all in-flight writes before shutting down */ ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl; - internal_flush(ctx); + internal_flush(false, ctx); } } @@ -770,7 +770,7 @@ void ReplicatedWriteLog::aio_flush(io::FlushSource flush_source, Context *on_ ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl; if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source) { - internal_flush(on_finish); + internal_flush(false, on_finish); return; } m_perfcounter->inc(l_librbd_rwl_aio_flush, 1); @@ -870,11 +870,12 @@ void ReplicatedWriteLog::aio_compare_and_write(Extents &&image_extents, template void ReplicatedWriteLog::flush(Context *on_finish) { - internal_flush(on_finish); + internal_flush(false, on_finish); } template void ReplicatedWriteLog::invalidate(Context *on_finish) { + internal_flush(true, on_finish); } template @@ -1733,7 +1734,7 @@ void ReplicatedWriteLog::process_work() { std::lock_guard locker(m_lock); m_wake_up_requested = false; } - if (m_alloc_failed_since_retire || + if (m_alloc_failed_since_retire || m_invalidating || m_bytes_allocated > high_water_bytes || (m_log_entries.size() > high_water_entries)) { int retired = 0; @@ -1744,12 +1745,12 @@ void ReplicatedWriteLog::process_work() { << ", allocated_entries > high_water=" << (m_log_entries.size() > high_water_entries) << dendl; - while (m_alloc_failed_since_retire || + while (m_alloc_failed_since_retire || m_invalidating || (m_bytes_allocated > high_water_bytes) || (m_log_entries.size() > high_water_entries) || (((m_bytes_allocated > low_water_bytes) || (m_log_entries.size() > low_water_entries)) && (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) { - if (!retire_entries((m_shutting_down || + if (!retire_entries((m_shutting_down || m_invalidating || (m_bytes_allocated > aggressive_high_water_bytes) || (m_log_entries.size() > aggressive_high_water_entries)) ? MAX_ALLOC_PER_TRANSACTION @@ -1788,6 +1789,10 @@ bool ReplicatedWriteLog::can_flush_entry(std::shared_ptr log ldout(cct, 20) << "" << dendl; ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + if (m_invalidating) { + return true; + } + /* For OWB we can flush entries with the same sync gen number (write between * aio_flush() calls) concurrently. Here we'll consider an entry flushable if * its sync gen number is <= the lowest sync gen number carried by all the @@ -1819,8 +1824,8 @@ bool ReplicatedWriteLog::can_flush_entry(std::shared_ptr log template Context* ReplicatedWriteLog::construct_flush_entry_ctx(std::shared_ptr log_entry) { - //TODO handle invalidate in later PRs CephContext *cct = m_image_ctx.cct; + bool invalidating = m_invalidating; // snapshot so we behave consistently ldout(cct, 20) << "" << dendl; ceph_assert(m_entry_reader_lock.is_locked()); @@ -1835,7 +1840,7 @@ Context* ReplicatedWriteLog::construct_flush_entry_ctx(std::shared_ptr::construct_flush_entry_ctx(std::shared_ptrbytes_dirty(); sync_point_writer_flushed(log_entry->get_sync_point_entry()); ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry + << " invalidating=" << invalidating << dendl; } m_flush_ops_in_flight -= 1; @@ -1867,6 +1873,9 @@ Context* ReplicatedWriteLog::construct_flush_entry_ctx(std::shared_ptrqueue(new LambdaContext( @@ -2189,7 +2198,16 @@ void ReplicatedWriteLog::flush_dirty_entries(Context *on_finish) { } template -void ReplicatedWriteLog::internal_flush(Context *on_finish) { +void ReplicatedWriteLog::internal_flush(bool invalidate, Context *on_finish) { + ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl; + + if (m_perfcounter) { + if (invalidate) { + m_perfcounter->inc(l_librbd_rwl_invalidate_cache, 1); + } else { + m_perfcounter->inc(l_librbd_rwl_flush, 1); + } + } /* May be called even if initialization fails */ if (!m_initialized) { @@ -2205,27 +2223,32 @@ void ReplicatedWriteLog::internal_flush(Context *on_finish) { * results. */ GuardedRequestFunctionContext *guarded_ctx = new GuardedRequestFunctionContext( - [this, on_finish](GuardedRequestFunctionContext &guard_ctx) { + [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) { DeferredContexts on_exit; ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl; ceph_assert(guard_ctx.cell); Context *ctx = new LambdaContext( - [this, cell=guard_ctx.cell, on_finish](int r) { + [this, cell=guard_ctx.cell, invalidate, on_finish](int r) { std::lock_guard locker(m_lock); - ldout(m_image_ctx.cct, 6) << "Done flush" << dendl; + m_invalidating = false; + ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate=" + << invalidate << ")" << dendl; if (m_log_entries.size()) { ldout(m_image_ctx.cct, 1) << "m_log_entries.size()=" << m_log_entries.size() << ", " << "front()=" << *m_log_entries.front() << dendl; } + if (invalidate) { + ceph_assert(m_log_entries.size() == 0); + } ceph_assert(m_dirty_log_entries.size() == 0); m_image_ctx.op_work_queue->queue(on_finish, r); release_guarded_request(cell); }); ctx = new LambdaContext( - [this, ctx](int r) { + [this, ctx, invalidate](int r) { Context *next_ctx = ctx; if (r < 0) { /* Override on_finish status with this error */ @@ -2233,11 +2256,25 @@ void ReplicatedWriteLog::internal_flush(Context *on_finish) { ctx->complete(r); }); } - { - std::lock_guard locker(m_lock); - ceph_assert(m_dirty_log_entries.size() == 0); + if (invalidate) { + { + std::lock_guard locker(m_lock); + ceph_assert(m_dirty_log_entries.size() == 0); + ceph_assert(!m_invalidating); + ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl; + m_invalidating = true; + } + /* Discards all RWL entries */ + while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { } + next_ctx->complete(0); + } else { + { + std::lock_guard locker(m_lock); + ceph_assert(m_dirty_log_entries.size() == 0); + ceph_assert(!m_invalidating); + } + m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx); } - m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx); }); ctx = new LambdaContext( [this, ctx](int r) { diff --git a/src/librbd/cache/ReplicatedWriteLog.h b/src/librbd/cache/ReplicatedWriteLog.h index 62c4aad1cf2..ef22b41f959 100644 --- a/src/librbd/cache/ReplicatedWriteLog.h +++ b/src/librbd/cache/ReplicatedWriteLog.h @@ -91,7 +91,7 @@ public: /// internal state methods void init(Context *on_finish) override; void shut_down(Context *on_finish) override; - void invalidate(Context *on_finish); + void invalidate(Context *on_finish) override; void flush(Context *on_finish) override; using This = ReplicatedWriteLog; @@ -148,6 +148,7 @@ private: std::atomic m_initialized = {false}; std::atomic m_shutting_down = {false}; + std::atomic m_invalidating = {false}; PMEMobjpool *m_log_pool = nullptr; const char* m_rwl_pool_layout_name; @@ -298,7 +299,7 @@ private: int append_op_log_entries(rwl::GenericLogOperations &ops); void complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r); void schedule_complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r); - void internal_flush(Context *on_finish); + void internal_flush(bool invalidate, Context *on_finish); }; } // namespace cache