librbd: retire entries
author    Yuan Lu <yuan.y.lu@intel.com>
          Wed, 29 Apr 2020 08:03:30 +0000 (16:03 +0800)
committer Yuan Lu <yuan.y.lu@intel.com>
          Wed, 6 May 2020 15:30:38 +0000 (23:30 +0800)
Retire the oldest eligible log entries when the bytes allocated or the
number of log entries crosses the high-water mark: free the entries'
pmem buffers in one transaction and advance the pool's first_valid_entry.

Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
src/librbd/cache/ReplicatedWriteLog.cc
src/librbd/cache/ReplicatedWriteLog.h
src/librbd/cache/rwl/LogEntry.cc
src/librbd/cache/rwl/LogEntry.h
src/librbd/cache/rwl/Types.h

index a7ccb5bd812ab852a6af361618b029b47a24f581..81257398b0324145c54b536ab048c998765af790 100644 (file)
@@ -43,6 +43,8 @@ ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::Imag
     m_image_ctx(image_ctx),
     m_log_pool_config_size(DEFAULT_POOL_SIZE),
     m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+    m_log_retire_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::ReplicatedWriteLog::m_log_retire_lock", this))),
     m_entry_reader_lock("librbd::cache::ReplicatedWriteLog::m_entry_reader_lock"),
     m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
       "librbd::cache::ReplicatedWriteLog::m_deferred_dispatch_lock", this))),
@@ -72,6 +74,11 @@ ReplicatedWriteLog<I>::~ReplicatedWriteLog() {
     std::lock_guard locker(m_lock);
     m_timer->cancel_event(m_timer_ctx);
     m_thread_pool.stop();
+    ceph_assert(m_deferred_ios.size() == 0);
+    ceph_assert(m_ops_to_flush.size() == 0);
+    ceph_assert(m_ops_to_append.size() == 0);
+    ceph_assert(m_flush_ops_in_flight == 0);
+
     m_log_pool = nullptr;
     delete m_cache_state;
     m_cache_state = nullptr;
@@ -1709,10 +1716,15 @@ void ReplicatedWriteLog<I>::wake_up() {
 
 template <typename I>
 void ReplicatedWriteLog<I>::process_work() {
-  // TODO: handle retiring entries in later PRs
   CephContext *cct = m_image_ctx.cct;
   int max_iterations = 4;
   bool wake_up_requested = false;
+  uint64_t aggressive_high_water_bytes = m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
+  uint64_t high_water_bytes = m_bytes_allocated_cap * RETIRE_HIGH_WATER;
+  uint64_t low_water_bytes = m_bytes_allocated_cap * RETIRE_LOW_WATER;
+  uint64_t aggressive_high_water_entries = m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
+  uint64_t high_water_entries = m_total_log_entries * RETIRE_HIGH_WATER;
+  uint64_t low_water_entries = m_total_log_entries * RETIRE_LOW_WATER;
 
   ldout(cct, 20) << dendl;
 
@@ -1721,7 +1733,35 @@ void ReplicatedWriteLog<I>::process_work() {
       std::lock_guard locker(m_lock);
       m_wake_up_requested = false;
     }
-    // TODO: retire entries if fulfill conditions
+    if (m_alloc_failed_since_retire ||
+        m_bytes_allocated > high_water_bytes ||
+        (m_log_entries.size() > high_water_entries)) {
+      int retired = 0;
+      utime_t started = ceph_clock_now();
+      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << m_alloc_failed_since_retire
+                                 << ", allocated > high_water="
+                                 << (m_bytes_allocated > high_water_bytes)
+                                 << ", allocated_entries > high_water="
+                                 << (m_log_entries.size() > high_water_entries)
+                                 << dendl;
+      while (m_alloc_failed_since_retire ||
+            (m_bytes_allocated > high_water_bytes) ||
+            (m_log_entries.size() > high_water_entries) ||
+            (((m_bytes_allocated > low_water_bytes) || (m_log_entries.size() > low_water_entries)) &&
+            (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
+        if (!retire_entries((m_shutting_down ||
+           (m_bytes_allocated > aggressive_high_water_bytes) ||
+           (m_log_entries.size() > aggressive_high_water_entries))
+            ? MAX_ALLOC_PER_TRANSACTION
+            : MAX_FREE_PER_TRANSACTION)) {
+          break;
+        }
+        retired++;
+        dispatch_deferred_writes();
+        process_writeback_dirty_entries();
+      }
+      ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
+    }
     dispatch_deferred_writes();
     process_writeback_dirty_entries();
 
@@ -1804,6 +1844,7 @@ Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<Generi
           m_dirty_log_entries.push_front(log_entry);
         } else {
           ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
+          log_entry->set_flushed(true);
           m_bytes_dirty -= log_entry->bytes_dirty();
           sync_point_writer_flushed(log_entry->get_sync_point_entry());
           ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
@@ -2221,6 +2262,133 @@ void ReplicatedWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries
   m_blocks_to_log_entries.add_log_entries(log_entries);
 }
 
+template <typename I>
+bool ReplicatedWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) {
+  CephContext *cct = m_image_ctx.cct;
+
+  ldout(cct, 20) << dendl;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  return log_entry->can_retire();
+}
+
+/**
+ * Retire up to frees_per_tx of the oldest log entries that are
+ * eligible to be retired (the caller chooses the batch size).
+ * Returns true if anything was retired.
+ */
+template <typename I>
+bool ReplicatedWriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
+  CephContext *cct = m_image_ctx.cct;
+  GenericLogEntriesVector retiring_entries;
+  uint32_t initial_first_valid_entry;
+  uint32_t first_valid_entry;
+
+  std::lock_guard retire_locker(m_log_retire_lock);
+  ldout(cct, 20) << "Look for entries to retire" << dendl;
+  {
+    /* Entry readers can't be added while we hold m_entry_reader_lock */
+    RWLock::WLocker entry_reader_locker(m_entry_reader_lock);
+    std::lock_guard locker(m_lock);
+    initial_first_valid_entry = m_first_valid_entry;
+    first_valid_entry = m_first_valid_entry;
+    while (!m_log_entries.empty() &&
+           retiring_entries.size() < frees_per_tx &&
+           can_retire_entry(m_log_entries.front())) {
+      auto entry = m_log_entries.front();
+      if (entry->log_entry_index != first_valid_entry) {
+        lderr(cct) << "Retiring entry index (" << entry->log_entry_index
+                   << ") and first valid log entry index (" << first_valid_entry
+                   << ") must be ==." << dendl;
+      }
+      ceph_assert(entry->log_entry_index == first_valid_entry);
+      first_valid_entry = (first_valid_entry + 1) % m_total_log_entries;
+      m_log_entries.pop_front();
+      retiring_entries.push_back(entry);
+      /* Remove entry from map so there will be no more readers */
+      if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
+        auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
+        if (gen_write_entry) {
+          m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
+        }
+      }
+    }
+  }
+
+  if (retiring_entries.size()) {
+    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
+    TOID(struct WriteLogPoolRoot) pool_root;
+    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+
+    utime_t tx_start;
+    utime_t tx_end;
+    /* Advance first valid entry and release buffers */
+    {
+      uint64_t flushed_sync_gen;
+      std::lock_guard append_locker(m_log_append_lock);
+      {
+        std::lock_guard locker(m_lock);
+        flushed_sync_gen = m_flushed_sync_gen;
+      }
+
+      tx_start = ceph_clock_now();
+      TX_BEGIN(m_log_pool) {
+        if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+          ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
+                                     << D_RO(pool_root)->flushed_sync_gen << " to "
+                                     << flushed_sync_gen << dendl;
+          D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+        }
+        D_RW(pool_root)->first_valid_entry = first_valid_entry;
+        for (auto &entry: retiring_entries) {
+          if (entry->write_bytes()) {
+            ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
+                           << "." << entry->ram_entry.write_data.oid.off << dendl;
+            TX_FREE(entry->ram_entry.write_data);
+          } else {
+            ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
+          }
+        }
+      } TX_ONCOMMIT {
+      } TX_ONABORT {
+        lderr(cct) << "failed to commit free of " << retiring_entries.size()
+                   << " log entries (" << m_log_pool_name << ")" << dendl;
+        ceph_assert(false);
+      } TX_FINALLY {
+      } TX_END;
+      tx_end = ceph_clock_now();
+    }
+    m_perfcounter->tinc(l_librbd_rwl_retire_tx_t, tx_end - tx_start);
+    m_perfcounter->hinc(l_librbd_rwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), retiring_entries.size());
+
+    /* Update runtime copy of first_valid, and free entries counts */
+    {
+      std::lock_guard locker(m_lock);
+
+      ceph_assert(m_first_valid_entry == initial_first_valid_entry);
+      m_first_valid_entry = first_valid_entry;
+      m_free_log_entries += retiring_entries.size();
+      for (auto &entry: retiring_entries) {
+        if (entry->write_bytes()) {
+          ceph_assert(m_bytes_cached >= entry->write_bytes());
+          m_bytes_cached -= entry->write_bytes();
+          uint64_t entry_allocation_size = entry->write_bytes();
+          if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
+            entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
+          }
+          ceph_assert(m_bytes_allocated >= entry_allocation_size);
+          m_bytes_allocated -= entry_allocation_size;
+        }
+      }
+      m_alloc_failed_since_retire = false;
+      wake_up();
+    }
+  } else {
+    ldout(cct, 20) << "Nothing to retire" << dendl;
+    return false;
+  }
+  return true;
+}
+
 } // namespace cache
 } // namespace librbd
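
The policy this file's process_work() hunk adds is two-tiered: retirement
starts at a high-water mark (or after an allocation failure), and the
per-transaction batch grows when usage crosses the aggressive mark or the
cache is shutting down. Below is a minimal standalone sketch of that
decision logic, not part of this commit; CacheStats, the two helper
functions, and both MAX_*_PER_TRANSACTION values are illustrative
assumptions.

#include <cstdint>
#include <iostream>

constexpr double AGGRESSIVE_RETIRE_HIGH_WATER = 0.75;
constexpr double RETIRE_HIGH_WATER = 0.50;
constexpr unsigned MAX_ALLOC_PER_TRANSACTION = 8; // assumed value
constexpr unsigned MAX_FREE_PER_TRANSACTION = 1;  // assumed value

struct CacheStats {
  uint64_t bytes_allocated_cap;   // m_bytes_allocated_cap
  uint64_t total_log_entries;     // m_total_log_entries
  uint64_t bytes_allocated;       // m_bytes_allocated
  uint64_t log_entries;           // m_log_entries.size()
  bool alloc_failed_since_retire; // m_alloc_failed_since_retire
  bool shutting_down;             // m_shutting_down
};

// Start retiring on an allocation failure or above either high-water mark.
bool should_start_retiring(const CacheStats &s) {
  uint64_t high_water_bytes = s.bytes_allocated_cap * RETIRE_HIGH_WATER;
  uint64_t high_water_entries = s.total_log_entries * RETIRE_HIGH_WATER;
  return s.alloc_failed_since_retire ||
         s.bytes_allocated > high_water_bytes ||
         s.log_entries > high_water_entries;
}

// Under pressure (shutdown or above the aggressive marks), retire the
// larger batch per transaction; otherwise the small one.
unsigned pick_batch_size(const CacheStats &s) {
  uint64_t agg_bytes = s.bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
  uint64_t agg_entries = s.total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
  bool aggressive = s.shutting_down ||
                    s.bytes_allocated > agg_bytes ||
                    s.log_entries > agg_entries;
  return aggressive ? MAX_ALLOC_PER_TRANSACTION : MAX_FREE_PER_TRANSACTION;
}

int main() {
  // 1 GiB cap, 1M entry slots, 600 MiB currently allocated.
  CacheStats s{1ull << 30, 1ull << 20, 600ull << 20, 100000, false, false};
  std::cout << "retire=" << should_start_retiring(s)
            << " batch=" << pick_batch_size(s) << "\n"; // retire=1 batch=1
}
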
 
index 23bdf205a6973505a27ca5c2cb76ebcb5304e9cc..62c4aad1cf2d096deac900ed195c0791ec85bf00 100644 (file)
@@ -37,6 +37,8 @@ class GenericLogEntry;
 typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
 typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
 typedef std::list<std::shared_ptr<GenericWriteLogEntry>> GenericWriteLogEntries;
+typedef std::vector<std::shared_ptr<GenericLogEntry>> GenericLogEntriesVector;
+
 typedef LogMapEntries<GenericWriteLogEntry> WriteLogMapEntries;
 typedef LogMap<GenericWriteLogEntry> WriteLogMap;
 
@@ -197,6 +199,7 @@ private:
 
   /* Acquire locks in order declared here */
 
+  mutable ceph::mutex m_log_retire_lock;
   /* Hold a read lock on m_entry_reader_lock to add readers to log entry
    * bufs. Hold a write lock to prevent readers from being added (e.g. when
    * removing log entries from the map). No lock required to remove readers. */
@@ -275,6 +278,8 @@ private:
   bool handle_flushed_sync_point(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
   void sync_point_writer_flushed(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
   void process_writeback_dirty_entries();
+  bool can_retire_entry(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
+  bool retire_entries(const unsigned long int frees_per_tx);
 
   void init_flush_new_sync_point(rwl::DeferredContexts &later);
   void new_sync_point(rwl::DeferredContexts &later);
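
The header comment above requires locks to be taken in declaration order,
which puts the new m_log_retire_lock first. A minimal sketch of the nesting
retire_entries() uses, with std::mutex and std::shared_mutex standing in
for ceph::mutex and RWLock (stand-in names, not the real types):

#include <mutex>
#include <shared_mutex>

std::mutex log_retire_lock;          // stand-in for m_log_retire_lock
std::shared_mutex entry_reader_lock; // stand-in for m_entry_reader_lock
std::mutex state_lock;               // stand-in for m_lock

void retire_entries_locking_shape() {
  // 1st: serialize concurrent retirers for the whole operation
  std::lock_guard retire(log_retire_lock);
  {
    // 2nd: write lock so no new readers can be added to entry buffers
    std::unique_lock readers(entry_reader_lock);
    // 3rd: protect the shared log/counter state
    std::lock_guard state(state_lock);
    // ... pop retirable entries from the front of the log ...
  }
  // The pmem transaction then runs under m_log_append_lock (not shown),
  // after the reader and state locks have been dropped.
}
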
index 76e1dad6b46263b66c55d0bf8afc530659d5df96..657759de3e32872ad313fd580832a698d9de0f5f 100644 (file)
@@ -107,7 +107,7 @@ void WriteLogEntry::init_pmem_bl() {
   bl_refs = after_bl - before_bl;
 }
 
-unsigned int WriteLogEntry::reader_count() {
+unsigned int WriteLogEntry::reader_count() const {
   if (pmem_bp.have_raw()) {
     return (pmem_bp.raw_nref() - bl_refs - 1);
   } else {
index 36fbf74f0ce845c281b299553234430533b45397..d7c2974477a32c30fda86d6a112faa83e497d83b 100644 (file)
@@ -36,6 +36,16 @@ public:
   virtual bool can_writeback() const {
     return false;
   }
+  // TODO: discard will need to override this
+  virtual bool can_retire() const {
+    return false;
+  }
+  virtual void set_flushed(bool flushed) {
+    ceph_assert(false);
+  }
+  virtual unsigned int write_bytes() const {
+    return 0;
+  };
   virtual unsigned int bytes_dirty() const {
     return 0;
   };
@@ -71,6 +81,9 @@ public:
   ~SyncPointLogEntry() override {};
   SyncPointLogEntry(const SyncPointLogEntry&) = delete;
   SyncPointLogEntry &operator=(const SyncPointLogEntry&) = delete;
+  bool can_retire() const override {
+    return this->completed;
+  }
   std::ostream& format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const SyncPointLogEntry &entry);
@@ -88,7 +101,7 @@ public:
   ~GenericWriteLogEntry() override {};
   GenericWriteLogEntry(const GenericWriteLogEntry&) = delete;
   GenericWriteLogEntry &operator=(const GenericWriteLogEntry&) = delete;
-  virtual unsigned int write_bytes() const {
+  unsigned int write_bytes() const override {
     /* The valid bytes in this ops data buffer. Discard and WS override. */
     return ram_entry.write_bytes;
   };
@@ -109,9 +122,18 @@ public:
     return sync_point_entry;
   }
   virtual void copy_pmem_bl(bufferlist *out_bl) = 0;
+  void set_flushed(bool flushed) override {
+    m_flushed = flushed;
+  }
+  bool get_flushed() const {
+    return m_flushed;
+  }
   std::ostream &format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const GenericWriteLogEntry &entry);
+
+private:
+  bool m_flushed = false; /* or invalidated */
 };
 
 class WriteLogEntry : public GenericWriteLogEntry {
@@ -150,13 +172,16 @@ public:
   void init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation,
             uint64_t current_sync_gen, uint64_t last_op_sequence_num, bool persist_on_flush);
   BlockExtent block_extent();
-  unsigned int reader_count();
+  unsigned int reader_count() const;
   /* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */
   buffer::list &get_pmem_bl();
   /* Constructs a new bl containing copies of pmem_bp */
   void copy_pmem_bl(bufferlist *out_bl) override;
   void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
                  Context *ctx) override;
+  bool can_retire() const override {
+    return (this->completed && this->get_flushed() && (0 == reader_count()));
+  }
   std::ostream &format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const WriteLogEntry &entry);
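
The can_retire() overrides above give each entry type its own eligibility
rule: a sync point retires once completed; a write retires once completed,
flushed (or invalidated), and with no pmem readers left. A toy model of
why retirement only consumes a contiguous prefix of the log; LogEntry and
collect_retirable() are stand-ins, not the real RWL classes:

#include <cstddef>
#include <deque>
#include <memory>
#include <vector>

struct LogEntry {
  bool completed = false; // entry persisted and op completed
  bool flushed = false;   // written back to the image (or invalidated)
  unsigned readers = 0;   // in-flight readers of the pmem buffer
  // Mirrors WriteLogEntry::can_retire(): completed, flushed, unreferenced.
  bool can_retire() const {
    return completed && flushed && readers == 0;
  }
};

// The pmem log is a ring buffer, so first_valid_entry can only advance
// past a contiguous retirable prefix; retirement stops at the first
// entry that is still busy.
std::vector<std::shared_ptr<LogEntry>>
collect_retirable(std::deque<std::shared_ptr<LogEntry>> &log,
                  std::size_t max) {
  std::vector<std::shared_ptr<LogEntry>> out;
  while (!log.empty() && out.size() < max && log.front()->can_retire()) {
    out.push_back(log.front());
    log.pop_front();
  }
  return out;
}
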
index a894aaa8110d694a3c804fb6a9216f21456c40e2..a5f3dc2ec183926603cb4474ff6a47ce3c7e38b8 100644 (file)
@@ -165,6 +165,10 @@ constexpr double USABLE_SIZE = (7.0 / 10);
 const uint64_t BLOCK_ALLOC_OVERHEAD_BYTES = 16;
 const uint8_t RWL_POOL_VERSION = 1;
 const uint64_t MAX_LOG_ENTRIES = (1024 * 1024);
+const double AGGRESSIVE_RETIRE_HIGH_WATER = 0.75;
+const double RETIRE_HIGH_WATER = 0.50;
+const double RETIRE_LOW_WATER = 0.40;
+const int RETIRE_BATCH_TIME_LIMIT_MS = 250;
 
 /* Defer a set of Contexts until destruct/exit. Used for deferring
  * work on a given thread until a required lock is dropped. */
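
These constants give the retire loop hysteresis: it starts above
RETIRE_HIGH_WATER, keeps draining toward RETIRE_LOW_WATER once started,
and gives up after RETIRE_BATCH_TIME_LIMIT_MS so deferred writes and
writeback are not starved between batches. A standalone sketch of that
loop shape with standard <chrono> timing; drain_to_low_water and its
callbacks are illustrative, not part of this commit:

#include <chrono>
#include <functional>

constexpr int RETIRE_BATCH_TIME_LIMIT_MS = 250;

// Keep retiring while above high water; once started, continue down
// toward low water but only within the time budget.
void drain_to_low_water(const std::function<bool()> &retire_batch,
                        const std::function<bool()> &above_high_water,
                        const std::function<bool()> &above_low_water) {
  using clock = std::chrono::steady_clock;
  const auto started = clock::now();
  auto elapsed_ms = [&started] {
    return std::chrono::duration_cast<std::chrono::milliseconds>(
               clock::now() - started).count();
  };
  while (above_high_water() ||
         (above_low_water() && elapsed_ms() < RETIRE_BATCH_TIME_LIMIT_MS)) {
    if (!retire_batch()) {
      break; // nothing retirable at the front of the log
    }
  }
}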