From a1c3fdf77b2b3853070aaf4dda5958a9657f8bf5 Mon Sep 17 00:00:00 2001
From: Yuan Lu <yuan.y.lu@intel.com>
Date: Wed, 12 Feb 2020 13:21:18 +0800
Subject: [PATCH] librbd: flush sync point into cache device

Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
---
 src/librbd/cache/ReplicatedWriteLog.cc | 202 ++++++++++++++++++++++++-
 src/librbd/cache/ReplicatedWriteLog.h  |  11 ++
 src/librbd/cache/rwl/LogOperation.cc   |   2 +-
 src/librbd/cache/rwl/Request.cc        |  23 ++-
 src/librbd/cache/rwl/Request.h         |   3 +-
 src/librbd/cache/rwl/SyncPoint.cc      |  74 ++++++++-
 src/librbd/cache/rwl/SyncPoint.h       |  43 +++---
 src/librbd/cache/rwl/Types.cc          |   4 +
 src/librbd/cache/rwl/Types.h           |   3 +
 9 files changed, 332 insertions(+), 33 deletions(-)

diff --git a/src/librbd/cache/ReplicatedWriteLog.cc b/src/librbd/cache/ReplicatedWriteLog.cc
index 51d789e31f1..b43c073e88a 100644
--- a/src/librbd/cache/ReplicatedWriteLog.cc
+++ b/src/librbd/cache/ReplicatedWriteLog.cc
@@ -413,11 +413,11 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
     m_cache_state->empty = true;
   }
 
-  // TODO: Will init sync point, this will be covered in later PR.
-  // init_flush_new_sync_point(later);
-  ++m_current_sync_gen;
-  m_current_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen,
-                                                     this->m_image_ctx.cct);
+  /* Start the sync point following the last one seen in the
+   * log. Flush the last sync point created during the loading of the
+   * existing log entries. */
+  init_flush_new_sync_point(later);
+  ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl;
 
   m_initialized = true;
   // Start the thread
@@ -787,6 +787,14 @@ void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsVector &ops)
   schedule_append(to_append);
 }
 
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op)
+{
+  GenericLogOperations to_append { op };
+
+  schedule_append(to_append);
+}
+
 const unsigned long int ops_flushed_together = 4;
 /*
  * Performs the pmem buffer flush on all scheduled ops, then schedules
@@ -1333,6 +1341,190 @@ bool ReplicatedWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
   return alloc_succeeds;
 }
 
+template <typename I>
+C_FlushRequest<ReplicatedWriteLog<I>>* ReplicatedWriteLog<I>::make_flush_req(Context *on_finish) {
+  utime_t flush_begins = ceph_clock_now();
+  bufferlist bl;
+  auto *flush_req =
+    new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}),
+                        std::move(bl), 0, m_lock, m_perfcounter, on_finish);
+
+  return flush_req;
+}
+
+/* Update/persist the last flushed sync point in the log */
+template <typename I>
+void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
+{
+  TOID(struct WriteLogPoolRoot) pool_root;
+  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+  uint64_t flushed_sync_gen;
+
+  std::lock_guard append_locker(m_log_append_lock);
+  {
+    std::lock_guard locker(m_lock);
+    flushed_sync_gen = m_flushed_sync_gen;
+  }
+
+  if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+    ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
+                               << D_RO(pool_root)->flushed_sync_gen << " to "
+                               << flushed_sync_gen << dendl;
+    TX_BEGIN(m_log_pool) {
+      D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+    } TX_ONCOMMIT {
+    } TX_ONABORT {
+      lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
+      ceph_assert(false);
+    } TX_FINALLY {
+    } TX_END;
+  }
+}
+
+/* Returns true if the specified SyncPointLogEntry is considered flushed, and
+ * the log will be updated to reflect this. */
+template <typename I>
+bool ReplicatedWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry)
+{
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(log_entry);
+
+  if ((log_entry->writes_flushed == log_entry->writes) &&
+      log_entry->completed && log_entry->prior_sync_point_flushed &&
+      log_entry->next_sync_point_entry) {
+    ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point="
+                               << *log_entry << dendl;
+    log_entry->next_sync_point_entry->prior_sync_point_flushed = true;
+    /* Don't move the flushed sync gen num backwards. */
+    if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) {
+      m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number;
+    }
+    m_async_op_tracker.start_op();
+    m_work_queue.queue(new LambdaContext(
+      [this, log_entry](int r) {
+        bool handled_by_next;
+        {
+          std::lock_guard locker(m_lock);
+          handled_by_next = handle_flushed_sync_point(log_entry->next_sync_point_entry);
+        }
+        if (!handled_by_next) {
+          persist_last_flushed_sync_gen();
+        }
+        m_async_op_tracker.finish_op();
+      }));
+    return true;
+  }
+  return false;
+}
+
+/* Make a new sync point and flush the previous one during initialization,
+ * when there may or may not be a previous sync point. */
+template <typename I>
+void ReplicatedWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(!m_initialized); /* Don't use this after init */
+
+  if (!m_current_sync_point) {
+    /* First sync point since start */
+    new_sync_point(later);
+  } else {
+    flush_new_sync_point(nullptr, later);
+  }
+}
+
+/**
+ * Begin a new sync point
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::new_sync_point(DeferredContexts &later) {
+  CephContext *cct = m_image_ctx.cct;
+  std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point;
+  std::shared_ptr<SyncPoint> new_sync_point;
+  ldout(cct, 20) << dendl;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  /* The first time this is called, if this is a newly created log,
+   * this makes the first sync gen number we'll use 1. On the first
+   * call for a re-opened log m_current_sync_gen will be the highest
+   * gen number from all the sync point entries found in the re-opened
+   * log, and this advances to the next sync gen number. */
+  ++m_current_sync_gen;
+
+  new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct);
+  m_current_sync_point = new_sync_point;
+
+  /* If this log has been re-opened, old_sync_point will initially be
+   * nullptr, but m_current_sync_gen may not be zero. */
+  if (old_sync_point) {
+    new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num);
+    /* This sync point will acquire no more sub-ops. Activation needs
+     * to acquire m_lock, so defer it until later. */
+    later.add(new LambdaContext(
+      [this, old_sync_point](int r) {
+        old_sync_point->prior_persisted_gather_activate();
+      }));
+  }
+
+  new_sync_point->prior_persisted_gather_set_finisher();
+
+  if (old_sync_point) {
+    ldout(cct,6) << "new sync point = [" << *m_current_sync_point
+                 << "], prior = [" << *old_sync_point << "]" << dendl;
+  } else {
+    ldout(cct,6) << "first sync point = [" << *m_current_sync_point
+                 << "]" << dendl;
+  }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req, DeferredContexts &later) {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (!flush_req) {
+    m_async_null_flush_finish++;
+    m_async_op_tracker.start_op();
+    Context *flush_ctx = new LambdaContext([this](int r) {
+      m_async_null_flush_finish--;
+      m_async_op_tracker.finish_op();
+    });
+    flush_req = make_flush_req(flush_ctx);
+    flush_req->internal = true;
+  }
+
+  /* Add a new sync point. */
+  new_sync_point(later);
+  std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point;
+  ceph_assert(to_append);
+
+  /* This flush request will append/persist the (now) previous sync point */
+  flush_req->to_append = to_append;
+
+  /* When the m_sync_point_persist Gather completes this sync point can be
+   * appended. The only sub for this Gather is the finisher Context for
+   * m_prior_log_entries_persisted, which records the result of the Gather in
+   * the sync point, and completes. TODO: Do we still need both of these
+   * Gathers? */
+  Context * ctx = new LambdaContext([this, flush_req](int r) {
+    ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req
+                               << " sync point =" << flush_req->to_append
+                               << ". Ready to persist." << dendl;
+    alloc_and_dispatch_io_req(flush_req);
+  });
+  to_append->persist_gather_set_finisher(ctx);
+
+  /* The m_sync_point_persist Gather has all the subs it will ever have, and
+   * now has its finisher. If the sub is already complete, activation will
+   * complete the Gather. The finisher will acquire m_lock, so we'll activate
+   * this when we release m_lock. */
+  later.add(new LambdaContext([this, to_append](int r) {
+    to_append->persist_gather_activate();
+  }));
+
+  /* The flush request completes when the sync point persists */
+  to_append->add_in_on_persisted_ctxs(flush_req);
+}
+
 } // namespace cache
 } // namespace librbd
diff --git a/src/librbd/cache/ReplicatedWriteLog.h b/src/librbd/cache/ReplicatedWriteLog.h
index dea992f77cb..30e53bc6afd 100644
--- a/src/librbd/cache/ReplicatedWriteLog.h
+++ b/src/librbd/cache/ReplicatedWriteLog.h
@@ -91,6 +91,7 @@ public:
   using This = ReplicatedWriteLog<ImageCtxT>;
   using C_WriteRequestT = rwl::C_WriteRequest<This>;
   using C_BlockIORequestT = rwl::C_BlockIORequest<This>;
+  using C_FlushRequestT = rwl::C_FlushRequest<This>;
   CephContext * get_context();
   void release_guarded_request(BlockGuardCell *cell);
   void release_write_lanes(C_BlockIORequestT *req);
@@ -98,7 +99,9 @@ public:
   template <typename V>
   void flush_pmem_buffer(V& ops);
   void schedule_append(rwl::GenericLogOperationsVector &ops);
+  void schedule_append(rwl::GenericLogOperationSharedPtr op);
   void schedule_flush_and_append(rwl::GenericLogOperationsVector &ops);
+  void flush_new_sync_point(C_FlushRequestT *flush_req, rwl::DeferredContexts &later);
   std::shared_ptr<rwl::SyncPoint> get_current_sync_point() {
     return m_current_sync_point;
   }
@@ -179,6 +182,7 @@ private:
   std::atomic<int> m_async_flush_ops = {0};
   std::atomic<int> m_async_append_ops = {0};
   std::atomic<int> m_async_complete_ops = {0};
+  std::atomic<int> m_async_null_flush_finish = {0};
 
   /* Acquire locks in order declared here */
 
@@ -237,6 +241,13 @@
   void update_image_cache_state(Context *on_finish);
   void wake_up();
 
+  void persist_last_flushed_sync_gen();
+  bool handle_flushed_sync_point(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
+
+  void init_flush_new_sync_point(rwl::DeferredContexts &later);
+  void new_sync_point(rwl::DeferredContexts &later);
+  rwl::C_FlushRequest<ReplicatedWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
+
   void dispatch_deferred_writes(void);
   void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
   void append_scheduled_ops(void);
diff --git a/src/librbd/cache/rwl/LogOperation.cc b/src/librbd/cache/rwl/LogOperation.cc
index c0c50a50b93..ea770a5033f 100644
--- a/src/librbd/cache/rwl/LogOperation.cc
+++ b/src/librbd/cache/rwl/LogOperation.cc
@@ -235,7 +235,7 @@ WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched, PerfCounters *per
     dispatch_time(dispatched),
     perfcounter(perfcounter),
     sync_point(sync_point) {
-  on_ops_appending = sync_point->prior_log_entries_persisted->new_sub();
+  on_ops_appending = sync_point->prior_persisted_gather_new_sub();
   on_ops_persist = nullptr;
   extent_ops_persist =
     new C_Gather(m_cct,
diff --git a/src/librbd/cache/rwl/Request.cc b/src/librbd/cache/rwl/Request.cc
index fc9b6f52eae..e47c840b4e4 100644
--- a/src/librbd/cache/rwl/Request.cc
+++ b/src/librbd/cache/rwl/Request.cc
@@ -190,11 +190,27 @@ void C_WriteRequest<T>::setup_buffer_resources(
 }
 
 template <typename T>
-void C_WriteRequest<T>::setup_log_operations() {
+void C_WriteRequest<T>::setup_log_operations(DeferredContexts &on_exit) {
   {
     std::lock_guard locker(m_lock);
-    // TODO: Add sync point if necessary
     std::shared_ptr<SyncPoint> current_sync_point = rwl.get_current_sync_point();
+    if ((!rwl.get_persist_on_flush() && current_sync_point->log_entry->writes_completed) ||
+        (current_sync_point->log_entry->writes > MAX_WRITES_PER_SYNC_POINT) ||
+        (current_sync_point->log_entry->bytes > MAX_BYTES_PER_SYNC_POINT)) {
+      /* Create new sync point and persist the previous one. This sequenced
+       * write will bear a sync gen number shared with no already completed
+       * writes. A group of sequenced writes may be safely flushed concurrently
+       * if they all arrived before any of them completed. We'll insert one on
+       * an aio_flush() from the application. Here we're inserting one to cap
+       * the number of bytes and writes per sync point. When the application is
+       * not issuing flushes, we insert sync points to record some observed
+       * write concurrency information that enables us to safely issue >1 flush
+       * write (for writes observed here to have been in flight simultaneously)
+       * at a time in persist-on-write mode.
+       */
+      rwl.flush_new_sync_point(nullptr, on_exit);
+      current_sync_point = rwl.get_current_sync_point();
+    }
     uint64_t current_sync_gen = rwl.get_current_sync_gen();
     op_set =
       make_unique<WriteLogOperationSet>(this->m_dispatched_time,
@@ -287,11 +303,12 @@ template <typename T>
 void C_WriteRequest<T>::dispatch()
 {
   CephContext *cct = rwl.get_context();
+  DeferredContexts on_exit;
   utime_t now = ceph_clock_now();
   this->m_dispatched_time = now;
 
   ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
-  setup_log_operations();
+  setup_log_operations(on_exit);
 
   bool append_deferred = false;
   if (!op_set->persist_on_flush &&
diff --git a/src/librbd/cache/rwl/Request.h b/src/librbd/cache/rwl/Request.h
index 49794766e78..79b0f9cd9bb 100644
--- a/src/librbd/cache/rwl/Request.h
+++ b/src/librbd/cache/rwl/Request.h
@@ -153,7 +153,7 @@ public:
 
   void dispatch() override;
 
-  virtual void setup_log_operations();
+  virtual void setup_log_operations(DeferredContexts &on_exit);
 
   bool append_write_request(std::shared_ptr<SyncPoint> sync_point);
 
@@ -193,6 +193,7 @@ template <typename T>
 class C_FlushRequest : public C_BlockIORequest<T> {
 public:
   using C_BlockIORequest<T>::rwl;
+  bool internal = false;
   std::shared_ptr<SyncPoint> to_append;
 
   C_FlushRequest(T &rwl, const utime_t arrived,
diff --git a/src/librbd/cache/rwl/SyncPoint.cc b/src/librbd/cache/rwl/SyncPoint.cc
index d9573aa0750..cb3f3cfeebd 100644
--- a/src/librbd/cache/rwl/SyncPoint.cc
+++ b/src/librbd/cache/rwl/SyncPoint.cc
@@ -14,8 +14,8 @@ namespace rwl {
 SyncPoint::SyncPoint(uint64_t sync_gen_num, CephContext *cct)
   : log_entry(std::make_shared<SyncPointLogEntry>(sync_gen_num)), m_cct(cct) {
-  prior_log_entries_persisted = new C_Gather(cct, nullptr);
-  sync_point_persist = new C_Gather(cct, nullptr);
+  m_prior_log_entries_persisted = new C_Gather(cct, nullptr);
+  m_sync_point_persist = new C_Gather(cct, nullptr);
   on_sync_point_appending.reserve(MAX_WRITES_PER_SYNC_POINT + 2);
   on_sync_point_persisted.reserve(MAX_WRITES_PER_SYNC_POINT + 2);
   ldout(m_cct, 20) << "sync point " << sync_gen_num << dendl;
@@ -32,16 +32,78 @@ std::ostream &operator<<(std::ostream &os,
   os << "log_entry=[" << *p.log_entry << "], "
      << "earlier_sync_point=" << p.earlier_sync_point << ", "
      << "later_sync_point=" << p.later_sync_point << ", "
-     << "final_op_sequence_num=" << p.final_op_sequence_num << ", "
-     << "prior_log_entries_persisted=" << p.prior_log_entries_persisted << ", "
-     << "prior_log_entries_persisted_complete=" << p.prior_log_entries_persisted_complete << ", "
-     << "append_scheduled=" << p.append_scheduled << ", "
+     << "m_final_op_sequence_num=" << p.m_final_op_sequence_num << ", "
+     << "m_prior_log_entries_persisted=" << p.m_prior_log_entries_persisted << ", "
+     << "m_prior_log_entries_persisted_complete=" << p.m_prior_log_entries_persisted_complete << ", "
+     << "m_append_scheduled=" << p.m_append_scheduled << ", "
      << "appending=" << p.appending << ", "
      << "on_sync_point_appending=" << p.on_sync_point_appending.size() << ", "
      << "on_sync_point_persisted=" << p.on_sync_point_persisted.size() << "";
   return os;
 }
 
+void SyncPoint::persist_gather_set_finisher(Context *ctx) {
+  m_append_scheduled = true;
+  /* All prior sync points that are still in this list must already be
+   * scheduled for append */
+  std::shared_ptr<SyncPoint> previous = earlier_sync_point;
+  while (previous) {
+    ceph_assert(previous->m_append_scheduled);
+    previous = previous->earlier_sync_point;
+  }
+
+  m_sync_point_persist->set_finisher(ctx);
+}
+
+void SyncPoint::persist_gather_activate() {
+  m_sync_point_persist->activate();
+}
+
+Context* SyncPoint::persist_gather_new_sub() {
+  return m_sync_point_persist->new_sub();
+}
+
+void SyncPoint::prior_persisted_gather_activate() {
+  m_prior_log_entries_persisted->activate();
+}
+
+Context* SyncPoint::prior_persisted_gather_new_sub() {
+  return m_prior_log_entries_persisted->new_sub();
+}
+
+void SyncPoint::prior_persisted_gather_set_finisher() {
+  Context *sync_point_persist_ready = persist_gather_new_sub();
+  std::shared_ptr<SyncPoint> sp = shared_from_this();
+  m_prior_log_entries_persisted->
+    set_finisher(new LambdaContext([this, sp, sync_point_persist_ready](int r) {
+      ldout(m_cct, 20) << "Prior log entries persisted for sync point =["
+                       << sp << "]" << dendl;
+      sp->m_prior_log_entries_persisted_result = r;
+      sp->m_prior_log_entries_persisted_complete = true;
+      sync_point_persist_ready->complete(r);
+    }));
+}
+
+void SyncPoint::add_in_on_persisted_ctxs(Context* ctx) {
+  on_sync_point_persisted.push_back(ctx);
+}
+
+void SyncPoint::add_in_on_appending_ctxs(Context* ctx) {
+  on_sync_point_appending.push_back(ctx);
+}
+
+void SyncPoint::setup_earlier_sync_point(std::shared_ptr<SyncPoint> sync_point,
+                                         uint64_t last_op_sequence_num) {
+  earlier_sync_point = sync_point;
+  log_entry->prior_sync_point_flushed = false;
+  earlier_sync_point->log_entry->next_sync_point_entry = log_entry;
+  earlier_sync_point->later_sync_point = shared_from_this();
+  earlier_sync_point->m_final_op_sequence_num = last_op_sequence_num;
+  if (!earlier_sync_point->appending) {
+    /* Append of new sync point deferred until old sync point is appending */
+    earlier_sync_point->add_in_on_appending_ctxs(prior_persisted_gather_new_sub());
+  }
+}
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd
diff --git a/src/librbd/cache/rwl/SyncPoint.h b/src/librbd/cache/rwl/SyncPoint.h
index da3cd09610e..1445146534e 100644
--- a/src/librbd/cache/rwl/SyncPoint.h
+++ b/src/librbd/cache/rwl/SyncPoint.h
@@ -12,27 +12,12 @@ namespace librbd {
 namespace cache {
 namespace rwl {
 
-class SyncPoint {
+class SyncPoint: public std::enable_shared_from_this<SyncPoint> {
 public:
   std::shared_ptr<SyncPointLogEntry> log_entry;
   /* Use lock for earlier/later links */
   std::shared_ptr<SyncPoint> earlier_sync_point; /* NULL if earlier has completed */
   std::shared_ptr<SyncPoint> later_sync_point;
-  uint64_t final_op_sequence_num = 0;
-  /* A sync point can't appear in the log until all the writes bearing
-   * it and all the prior sync points have been appended and
-   * persisted.
-   *
-   * Writes bearing this sync gen number and the prior sync point will be
-   * sub-ops of this Gather. This sync point will not be appended until all
-   * these complete to the point where their persist order is guaranteed. */
-  C_Gather *prior_log_entries_persisted;
-  int prior_log_entries_persisted_result = 0;
-  int prior_log_entries_persisted_complete = false;
-  /* The finisher for this will append the sync point to the log. The finisher
-   * for m_prior_log_entries_persisted will be a sub-op of this. */
-  C_Gather *sync_point_persist;
-  bool append_scheduled = false;
   bool appending = false;
   /* Signal these when this sync point is appending to the log, and its order
    * of appearance is guaranteed. One of these is a sub-operation of the
@@ -46,9 +31,33 @@ public:
   ~SyncPoint();
   SyncPoint(const SyncPoint&) = delete;
   SyncPoint &operator=(const SyncPoint&) = delete;
-
+  void persist_gather_activate();
+  Context* persist_gather_new_sub();
+  void persist_gather_set_finisher(Context *ctx);
+  void prior_persisted_gather_activate();
+  Context* prior_persisted_gather_new_sub();
+  void prior_persisted_gather_set_finisher();
+  void add_in_on_persisted_ctxs(Context* cxt);
+  void add_in_on_appending_ctxs(Context* cxt);
+  void setup_earlier_sync_point(std::shared_ptr<SyncPoint> sync_point,
+                                uint64_t last_op_sequence_num);
 private:
   CephContext *m_cct;
+  bool m_append_scheduled = false;
+  uint64_t m_final_op_sequence_num = 0;
+  /* A sync point can't appear in the log until all the writes bearing
+   * it and all the prior sync points have been appended and
+   * persisted.
+   *
+   * Writes bearing this sync gen number and the prior sync point will be
+   * sub-ops of this Gather. This sync point will not be appended until all
+   * these complete to the point where their persist order is guaranteed. */
+  C_Gather *m_prior_log_entries_persisted;
+  /* The finisher for this will append the sync point to the log. The finisher
+   * for m_prior_log_entries_persisted will be a sub-op of this. */
+  C_Gather *m_sync_point_persist;
+  int m_prior_log_entries_persisted_result = 0;
+  bool m_prior_log_entries_persisted_complete = false;
+
   friend std::ostream &operator<<(std::ostream &os,
                                   const SyncPoint &p);
 };
diff --git a/src/librbd/cache/rwl/Types.cc b/src/librbd/cache/rwl/Types.cc
index bd1b097c2e7..67188e0536d 100644
--- a/src/librbd/cache/rwl/Types.cc
+++ b/src/librbd/cache/rwl/Types.cc
@@ -103,6 +103,10 @@ std::ostream &operator<<(std::ostream &os,
   return os;
 };
 
+io::Extent whole_volume_extent() {
+  return io::Extent({0, std::numeric_limits<uint64_t>::max()});
+}
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd
diff --git a/src/librbd/cache/rwl/Types.h b/src/librbd/cache/rwl/Types.h
index 6bfceea0a51..d5de52a4c9e 100644
--- a/src/librbd/cache/rwl/Types.h
+++ b/src/librbd/cache/rwl/Types.h
@@ -143,6 +143,7 @@ namespace rwl {
 
 /* Limit work between sync points */
 const uint64_t MAX_WRITES_PER_SYNC_POINT = 256;
+const uint64_t MAX_BYTES_PER_SYNC_POINT = (1024 * 1024 * 8);
 
 const uint32_t MIN_WRITE_ALLOC_SIZE = 512;
 const uint32_t LOG_STATS_INTERVAL_SECONDS = 5;
@@ -255,6 +256,8 @@ public:
   }
 };
 
+io::Extent whole_volume_extent();
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd
-- 
2.39.5
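
Illustrative sketch (reviewer aid; not part of the patch): the heart of this
change is the two-Gather choreography between m_prior_log_entries_persisted
and m_sync_point_persist. The standalone C++ model below is a simplification
under stated assumptions -- the toy Gather class here is hypothetical and
omits what Ceph's C_Gather actually provides (Context subclasses,
worst-return-code tracking, locking) -- but it shows why each Gather gets its
finisher first and a deferred activate() later, and how the prior-entries
Gather feeds the single sub of the persist Gather.

// build: g++ -std=c++17 -o gather_sketch gather_sketch.cc
#include <functional>
#include <iostream>

class Gather {
  int pending_ = 0;            // outstanding subs (prerequisites)
  bool activated_ = false;     // finisher may not run until activated
  std::function<void(int)> finisher_;
  void maybe_finish(int r) {
    if (activated_ && pending_ == 0 && finisher_) finisher_(r);
  }
public:
  // Each new_sub() registers one prerequisite that must complete.
  std::function<void(int)> new_sub() {
    ++pending_;
    return [this](int r) { --pending_; maybe_finish(r); };
  }
  void set_finisher(std::function<void(int)> f) { finisher_ = std::move(f); }
  // activate() declares "all subs now exist"; without it, a Gather whose
  // subs all completed early would fire before the set was closed -- the
  // reason the patch defers activation via DeferredContexts until after
  // m_lock is released.
  void activate() { activated_ = true; maybe_finish(0); }
};

int main() {
  Gather prior_entries_persisted;  // stands in for m_prior_log_entries_persisted
  Gather sync_point_persist;       // stands in for m_sync_point_persist

  // As in prior_persisted_gather_set_finisher(): the persist Gather's only
  // sub completes once all prior log entries have persisted.
  auto persist_ready = sync_point_persist.new_sub();
  prior_entries_persisted.set_finisher([&](int r) {
    std::cout << "prior log entries persisted\n";
    persist_ready(r);
  });

  // As in flush_new_sync_point(): once the persist Gather finishes, the
  // (now previous) sync point is ready to be appended/persisted.
  sync_point_persist.set_finisher([](int) {
    std::cout << "sync point ready to append/persist\n";
  });

  // Two in-flight writes bearing this sync gen number.
  auto w1 = prior_entries_persisted.new_sub();
  auto w2 = prior_entries_persisted.new_sub();

  // Deferred activation, as via DeferredContexts in the patch.
  prior_entries_persisted.activate();
  sync_point_persist.activate();

  w1(0);
  w2(0);  // last prerequisite completes -> both finishers run, in order
  return 0;
}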