From f41b8a4f4da9eb51903a3644ac58028a81f28821 Mon Sep 17 00:00:00 2001 From: Yuan Lu Date: Thu, 7 May 2020 15:04:06 +0800 Subject: [PATCH] librbd: load existing cache Signed-off-by: Peterson, Scott Signed-off-by: Li, Xiaoyan Signed-off-by: Lu, Yuan Signed-off-by: Chamarthy, Mahati --- src/librbd/cache/ReplicatedWriteLog.cc | 245 ++++++++++++++++++++++++- src/librbd/cache/ReplicatedWriteLog.h | 3 + src/librbd/cache/rwl/LogEntry.h | 4 +- src/librbd/cache/rwl/Request.cc | 1 + src/librbd/cache/rwl/Types.h | 17 ++ 5 files changed, 268 insertions(+), 2 deletions(-) diff --git a/src/librbd/cache/ReplicatedWriteLog.cc b/src/librbd/cache/ReplicatedWriteLog.cc index 8d107463899..8691c1b36c8 100644 --- a/src/librbd/cache/ReplicatedWriteLog.cc +++ b/src/librbd/cache/ReplicatedWriteLog.cc @@ -307,6 +307,203 @@ void ReplicatedWriteLog::arm_periodic_stats() { } } +/* + * Loads the log entries from an existing log. + * + * Creates the in-memory structures to represent the state of the + * re-opened log. + * + * Finds the last appended sync point, and any sync points referred to + * in log entries, but missing from the log. These missing sync points + * are created and scheduled for append. Some rudimentary consistency + * checking is done. + * + * Rebuilds the m_blocks_to_log_entries map, to make log entries + * readable. + * + * Places all writes on the dirty entries list, which causes them all + * to be flushed. + * + */ +template +void ReplicatedWriteLog::load_existing_entries(DeferredContexts &later) { + TOID(struct WriteLogPoolRoot) pool_root; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries); + uint64_t entry_index = m_first_valid_entry; + /* The map below allows us to find sync point log entries by sync + * gen number, which is necessary so write entries can be linked to + * their sync points. */ + std::map> sync_point_entries; + /* The map below tracks sync points referred to in writes but not + * appearing in the sync_point_entries map. We'll use this to + * determine which sync points are missing and need to be + * created. */ + std::map missing_sync_points; + + /* + * Read the existing log entries. Construct an in-memory log entry + * object of the appropriate type for each. Add these to the global + * log entries list. + * + * Write entries will not link to their sync points yet. We'll do + * that in the next pass. Here we'll accumulate a map of sync point + * gen numbers that are referred to in writes but do not appearing in + * the log. + */ + while (entry_index != m_first_free_entry) { + WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index]; + std::shared_ptr log_entry = nullptr; + bool writer = pmem_entry->is_writer(); + + ceph_assert(pmem_entry->entry_index == entry_index); + if (pmem_entry->is_sync_point()) { + ldout(m_image_ctx.cct, 20) << "Entry " << entry_index + << " is a sync point. pmem_entry=[" << *pmem_entry << "]" << dendl; + auto sync_point_entry = std::make_shared(pmem_entry->sync_gen_number); + log_entry = sync_point_entry; + sync_point_entries[pmem_entry->sync_gen_number] = sync_point_entry; + missing_sync_points.erase(pmem_entry->sync_gen_number); + m_current_sync_gen = pmem_entry->sync_gen_number; + } else if (pmem_entry->is_write()) { + ldout(m_image_ctx.cct, 20) << "Entry " << entry_index + << " is a write. pmem_entry=[" << *pmem_entry << "]" << dendl; + auto write_entry = + std::make_shared(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes); + write_entry->pmem_buffer = D_RW(pmem_entry->write_data); + log_entry = write_entry; + } else if (pmem_entry->is_writesame()) { + ldout(m_image_ctx.cct, 20) << "Entry " << entry_index + << " is a write same. pmem_entry=[" << *pmem_entry << "]" << dendl; + auto ws_entry = + std::make_shared(nullptr, pmem_entry->image_offset_bytes, + pmem_entry->write_bytes, pmem_entry->ws_datalen); + ws_entry->pmem_buffer = D_RW(pmem_entry->write_data); + log_entry = ws_entry; + } else if (pmem_entry->is_discard()) { + ldout(m_image_ctx.cct, 20) << "Entry " << entry_index + << " is a discard. pmem_entry=[" << *pmem_entry << "]" << dendl; + auto discard_entry = + std::make_shared(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes, + m_discard_granularity_bytes); + log_entry = discard_entry; + } else { + lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index + << ", pmem_entry=[" << *pmem_entry << "]" << dendl; + } + + if (writer) { + ldout(m_image_ctx.cct, 20) << "Entry " << entry_index + << " writes. pmem_entry=[" << *pmem_entry << "]" << dendl; + if (!sync_point_entries[pmem_entry->sync_gen_number]) { + missing_sync_points[pmem_entry->sync_gen_number] = true; + } + } + + log_entry->ram_entry = *pmem_entry; + log_entry->pmem_entry = pmem_entry; + log_entry->log_entry_index = entry_index; + log_entry->completed = true; + + m_log_entries.push_back(log_entry); + + entry_index = (entry_index + 1) % m_total_log_entries; + } + + /* Create missing sync points. These must not be appended until the + * entry reload is complete and the write map is up to + * date. Currently this is handled by the deferred contexts object + * passed to new_sync_point(). These contexts won't be completed + * until this function returns. */ + for (auto &kv : missing_sync_points) { + ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl; + if (0 == m_current_sync_gen) { + /* The unlikely case where the log contains writing entries, but no sync + * points (e.g. because they were all retired) */ + m_current_sync_gen = kv.first-1; + } + ceph_assert(kv.first == m_current_sync_gen+1); + init_flush_new_sync_point(later); + ceph_assert(kv.first == m_current_sync_gen); + sync_point_entries[kv.first] = m_current_sync_point->log_entry;; + } + + /* + * Iterate over the log entries again (this time via the global + * entries list), connecting write entries to their sync points and + * updating the sync point stats. + * + * Add writes to the write log map. + */ + std::shared_ptr previous_sync_point_entry = nullptr; + for (auto &log_entry : m_log_entries) { + if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) { + /* This entry is one of the types that write */ + auto gen_write_entry = static_pointer_cast(log_entry); + if (gen_write_entry) { + auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number]; + if (!sync_point_entry) { + lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl; + ceph_assert(false); + } else { + gen_write_entry->sync_point_entry = sync_point_entry; + sync_point_entry->writes++; + sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes; + sync_point_entry->writes_completed++; + m_blocks_to_log_entries.add_log_entry(gen_write_entry); + /* This entry is only dirty if its sync gen number is > the flushed + * sync gen number from the root object. */ + if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { + m_dirty_log_entries.push_back(log_entry); + m_bytes_dirty += gen_write_entry->bytes_dirty(); + } else { + gen_write_entry->set_flushed(true); + sync_point_entry->writes_flushed++; + } + if (log_entry->write_bytes() == log_entry->bytes_dirty()) { + /* This entry is a basic write */ + uint64_t bytes_allocated = MIN_WRITE_ALLOC_SIZE; + if (gen_write_entry->ram_entry.write_bytes > bytes_allocated) { + bytes_allocated = gen_write_entry->ram_entry.write_bytes; + } + m_bytes_allocated += bytes_allocated; + m_bytes_cached += gen_write_entry->ram_entry.write_bytes; + } + } + } + } else { + /* This entry is sync point entry */ + auto sync_point_entry = static_pointer_cast(log_entry); + if (sync_point_entry) { + if (previous_sync_point_entry) { + previous_sync_point_entry->next_sync_point_entry = sync_point_entry; + if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) { + sync_point_entry->prior_sync_point_flushed = false; + ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed || + (0 == previous_sync_point_entry->writes) || + (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed)); + } else { + sync_point_entry->prior_sync_point_flushed = true; + ceph_assert(previous_sync_point_entry->prior_sync_point_flushed); + ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed); + } + previous_sync_point_entry = sync_point_entry; + } else { + /* There are no previous sync points, so we'll consider them flushed */ + sync_point_entry->prior_sync_point_flushed = true; + } + ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << dendl; + } + } + } + if (0 == m_current_sync_gen) { + /* If a re-opened log was completely flushed, we'll have found no sync point entries here, + * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync + * point recorded in the log. */ + m_current_sync_gen = m_flushed_sync_gen; + } +} + template void ReplicatedWriteLog::rwl_init(Context *on_finish, DeferredContexts &later) { CephContext *cct = m_image_ctx.cct; @@ -409,7 +606,51 @@ void ReplicatedWriteLog::rwl_init(Context *on_finish, DeferredContexts &later } TX_FINALLY { } TX_END; } else { - // TODO: load existed cache. This will be covered in later PR. + m_cache_state->present = true; + /* Open existing pool */ + if ((m_log_pool = + pmemobj_open(m_log_pool_name.c_str(), + m_rwl_pool_layout_name)) == NULL) { + lderr(cct) << "failed to open pool (" << m_log_pool_name << "): " + << pmemobj_errormsg() << dendl; + on_finish->complete(-errno); + return; + } + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) { + // TODO: will handle upgrading version in the future + lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version + << " expected " << RWL_POOL_VERSION << dendl; + on_finish->complete(-EINVAL); + return; + } + if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) { + lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size + << " expected " << MIN_WRITE_ALLOC_SIZE << dendl; + on_finish->complete(-EINVAL); + return; + } + m_log_pool_actual_size = D_RO(pool_root)->pool_size; + m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen; + m_total_log_entries = D_RO(pool_root)->num_log_entries; + m_first_free_entry = D_RO(pool_root)->first_free_entry; + m_first_valid_entry = D_RO(pool_root)->first_valid_entry; + if (m_first_free_entry < m_first_valid_entry) { + /* Valid entries wrap around the end of the ring, so first_free is lower + * than first_valid. If first_valid was == first_free+1, the entry at + * first_free would be empty. The last entry is never used, so in + * that case there would be zero free log entries. */ + m_free_log_entries = m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1; + } else { + /* first_valid is <= first_free. If they are == we have zero valid log + * entries, and n-1 free log entries */ + m_free_log_entries = m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1; + } + size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE); + m_bytes_allocated_cap = effective_pool_size; + load_existing_entries(later); + m_cache_state->clean = m_dirty_log_entries.empty(); + m_cache_state->empty = m_log_entries.empty(); } ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries @@ -498,6 +739,7 @@ void ReplicatedWriteLog::shut_down(Context *on_finish) { { std::lock_guard locker(m_lock); ceph_assert(m_dirty_log_entries.size() == 0); + m_wake_up_enabled = false; m_cache_state->clean = true; m_log_entries.clear(); if (m_log_pool) { @@ -735,6 +977,7 @@ void ReplicatedWriteLog::aio_discard(uint64_t offset, uint64_t length, utime_t now = ceph_clock_now(); m_perfcounter->inc(l_librbd_rwl_discard, 1); Extents discard_extents = {{offset, length}}; + m_discard_granularity_bytes = discard_granularity_bytes; ceph_assert(m_initialized); diff --git a/src/librbd/cache/ReplicatedWriteLog.h b/src/librbd/cache/ReplicatedWriteLog.h index 9a0596eb066..c19d68da01f 100644 --- a/src/librbd/cache/ReplicatedWriteLog.h +++ b/src/librbd/cache/ReplicatedWriteLog.h @@ -262,6 +262,8 @@ private: ThreadPool m_thread_pool; ContextWQ m_work_queue; + uint32_t m_discard_granularity_bytes; + void perf_start(const std::string name); void perf_stop(); void log_perf(); @@ -270,6 +272,7 @@ private: void rwl_init(Context *on_finish, rwl::DeferredContexts &later); void update_image_cache_state(Context *on_finish); + void load_existing_entries(rwl::DeferredContexts &later); void wake_up(); void process_work(); diff --git a/src/librbd/cache/rwl/LogEntry.h b/src/librbd/cache/rwl/LogEntry.h index d7c2974477a..df34d8a95d0 100644 --- a/src/librbd/cache/rwl/LogEntry.h +++ b/src/librbd/cache/rwl/LogEntry.h @@ -36,7 +36,6 @@ public: virtual bool can_writeback() const { return false; } - // TODO: discard need to override this virtual bool can_retire() const { return false; } @@ -210,6 +209,9 @@ public: /* The bytes in the image this op makes dirty. */ return ram_entry.write_bytes; }; + bool can_retire() const override { + return this->completed; + } void copy_pmem_bl(bufferlist *out_bl) override { ceph_assert(false); } diff --git a/src/librbd/cache/rwl/Request.cc b/src/librbd/cache/rwl/Request.cc index 147fe0cce3d..bd8adefbcfa 100644 --- a/src/librbd/cache/rwl/Request.cc +++ b/src/librbd/cache/rwl/Request.cc @@ -606,6 +606,7 @@ void C_WriteSameRequest::setup_buffer_resources( if (pattern_length > buffer.allocation_size) { buffer.allocation_size = pattern_length; } + bytes_allocated += buffer.allocation_size; } template diff --git a/src/librbd/cache/rwl/Types.h b/src/librbd/cache/rwl/Types.h index a5f3dc2ec18..cf87391ca63 100644 --- a/src/librbd/cache/rwl/Types.h +++ b/src/librbd/cache/rwl/Types.h @@ -212,6 +212,23 @@ struct WriteLogPmemEntry { BlockExtent block_extent(); uint64_t get_offset_bytes(); uint64_t get_write_bytes(); + bool is_sync_point() { + return sync_point; + } + bool is_discard() { + return discard; + } + bool is_writesame() { + return writesame; + } + bool is_write() { + /* Log entry is a basic write */ + return !is_sync_point() && !is_discard() && !is_writesame(); + } + bool is_writer() { + /* Log entry is any type that writes data */ + return is_write() || is_discard() || is_writesame(); + } friend std::ostream& operator<<(std::ostream& os, const WriteLogPmemEntry &entry); }; -- 2.47.3