]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librbd: load existing cache
authorYuan Lu <yuan.y.lu@intel.com>
Thu, 7 May 2020 07:04:06 +0000 (15:04 +0800)
committerYuan Lu <yuan.y.lu@intel.com>
Wed, 13 May 2020 06:40:28 +0000 (14:40 +0800)
Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
src/librbd/cache/ReplicatedWriteLog.cc
src/librbd/cache/ReplicatedWriteLog.h
src/librbd/cache/rwl/LogEntry.h
src/librbd/cache/rwl/Request.cc
src/librbd/cache/rwl/Types.h

index 8d107463899274c7a5458e6c687859cc643fb17f..8691c1b36c8cc370cf8527c880116bd276ae969d 100644 (file)
@@ -307,6 +307,203 @@ void ReplicatedWriteLog<I>::arm_periodic_stats() {
   }
 }
 
+/*
+ * Loads the log entries from an existing log.
+ *
+ * Creates the in-memory structures to represent the state of the
+ * re-opened log.
+ *
+ * Finds the last appended sync point, and any sync points referred to
+ * in log entries, but missing from the log. These missing sync points
+ * are created and scheduled for append. Some rudimentary consistency
+ * checking is done.
+ *
+ * Rebuilds the m_blocks_to_log_entries map, to make log entries
+ * readable.
+ *
+ * Places all writes on the dirty entries list, which causes them all
+ * to be flushed.
+ *
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::load_existing_entries(DeferredContexts &later) {
+  TOID(struct WriteLogPoolRoot) pool_root;
+  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+  struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+  uint64_t entry_index = m_first_valid_entry;
+  /* The map below allows us to find sync point log entries by sync
+   * gen number, which is necessary so write entries can be linked to
+   * their sync points. */
+  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
+  /* The map below tracks sync points referred to in writes but not
+   * appearing in the sync_point_entries map.  We'll use this to
+   * determine which sync points are missing and need to be
+   * created. */
+  std::map<uint64_t, bool> missing_sync_points;
+
+  /*
+   * Read the existing log entries. Construct an in-memory log entry
+   * object of the appropriate type for each. Add these to the global
+   * log entries list.
+   *
+   * Write entries will not link to their sync points yet. We'll do
+   * that in the next pass. Here we'll accumulate a map of sync point
+   * gen numbers that are referred to in writes but do not appearing in
+   * the log.
+   */
+  while (entry_index != m_first_free_entry) {
+    WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index];
+    std::shared_ptr<GenericLogEntry> log_entry = nullptr;
+    bool writer = pmem_entry->is_writer();
+
+    ceph_assert(pmem_entry->entry_index == entry_index);
+    if (pmem_entry->is_sync_point()) {
+      ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+                                 << " is a sync point. pmem_entry=[" << *pmem_entry << "]" << dendl;
+      auto sync_point_entry = std::make_shared<SyncPointLogEntry>(pmem_entry->sync_gen_number);
+      log_entry = sync_point_entry;
+      sync_point_entries[pmem_entry->sync_gen_number] = sync_point_entry;
+      missing_sync_points.erase(pmem_entry->sync_gen_number);
+      m_current_sync_gen = pmem_entry->sync_gen_number;
+    } else if (pmem_entry->is_write()) {
+      ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+                                 << " is a write. pmem_entry=[" << *pmem_entry << "]" << dendl;
+      auto write_entry =
+        std::make_shared<WriteLogEntry>(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes);
+      write_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+      log_entry = write_entry;
+    } else if (pmem_entry->is_writesame()) {
+      ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+                                 << " is a write same. pmem_entry=[" << *pmem_entry << "]" << dendl;
+      auto ws_entry =
+        std::make_shared<WriteSameLogEntry>(nullptr, pmem_entry->image_offset_bytes,
+                                            pmem_entry->write_bytes, pmem_entry->ws_datalen);
+      ws_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+      log_entry = ws_entry;
+    } else if (pmem_entry->is_discard()) {
+      ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+                                 << " is a discard. pmem_entry=[" << *pmem_entry << "]" << dendl;
+      auto discard_entry =
+        std::make_shared<DiscardLogEntry>(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes,
+                                          m_discard_granularity_bytes);
+      log_entry = discard_entry;
+    } else {
+      lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index
+                             << ", pmem_entry=[" << *pmem_entry << "]" << dendl;
+    }
+
+    if (writer) {
+      ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+                                 << " writes. pmem_entry=[" << *pmem_entry << "]" << dendl;
+      if (!sync_point_entries[pmem_entry->sync_gen_number]) {
+        missing_sync_points[pmem_entry->sync_gen_number] = true;
+      }
+    }
+
+    log_entry->ram_entry = *pmem_entry;
+    log_entry->pmem_entry = pmem_entry;
+    log_entry->log_entry_index = entry_index;
+    log_entry->completed = true;
+
+    m_log_entries.push_back(log_entry);
+
+    entry_index = (entry_index + 1) % m_total_log_entries;
+  }
+
+  /* Create missing sync points. These must not be appended until the
+   * entry reload is complete and the write map is up to
+   * date. Currently this is handled by the deferred contexts object
+   * passed to new_sync_point(). These contexts won't be completed
+   * until this function returns.  */
+  for (auto &kv : missing_sync_points) {
+    ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl;
+    if (0 == m_current_sync_gen) {
+      /* The unlikely case where the log contains writing entries, but no sync
+       * points (e.g. because they were all retired) */
+      m_current_sync_gen = kv.first-1;
+    }
+    ceph_assert(kv.first == m_current_sync_gen+1);
+    init_flush_new_sync_point(later);
+    ceph_assert(kv.first == m_current_sync_gen);
+    sync_point_entries[kv.first] = m_current_sync_point->log_entry;;
+  }
+
+  /*
+   * Iterate over the log entries again (this time via the global
+   * entries list), connecting write entries to their sync points and
+   * updating the sync point stats.
+   *
+   * Add writes to the write log map.
+   */
+  std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr;
+  for (auto &log_entry : m_log_entries)  {
+    if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) {
+      /* This entry is one of the types that write */
+      auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry);
+      if (gen_write_entry) {
+        auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number];
+        if (!sync_point_entry) {
+          lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl;
+          ceph_assert(false);
+        } else {
+          gen_write_entry->sync_point_entry = sync_point_entry;
+          sync_point_entry->writes++;
+          sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes;
+          sync_point_entry->writes_completed++;
+          m_blocks_to_log_entries.add_log_entry(gen_write_entry);
+          /* This entry is only dirty if its sync gen number is > the flushed
+           * sync gen number from the root object. */
+          if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
+            m_dirty_log_entries.push_back(log_entry);
+            m_bytes_dirty += gen_write_entry->bytes_dirty();
+          } else {
+            gen_write_entry->set_flushed(true);
+            sync_point_entry->writes_flushed++;
+          }
+          if (log_entry->write_bytes() == log_entry->bytes_dirty()) {
+            /* This entry is a basic write */
+            uint64_t bytes_allocated = MIN_WRITE_ALLOC_SIZE;
+            if (gen_write_entry->ram_entry.write_bytes > bytes_allocated) {
+              bytes_allocated = gen_write_entry->ram_entry.write_bytes;
+            }
+            m_bytes_allocated += bytes_allocated;
+            m_bytes_cached += gen_write_entry->ram_entry.write_bytes;
+          }
+        }
+      }
+    } else {
+      /* This entry is sync point entry */
+      auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry);
+      if (sync_point_entry) {
+        if (previous_sync_point_entry) {
+          previous_sync_point_entry->next_sync_point_entry = sync_point_entry;
+          if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
+            sync_point_entry->prior_sync_point_flushed = false;
+            ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed ||
+                        (0 == previous_sync_point_entry->writes) ||
+                        (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed));
+          } else {
+            sync_point_entry->prior_sync_point_flushed = true;
+            ceph_assert(previous_sync_point_entry->prior_sync_point_flushed);
+            ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed);
+          }
+          previous_sync_point_entry = sync_point_entry;
+        } else {
+          /* There are no previous sync points, so we'll consider them flushed */
+          sync_point_entry->prior_sync_point_flushed = true;
+        }
+        ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << dendl;
+      }
+    }
+  }
+  if (0 == m_current_sync_gen) {
+    /* If a re-opened log was completely flushed, we'll have found no sync point entries here,
+     * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync
+     * point recorded in the log. */
+    m_current_sync_gen = m_flushed_sync_gen;
+  }
+}
+
 template <typename I>
 void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later) {
   CephContext *cct = m_image_ctx.cct;
@@ -409,7 +606,51 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
     } TX_FINALLY {
     } TX_END;
   } else {
-    // TODO: load existed cache. This will be covered in later PR.
+    m_cache_state->present = true;
+    /* Open existing pool */
+    if ((m_log_pool =
+         pmemobj_open(m_log_pool_name.c_str(),
+                      m_rwl_pool_layout_name)) == NULL) {
+      lderr(cct) << "failed to open pool (" << m_log_pool_name << "): "
+                 << pmemobj_errormsg() << dendl;
+      on_finish->complete(-errno);
+      return;
+    }
+    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+    if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) {
+      // TODO: will handle upgrading version in the future
+      lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version
+                 << " expected " << RWL_POOL_VERSION << dendl;
+      on_finish->complete(-EINVAL);
+      return;
+    }
+    if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
+      lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size
+                 << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
+      on_finish->complete(-EINVAL);
+      return;
+    }
+    m_log_pool_actual_size = D_RO(pool_root)->pool_size;
+    m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
+    m_total_log_entries = D_RO(pool_root)->num_log_entries;
+    m_first_free_entry = D_RO(pool_root)->first_free_entry;
+    m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
+    if (m_first_free_entry < m_first_valid_entry) {
+      /* Valid entries wrap around the end of the ring, so first_free is lower
+       * than first_valid.  If first_valid was == first_free+1, the entry at
+       * first_free would be empty. The last entry is never used, so in
+       * that case there would be zero free log entries. */
+      m_free_log_entries = m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1;
+    } else {
+      /* first_valid is <= first_free. If they are == we have zero valid log
+       * entries, and n-1 free log entries */
+      m_free_log_entries = m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1;
+    }
+    size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE);
+    m_bytes_allocated_cap = effective_pool_size;
+    load_existing_entries(later);
+    m_cache_state->clean = m_dirty_log_entries.empty();
+    m_cache_state->empty = m_log_entries.empty();
   }
 
   ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
@@ -498,6 +739,7 @@ void ReplicatedWriteLog<I>::shut_down(Context *on_finish) {
       {
         std::lock_guard locker(m_lock);
         ceph_assert(m_dirty_log_entries.size() == 0);
+        m_wake_up_enabled = false;
         m_cache_state->clean = true;
         m_log_entries.clear();
         if (m_log_pool) {
@@ -735,6 +977,7 @@ void ReplicatedWriteLog<I>::aio_discard(uint64_t offset, uint64_t length,
   utime_t now = ceph_clock_now();
   m_perfcounter->inc(l_librbd_rwl_discard, 1);
   Extents discard_extents = {{offset, length}};
+  m_discard_granularity_bytes = discard_granularity_bytes;
 
   ceph_assert(m_initialized);
 
index 9a0596eb066aac3eaa85e20139c0130e0d5dd2d8..c19d68da01ff80035bb98b538a2605db0778fc77 100644 (file)
@@ -262,6 +262,8 @@ private:
   ThreadPool m_thread_pool;
   ContextWQ m_work_queue;
 
+  uint32_t m_discard_granularity_bytes;
+
   void perf_start(const std::string name);
   void perf_stop();
   void log_perf();
@@ -270,6 +272,7 @@ private:
 
   void rwl_init(Context *on_finish, rwl::DeferredContexts &later);
   void update_image_cache_state(Context *on_finish);
+  void load_existing_entries(rwl::DeferredContexts &later);
   void wake_up();
   void process_work();
 
index d7c2974477a32c30fda86d6a112faa83e497d83b..df34d8a95d0e0b07eb39909e571cb32c2f27cbd6 100644 (file)
@@ -36,7 +36,6 @@ public:
   virtual bool can_writeback() const {
     return false;
   }
-  // TODO: discard need to override this
   virtual bool can_retire() const {
     return false;
   }
@@ -210,6 +209,9 @@ public:
     /* The bytes in the image this op makes dirty. */
     return ram_entry.write_bytes;
   };
+  bool can_retire() const override {
+    return this->completed;
+  }
   void copy_pmem_bl(bufferlist *out_bl) override {
     ceph_assert(false);
   }
index 147fe0cce3d1798bc8bbd702b05af62517d00209..bd8adefbcfa899f7a5ce8f72e525fc703270ed41 100644 (file)
@@ -606,6 +606,7 @@ void C_WriteSameRequest<T>::setup_buffer_resources(
   if (pattern_length > buffer.allocation_size) {
     buffer.allocation_size = pattern_length;
   }
+  bytes_allocated += buffer.allocation_size;
 }
 
 template <typename T>
index a5f3dc2ec183926603cb4474ff6a47ce3c7e38b8..cf87391ca633ad5cbf7429d8b7eb85d0547bdc65 100644 (file)
@@ -212,6 +212,23 @@ struct WriteLogPmemEntry {
   BlockExtent block_extent();
   uint64_t get_offset_bytes();
   uint64_t get_write_bytes();
+  bool is_sync_point() {
+    return sync_point;
+  }
+  bool is_discard() {
+    return discard;
+  }
+  bool is_writesame() {
+    return writesame;
+  }
+  bool is_write() {
+    /* Log entry is a basic write */
+    return !is_sync_point() && !is_discard() && !is_writesame();
+  }
+  bool is_writer() {
+    /* Log entry is any type that writes data */
+    return is_write() || is_discard() || is_writesame();
+  }
   friend std::ostream& operator<<(std::ostream& os,
                                   const WriteLogPmemEntry &entry);
 };