From 48ad3cc26ea8d8e32c60cfc856b3ab23585bd9c6 Mon Sep 17 00:00:00 2001 From: Yuan Lu Date: Wed, 29 Apr 2020 16:03:30 +0800 Subject: [PATCH] librbd: retire entries Signed-off-by: Peterson, Scott Signed-off-by: Li, Xiaoyan Signed-off-by: Lu, Yuan Signed-off-by: Chamarthy, Mahati --- src/librbd/cache/ReplicatedWriteLog.cc | 172 ++++++++++++++++++++++++- src/librbd/cache/ReplicatedWriteLog.h | 5 + src/librbd/cache/rwl/LogEntry.cc | 2 +- src/librbd/cache/rwl/LogEntry.h | 29 ++++- src/librbd/cache/rwl/Types.h | 4 + 5 files changed, 207 insertions(+), 5 deletions(-) diff --git a/src/librbd/cache/ReplicatedWriteLog.cc b/src/librbd/cache/ReplicatedWriteLog.cc index a7ccb5bd812ab..81257398b0324 100644 --- a/src/librbd/cache/ReplicatedWriteLog.cc +++ b/src/librbd/cache/ReplicatedWriteLog.cc @@ -43,6 +43,8 @@ ReplicatedWriteLog::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::Imag m_image_ctx(image_ctx), m_log_pool_config_size(DEFAULT_POOL_SIZE), m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct), + m_log_retire_lock(ceph::make_mutex(util::unique_lock_name( + "librbd::cache::ReplicatedWriteLog::m_log_retire_lock", this))), m_entry_reader_lock("librbd::cache::ReplicatedWriteLog::m_entry_reader_lock"), m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name( "librbd::cache::ReplicatedWriteLog::m_deferred_dispatch_lock", this))), @@ -72,6 +74,11 @@ ReplicatedWriteLog::~ReplicatedWriteLog() { std::lock_guard locker(m_lock); m_timer->cancel_event(m_timer_ctx); m_thread_pool.stop(); + ceph_assert(m_deferred_ios.size() == 0); + ceph_assert(m_ops_to_flush.size() == 0); + ceph_assert(m_ops_to_append.size() == 0); + ceph_assert(m_flush_ops_in_flight == 0); + m_log_pool = nullptr; delete m_cache_state; m_cache_state = nullptr; @@ -1709,10 +1716,15 @@ void ReplicatedWriteLog::wake_up() { template void ReplicatedWriteLog::process_work() { - // TODO: handle retiring entries in later PRs CephContext *cct = m_image_ctx.cct; int max_iterations = 4; bool wake_up_requested = false; + uint64_t aggressive_high_water_bytes = m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER; + uint64_t high_water_bytes = m_bytes_allocated_cap * RETIRE_HIGH_WATER; + uint64_t low_water_bytes = m_bytes_allocated_cap * RETIRE_LOW_WATER; + uint64_t aggressive_high_water_entries = m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER; + uint64_t high_water_entries = m_total_log_entries * RETIRE_HIGH_WATER; + uint64_t low_water_entries = m_total_log_entries * RETIRE_LOW_WATER; ldout(cct, 20) << dendl; @@ -1721,7 +1733,35 @@ void ReplicatedWriteLog::process_work() { std::lock_guard locker(m_lock); m_wake_up_requested = false; } - // TODO: retire entries if fulfill conditions + if (m_alloc_failed_since_retire || + m_bytes_allocated > high_water_bytes || + (m_log_entries.size() > high_water_entries)) { + int retired = 0; + utime_t started = ceph_clock_now(); + ldout(m_image_ctx.cct, 10) << "alloc_fail=" << m_alloc_failed_since_retire + << ", allocated > high_water=" + << (m_bytes_allocated > high_water_bytes) + << ", allocated_entries > high_water=" + << (m_log_entries.size() > high_water_entries) + << dendl; + while (m_alloc_failed_since_retire || + (m_bytes_allocated > high_water_bytes) || + (m_log_entries.size() > high_water_entries) || + (((m_bytes_allocated > low_water_bytes) || (m_log_entries.size() > low_water_entries)) && + (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) { + if (!retire_entries((m_shutting_down || + (m_bytes_allocated > aggressive_high_water_bytes) || + (m_log_entries.size() > aggressive_high_water_entries)) + ? MAX_ALLOC_PER_TRANSACTION + : MAX_FREE_PER_TRANSACTION)) { + break; + } + retired++; + dispatch_deferred_writes(); + process_writeback_dirty_entries(); + } + ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl; + } dispatch_deferred_writes(); process_writeback_dirty_entries(); @@ -1804,6 +1844,7 @@ Context* ReplicatedWriteLog::construct_flush_entry_ctx(std::shared_ptr= log_entry->bytes_dirty()); + log_entry->set_flushed(true); m_bytes_dirty -= log_entry->bytes_dirty(); sync_point_writer_flushed(log_entry->get_sync_point_entry()); ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry @@ -2221,6 +2262,133 @@ void ReplicatedWriteLog::add_into_log_map(GenericWriteLogEntries &log_entries m_blocks_to_log_entries.add_log_entries(log_entries); } +template +bool ReplicatedWriteLog::can_retire_entry(std::shared_ptr log_entry) { + CephContext *cct = m_image_ctx.cct; + + ldout(cct, 20) << dendl; + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + return log_entry->can_retire(); +} + +/** + * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries + * that are eligible to be retired. Returns true if anything was + * retired. + */ +template +bool ReplicatedWriteLog::retire_entries(const unsigned long int frees_per_tx) { + CephContext *cct = m_image_ctx.cct; + GenericLogEntriesVector retiring_entries; + uint32_t initial_first_valid_entry; + uint32_t first_valid_entry; + + std::lock_guard retire_locker(m_log_retire_lock); + ldout(cct, 20) << "Look for entries to retire" << dendl; + { + /* Entry readers can't be added while we hold m_entry_reader_lock */ + RWLock::WLocker entry_reader_locker(m_entry_reader_lock); + std::lock_guard locker(m_lock); + initial_first_valid_entry = m_first_valid_entry; + first_valid_entry = m_first_valid_entry; + auto entry = m_log_entries.front(); + while (!m_log_entries.empty() && + retiring_entries.size() < frees_per_tx && + can_retire_entry(entry)) { + if (entry->log_entry_index != first_valid_entry) { + lderr(cct) << "Retiring entry index (" << entry->log_entry_index + << ") and first valid log entry index (" << first_valid_entry + << ") must be ==." << dendl; + } + ceph_assert(entry->log_entry_index == first_valid_entry); + first_valid_entry = (first_valid_entry + 1) % m_total_log_entries; + m_log_entries.pop_front(); + retiring_entries.push_back(entry); + /* Remove entry from map so there will be no more readers */ + if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) { + auto gen_write_entry = static_pointer_cast(entry); + if (gen_write_entry) { + m_blocks_to_log_entries.remove_log_entry(gen_write_entry); + } + } + entry = m_log_entries.front(); + } + } + + if (retiring_entries.size()) { + ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl; + TOID(struct WriteLogPoolRoot) pool_root; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + + utime_t tx_start; + utime_t tx_end; + /* Advance first valid entry and release buffers */ + { + uint64_t flushed_sync_gen; + std::lock_guard append_locker(m_log_append_lock); + { + std::lock_guard locker(m_lock); + flushed_sync_gen = m_flushed_sync_gen; + } + + tx_start = ceph_clock_now(); + TX_BEGIN(m_log_pool) { + if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) { + ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from " + << D_RO(pool_root)->flushed_sync_gen << " to " + << flushed_sync_gen << dendl; + D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen; + } + D_RW(pool_root)->first_valid_entry = first_valid_entry; + for (auto &entry: retiring_entries) { + if (entry->write_bytes()) { + ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo + << "." << entry->ram_entry.write_data.oid.off << dendl; + TX_FREE(entry->ram_entry.write_data); + } else { + ldout(cct, 20) << "Retiring non-write: " << *entry << dendl; + } + } + } TX_ONCOMMIT { + } TX_ONABORT { + lderr(cct) << "failed to commit free of" << retiring_entries.size() << " log entries (" << m_log_pool_name << ")" << dendl; + ceph_assert(false); + } TX_FINALLY { + } TX_END; + tx_end = ceph_clock_now(); + } + m_perfcounter->tinc(l_librbd_rwl_retire_tx_t, tx_end - tx_start); + m_perfcounter->hinc(l_librbd_rwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), retiring_entries.size()); + + /* Update runtime copy of first_valid, and free entries counts */ + { + std::lock_guard locker(m_lock); + + ceph_assert(m_first_valid_entry == initial_first_valid_entry); + m_first_valid_entry = first_valid_entry; + m_free_log_entries += retiring_entries.size(); + for (auto &entry: retiring_entries) { + if (entry->write_bytes()) { + ceph_assert(m_bytes_cached >= entry->write_bytes()); + m_bytes_cached -= entry->write_bytes(); + uint64_t entry_allocation_size = entry->write_bytes(); + if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) { + entry_allocation_size = MIN_WRITE_ALLOC_SIZE; + } + ceph_assert(m_bytes_allocated >= entry_allocation_size); + m_bytes_allocated -= entry_allocation_size; + } + } + m_alloc_failed_since_retire = false; + wake_up(); + } + } else { + ldout(cct, 20) << "Nothing to retire" << dendl; + return false; + } + return true; +} + } // namespace cache } // namespace librbd diff --git a/src/librbd/cache/ReplicatedWriteLog.h b/src/librbd/cache/ReplicatedWriteLog.h index 23bdf205a6973..62c4aad1cf2d0 100644 --- a/src/librbd/cache/ReplicatedWriteLog.h +++ b/src/librbd/cache/ReplicatedWriteLog.h @@ -37,6 +37,8 @@ class GenericLogEntry; typedef std::list> WriteLogEntries; typedef std::list> GenericLogEntries; typedef std::list> GenericWriteLogEntries; +typedef std::vector> GenericLogEntriesVector; + typedef LogMapEntries WriteLogMapEntries; typedef LogMap WriteLogMap; @@ -197,6 +199,7 @@ private: /* Acquire locks in order declared here */ + mutable ceph::mutex m_log_retire_lock; /* Hold a read lock on m_entry_reader_lock to add readers to log entry * bufs. Hold a write lock to prevent readers from being added (e.g. when * removing log entrys from the map). No lock required to remove readers. */ @@ -275,6 +278,8 @@ private: bool handle_flushed_sync_point(std::shared_ptr log_entry); void sync_point_writer_flushed(std::shared_ptr log_entry); void process_writeback_dirty_entries(); + bool can_retire_entry(const std::shared_ptr log_entry); + bool retire_entries(const unsigned long int frees_per_tx); void init_flush_new_sync_point(rwl::DeferredContexts &later); void new_sync_point(rwl::DeferredContexts &later); diff --git a/src/librbd/cache/rwl/LogEntry.cc b/src/librbd/cache/rwl/LogEntry.cc index 76e1dad6b4626..657759de3e328 100644 --- a/src/librbd/cache/rwl/LogEntry.cc +++ b/src/librbd/cache/rwl/LogEntry.cc @@ -107,7 +107,7 @@ void WriteLogEntry::init_pmem_bl() { bl_refs = after_bl - before_bl; } -unsigned int WriteLogEntry::reader_count() { +unsigned int WriteLogEntry::reader_count() const { if (pmem_bp.have_raw()) { return (pmem_bp.raw_nref() - bl_refs - 1); } else { diff --git a/src/librbd/cache/rwl/LogEntry.h b/src/librbd/cache/rwl/LogEntry.h index 36fbf74f0ce84..d7c2974477a32 100644 --- a/src/librbd/cache/rwl/LogEntry.h +++ b/src/librbd/cache/rwl/LogEntry.h @@ -36,6 +36,16 @@ public: virtual bool can_writeback() const { return false; } + // TODO: discard need to override this + virtual bool can_retire() const { + return false; + } + virtual void set_flushed(bool flushed) { + ceph_assert(false); + } + virtual unsigned int write_bytes() const { + return 0; + }; virtual unsigned int bytes_dirty() const { return 0; }; @@ -71,6 +81,9 @@ public: ~SyncPointLogEntry() override {}; SyncPointLogEntry(const SyncPointLogEntry&) = delete; SyncPointLogEntry &operator=(const SyncPointLogEntry&) = delete; + bool can_retire() const override { + return this->completed; + } std::ostream& format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, const SyncPointLogEntry &entry); @@ -88,7 +101,7 @@ public: ~GenericWriteLogEntry() override {}; GenericWriteLogEntry(const GenericWriteLogEntry&) = delete; GenericWriteLogEntry &operator=(const GenericWriteLogEntry&) = delete; - virtual unsigned int write_bytes() const { + unsigned int write_bytes() const override { /* The valid bytes in this ops data buffer. Discard and WS override. */ return ram_entry.write_bytes; }; @@ -109,9 +122,18 @@ public: return sync_point_entry; } virtual void copy_pmem_bl(bufferlist *out_bl) = 0; + void set_flushed(bool flushed) override { + m_flushed = flushed; + } + bool get_flushed() const { + return m_flushed; + } std::ostream &format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, const GenericWriteLogEntry &entry); + +private: + bool m_flushed = false; /* or invalidated */ }; class WriteLogEntry : public GenericWriteLogEntry { @@ -150,13 +172,16 @@ public: void init(bool has_data, std::vector::iterator allocation, uint64_t current_sync_gen, uint64_t last_op_sequence_num, bool persist_on_flush); BlockExtent block_extent(); - unsigned int reader_count(); + unsigned int reader_count() const; /* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */ buffer::list &get_pmem_bl(); /* Constructs a new bl containing copies of pmem_bp */ void copy_pmem_bl(bufferlist *out_bl) override; void writeback(librbd::cache::ImageWritebackInterface &image_writeback, Context *ctx) override; + bool can_retire() const override { + return (this->completed && this->get_flushed() && (0 == reader_count())); + } std::ostream &format(std::ostream &os) const; friend std::ostream &operator<<(std::ostream &os, const WriteLogEntry &entry); diff --git a/src/librbd/cache/rwl/Types.h b/src/librbd/cache/rwl/Types.h index a894aaa8110d6..a5f3dc2ec1839 100644 --- a/src/librbd/cache/rwl/Types.h +++ b/src/librbd/cache/rwl/Types.h @@ -165,6 +165,10 @@ constexpr double USABLE_SIZE = (7.0 / 10); const uint64_t BLOCK_ALLOC_OVERHEAD_BYTES = 16; const uint8_t RWL_POOL_VERSION = 1; const uint64_t MAX_LOG_ENTRIES = (1024 * 1024); +const double AGGRESSIVE_RETIRE_HIGH_WATER = 0.75; +const double RETIRE_HIGH_WATER = 0.50; +const double RETIRE_LOW_WATER = 0.40; +const int RETIRE_BATCH_TIME_LIMIT_MS = 250; /* Defer a set of Contexts until destruct/exit. Used for deferring * work on a given thread until a required lock is dropped. */ -- 2.39.5