}
}
+/*
+ * Loads the log entries from an existing log.
+ *
+ * Creates the in-memory structures to represent the state of the
+ * re-opened log.
+ *
+ * Finds the last appended sync point, and any sync points referred to
+ * in log entries, but missing from the log. These missing sync points
+ * are created and scheduled for append. Some rudimentary consistency
+ * checking is done.
+ *
+ * Rebuilds the m_blocks_to_log_entries map, to make log entries
+ * readable.
+ *
+ * Places all unflushed writes on the dirty entries list, which causes
+ * them to be flushed.
+ *
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::load_existing_entries(DeferredContexts &later) {
+ TOID(struct WriteLogPoolRoot) pool_root;
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+ uint64_t entry_index = m_first_valid_entry;
+ /* The map below allows us to find sync point log entries by sync
+ * gen number, which is necessary so write entries can be linked to
+ * their sync points. */
+ std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
+ /* The map below tracks sync points referred to in writes but not
+ * appearing in the sync_point_entries map. We'll use this to
+ * determine which sync points are missing and need to be
+ * created. */
+ std::map<uint64_t, bool> missing_sync_points;
+
+ /*
+ * Read the existing log entries. Construct an in-memory log entry
+ * object of the appropriate type for each. Add these to the global
+ * log entries list.
+ *
+ * Write entries will not link to their sync points yet. We'll do
+ * that in the next pass. Here we'll accumulate a map of sync point
+ * gen numbers that are referred to in writes but do not appear in
+ * the log.
+ */
+ while (entry_index != m_first_free_entry) {
+ WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index];
+ std::shared_ptr<GenericLogEntry> log_entry = nullptr;
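+ /* is_writer() is true for the entry types that write data: write, writesame, and discard */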
+ bool writer = pmem_entry->is_writer();
+
+ ceph_assert(pmem_entry->entry_index == entry_index);
+ if (pmem_entry->is_sync_point()) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " is a sync point. pmem_entry=[" << *pmem_entry << "]" << dendl;
+ auto sync_point_entry = std::make_shared<SyncPointLogEntry>(pmem_entry->sync_gen_number);
+ log_entry = sync_point_entry;
+ sync_point_entries[pmem_entry->sync_gen_number] = sync_point_entry;
+ missing_sync_points.erase(pmem_entry->sync_gen_number);
+ m_current_sync_gen = pmem_entry->sync_gen_number;
+ } else if (pmem_entry->is_write()) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " is a write. pmem_entry=[" << *pmem_entry << "]" << dendl;
+ auto write_entry =
+ std::make_shared<WriteLogEntry>(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes);
+ write_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+ log_entry = write_entry;
+ } else if (pmem_entry->is_writesame()) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " is a write same. pmem_entry=[" << *pmem_entry << "]" << dendl;
+ auto ws_entry =
+ std::make_shared<WriteSameLogEntry>(nullptr, pmem_entry->image_offset_bytes,
+ pmem_entry->write_bytes, pmem_entry->ws_datalen);
+ ws_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+ log_entry = ws_entry;
+ } else if (pmem_entry->is_discard()) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " is a discard. pmem_entry=[" << *pmem_entry << "]" << dendl;
+ auto discard_entry =
+ std::make_shared<DiscardLogEntry>(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes,
+ m_discard_granularity_bytes);
+ log_entry = discard_entry;
+ } else {
+ lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index
+ << ", pmem_entry=[" << *pmem_entry << "]" << dendl;
+ }
+
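+ /* If this writing entry's sync point hasn't been seen in the log yet, note it as missing so it can be created below */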
+ if (writer) {
+ ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
+ << " writes. pmem_entry=[" << *pmem_entry << "]" << dendl;
+ if (!sync_point_entries[pmem_entry->sync_gen_number]) {
+ missing_sync_points[pmem_entry->sync_gen_number] = true;
+ }
+ }
+
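+ /* Entries read back from the pool are treated as completed */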
+ log_entry->ram_entry = *pmem_entry;
+ log_entry->pmem_entry = pmem_entry;
+ log_entry->log_entry_index = entry_index;
+ log_entry->completed = true;
+
+ m_log_entries.push_back(log_entry);
+
+ entry_index = (entry_index + 1) % m_total_log_entries;
+ }
+
+ /* Create missing sync points. These must not be appended until the
+ * entry reload is complete and the write map is up to
+ * date. Currently this is handled by the deferred contexts object
+ * passed to new_sync_point(). These contexts won't be completed
+ * until this function returns. */
+ for (auto &kv : missing_sync_points) {
+ ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl;
+ if (0 == m_current_sync_gen) {
+ /* The unlikely case where the log contains writing entries, but no sync
+ * points (e.g. because they were all retired) */
+ m_current_sync_gen = kv.first - 1;
+ }
+ ceph_assert(kv.first == m_current_sync_gen + 1);
+ init_flush_new_sync_point(later);
+ ceph_assert(kv.first == m_current_sync_gen);
+ sync_point_entries[kv.first] = m_current_sync_point->log_entry;
+ }
+
+ /*
+ * Iterate over the log entries again (this time via the global
+ * entries list), connecting write entries to their sync points and
+ * updating the sync point stats.
+ *
+ * Add writes to the write log map.
+ */
+ std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr;
+ for (auto &log_entry : m_log_entries) {
+ if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) {
+ /* This entry is one of the types that write */
+ auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry);
+ if (gen_write_entry) {
+ auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number];
+ if (!sync_point_entry) {
+ lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl;
+ ceph_assert(false);
+ } else {
+ gen_write_entry->sync_point_entry = sync_point_entry;
+ sync_point_entry->writes++;
+ sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes;
+ sync_point_entry->writes_completed++;
+ m_blocks_to_log_entries.add_log_entry(gen_write_entry);
+ /* This entry is only dirty if its sync gen number is > the flushed
+ * sync gen number from the root object. */
+ if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
+ m_dirty_log_entries.push_back(log_entry);
+ m_bytes_dirty += gen_write_entry->bytes_dirty();
+ } else {
+ gen_write_entry->set_flushed(true);
+ sync_point_entry->writes_flushed++;
+ }
+ if (log_entry->write_bytes() == log_entry->bytes_dirty()) {
+ /* This entry is a basic write */
+ uint64_t bytes_allocated = MIN_WRITE_ALLOC_SIZE;
+ if (gen_write_entry->ram_entry.write_bytes > bytes_allocated) {
+ bytes_allocated = gen_write_entry->ram_entry.write_bytes;
+ }
+ m_bytes_allocated += bytes_allocated;
+ m_bytes_cached += gen_write_entry->ram_entry.write_bytes;
+ }
+ }
+ }
+ } else {
+ /* This entry is a sync point entry */
+ auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry);
+ if (sync_point_entry) {
+ if (previous_sync_point_entry) {
+ previous_sync_point_entry->next_sync_point_entry = sync_point_entry;
+ if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
+ sync_point_entry->prior_sync_point_flushed = false;
+ ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed ||
+ (0 == previous_sync_point_entry->writes) ||
+ (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed));
+ } else {
+ sync_point_entry->prior_sync_point_flushed = true;
+ ceph_assert(previous_sync_point_entry->prior_sync_point_flushed);
+ ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed);
+ }
+ } else {
+ /* There are no previous sync points, so we'll consider them flushed */
+ sync_point_entry->prior_sync_point_flushed = true;
+ }
+ previous_sync_point_entry = sync_point_entry;
+ ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << "]" << dendl;
+ }
+ }
+ }
+ if (0 == m_current_sync_gen) {
+ /* If a re-opened log was completely flushed, we'll have found no sync point entries here,
+ * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync
+ * point recorded in the log. */
+ m_current_sync_gen = m_flushed_sync_gen;
+ }
+}
+
template <typename I>
void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later) {
CephContext *cct = m_image_ctx.cct;
} TX_FINALLY {
} TX_END;
} else {
- // TODO: load existed cache. This will be covered in later PR.
+ m_cache_state->present = true;
+ /* Open existing pool */
+ if ((m_log_pool =
+ pmemobj_open(m_log_pool_name.c_str(),
+ m_rwl_pool_layout_name)) == NULL) {
+ lderr(cct) << "failed to open pool (" << m_log_pool_name << "): "
+ << pmemobj_errormsg() << dendl;
+ on_finish->complete(-errno);
+ return;
+ }
+ pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+ if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) {
+ // TODO: will handle upgrading version in the future
+ lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version
+ << " expected " << RWL_POOL_VERSION << dendl;
+ on_finish->complete(-EINVAL);
+ return;
+ }
+ if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
+ lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size
+ << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
+ on_finish->complete(-EINVAL);
+ return;
+ }
+ m_log_pool_actual_size = D_RO(pool_root)->pool_size;
+ m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
+ m_total_log_entries = D_RO(pool_root)->num_log_entries;
+ m_first_free_entry = D_RO(pool_root)->first_free_entry;
+ m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
+ if (m_first_free_entry < m_first_valid_entry) {
+ /* Valid entries wrap around the end of the ring, so first_free is lower
+ * than first_valid. If first_valid was == first_free+1, the entry at
+ * first_free would be empty. The last entry is never used, so in
+ * that case there would be zero free log entries. */
+ m_free_log_entries = m_total_log_entries - (m_first_valid_entry - m_first_free_entry) - 1;
+ } else {
+ /* first_valid is <= first_free. If they are == we have zero valid log
+ * entries, and n-1 free log entries */
+ m_free_log_entries = m_total_log_entries - (m_first_free_entry - m_first_valid_entry) - 1;
+ }
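+ /* Only a portion of the pool is usable for log data; cap the bytes we may allocate accordingly */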
+ size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE);
+ m_bytes_allocated_cap = effective_pool_size;
+ load_existing_entries(later);
+ m_cache_state->clean = m_dirty_log_entries.empty();
+ m_cache_state->empty = m_log_entries.empty();
}
ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
{
std::lock_guard locker(m_lock);
ceph_assert(m_dirty_log_entries.size() == 0);
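+ /* The log is clean; disable wake_up() so no new flush/retire work is scheduled during shutdown */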
+ m_wake_up_enabled = false;
m_cache_state->clean = true;
m_log_entries.clear();
if (m_log_pool) {
utime_t now = ceph_clock_now();
m_perfcounter->inc(l_librbd_rwl_discard, 1);
Extents discard_extents = {{offset, length}};
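+ /* Remember the granularity used to construct discard log entries (also used when reloading them from the pool) */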
+ m_discard_granularity_bytes = discard_granularity_bytes;
ceph_assert(m_initialized);