]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd/cache: Refactor common code for RWL and SSD 37406/head
authorMahati Chamarthy <mahati.chamarthy@intel.com>
Wed, 23 Sep 2020 09:32:17 +0000 (15:02 +0530)
committerMahati Chamarthy <mahati.chamarthy@intel.com>
Thu, 1 Oct 2020 11:24:46 +0000 (16:54 +0530)
... from AbstractWriteLog class

Signed-off-by: Lisa Li <xiaoyan.li@intel.com>
Signed-off-by: Mahati Chamarthy <mahati.chamarthy@intel.com>
Signed-off-by: Changcheng Liu <changcheng.liu@intel.com>
src/librbd/cache/pwl/AbstractWriteLog.cc
src/librbd/cache/pwl/AbstractWriteLog.h
src/librbd/cache/pwl/ReplicatedWriteLog.cc
src/librbd/cache/pwl/ReplicatedWriteLog.h
src/librbd/cache/pwl/Request.cc

index 8bb111ed5c5fbb7bb245642a195815663a2889c8..8cbdc6fce6cd017b6b02e7659289ee385a3510b7 100644 (file)
@@ -1,7 +1,6 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
-#include <libpmemobj.h>
 #include "AbstractWriteLog.h"
 #include "include/buffer.h"
 #include "include/Context.h"
@@ -37,30 +36,28 @@ using namespace librbd::cache::pwl;
 typedef AbstractWriteLog<ImageCtx>::Extent Extent;
 typedef AbstractWriteLog<ImageCtx>::Extents Extents;
 
-const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;
-
 template <typename I>
 AbstractWriteLog<I>::AbstractWriteLog(I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state)
-  : m_cache_state(cache_state),
+  : m_write_log_guard(image_ctx.cct),
+    m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
+    m_blockguard_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
+    m_thread_pool(
+        image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", "tp_pwl", 4, ""),
+    m_cache_state(cache_state),
     m_pwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_pwl)),
     m_image_ctx(image_ctx),
     m_log_pool_config_size(DEFAULT_POOL_SIZE),
-    m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+    m_image_writeback(image_ctx),
     m_log_retire_lock(ceph::make_mutex(util::unique_lock_name(
       "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))),
     m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"),
-    m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
-      "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
-    m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
+       m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
       "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))),
     m_lock(ceph::make_mutex(util::unique_lock_name(
       "librbd::cache::pwl::AbstractWriteLog::m_lock", this))),
-    m_blockguard_lock(ceph::make_mutex(util::unique_lock_name(
-      "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
     m_blocks_to_log_entries(image_ctx.cct),
-    m_thread_pool(image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", "tp_pwl",
-                  4,
-                  ""),
     m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue",
                  ceph::make_timespan(
                    image_ctx.config.template get_val<uint64_t>(
@@ -84,7 +81,6 @@ AbstractWriteLog<I>::~AbstractWriteLog() {
     ceph_assert(m_ops_to_append.size() == 0);
     ceph_assert(m_flush_ops_in_flight == 0);
 
-    m_log_pool = nullptr;
     delete m_cache_state;
     m_cache_state = nullptr;
   }
@@ -335,56 +331,12 @@ void AbstractWriteLog<I>::arm_periodic_stats() {
   }
 }
 
-/*
- * Loads the log entries from an existing log.
- *
- * Creates the in-memory structures to represent the state of the
- * re-opened log.
- *
- * Finds the last appended sync point, and any sync points referred to
- * in log entries, but missing from the log. These missing sync points
- * are created and scheduled for append. Some rudimentary consistency
- * checking is done.
- *
- * Rebuilds the m_blocks_to_log_entries map, to make log entries
- * readable.
- *
- * Places all writes on the dirty entries list, which causes them all
- * to be flushed.
- *
- */
 template <typename I>
-void AbstractWriteLog<I>::load_existing_entries(DeferredContexts &later) {
-  TOID(struct WriteLogPoolRoot) pool_root;
-  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
-  struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
-  uint64_t entry_index = m_first_valid_entry;
-  /* The map below allows us to find sync point log entries by sync
-   * gen number, which is necessary so write entries can be linked to
-   * their sync points. */
-  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
-  /* The map below tracks sync points referred to in writes but not
-   * appearing in the sync_point_entries map.  We'll use this to
-   * determine which sync points are missing and need to be
-   * created. */
-  std::map<uint64_t, bool> missing_sync_points;
-
-  /*
-   * Read the existing log entries. Construct an in-memory log entry
-   * object of the appropriate type for each. Add these to the global
-   * log entries list.
-   *
-   * Write entries will not link to their sync points yet. We'll do
-   * that in the next pass. Here we'll accumulate a map of sync point
-   * gen numbers that are referred to in writes but do not appearing in
-   * the log.
-   */
-  while (entry_index != m_first_free_entry) {
-    WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index];
-    std::shared_ptr<GenericLogEntry> log_entry = nullptr;
+void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> log_entry,
+    WriteLogPmemEntry *pmem_entry, std::map<uint64_t, bool> &missing_sync_points,
+    std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
+    int entry_index) {
     bool writer = pmem_entry->is_writer();
-
-    ceph_assert(pmem_entry->entry_index == entry_index);
     if (pmem_entry->is_sync_point()) {
       ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
                                  << " is a sync point. pmem_entry=[" << *pmem_entry << "]" << dendl;
@@ -398,7 +350,7 @@ void AbstractWriteLog<I>::load_existing_entries(DeferredContexts &later) {
                                  << " is a write. pmem_entry=[" << *pmem_entry << "]" << dendl;
       auto write_entry =
         std::make_shared<WriteLogEntry>(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes);
-      write_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+      write_data_to_buffer(write_entry, pmem_entry);
       log_entry = write_entry;
     } else if (pmem_entry->is_writesame()) {
       ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
@@ -406,7 +358,7 @@ void AbstractWriteLog<I>::load_existing_entries(DeferredContexts &later) {
       auto ws_entry =
         std::make_shared<WriteSameLogEntry>(nullptr, pmem_entry->image_offset_bytes,
                                             pmem_entry->write_bytes, pmem_entry->ws_datalen);
-      ws_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+      write_data_to_buffer(ws_entry, pmem_entry);
       log_entry = ws_entry;
     } else if (pmem_entry->is_discard()) {
       ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
@@ -427,17 +379,12 @@ void AbstractWriteLog<I>::load_existing_entries(DeferredContexts &later) {
         missing_sync_points[pmem_entry->sync_gen_number] = true;
       }
     }
+}
 
-    log_entry->ram_entry = *pmem_entry;
-    log_entry->pmem_entry = pmem_entry;
-    log_entry->log_entry_index = entry_index;
-    log_entry->completed = true;
-
-    m_log_entries.push_back(log_entry);
-
-    entry_index = (entry_index + 1) % m_total_log_entries;
-  }
-
+template <typename I>
+void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
+    std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
+    DeferredContexts &later) {
   /* Create missing sync points. These must not be appended until the
    * entry reload is complete and the write map is up to
    * date. Currently this is handled by the deferred contexts object
@@ -536,7 +483,6 @@ template <typename I>
 void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << dendl;
-  TOID(struct WriteLogPoolRoot) pool_root;
   ceph_assert(m_cache_state);
   std::lock_guard locker(m_lock);
   ceph_assert(!m_initialized);
@@ -550,14 +496,8 @@ void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later)
   std::string log_poolset_name = pwl_path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".poolset";
   m_log_pool_config_size = max(m_cache_state->size, MIN_POOL_SIZE);
 
-  if (access(log_poolset_name.c_str(), F_OK) == 0) {
-    m_log_pool_name = log_poolset_name;
-    m_log_is_poolset = true;
-  } else {
-    m_log_pool_name = log_pool_name;
-    ldout(cct, 5) << "Poolset file " << log_poolset_name
-                  << " not present (or can't open). Using unreplicated pool" << dendl;
-  }
+  m_log_pool_name = log_pool_name;
+  get_pool_name(log_poolset_name);
 
   if ((!m_cache_state->present) &&
       (access(m_log_pool_name.c_str(), F_OK) == 0)) {
@@ -573,113 +513,7 @@ void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later)
     }
   }
 
-  if (access(m_log_pool_name.c_str(), F_OK) != 0) {
-    if ((m_log_pool =
-         pmemobj_create(m_log_pool_name.c_str(),
-                        m_pwl_pool_layout_name,
-                        m_log_pool_config_size,
-                        (S_IWUSR | S_IRUSR))) == NULL) {
-      lderr(cct) << "failed to create pool (" << m_log_pool_name << ")"
-                 << pmemobj_errormsg() << dendl;
-      m_cache_state->present = false;
-      m_cache_state->clean = true;
-      m_cache_state->empty = true;
-      /* TODO: filter/replace errnos that are meaningless to the caller */
-      on_finish->complete(-errno);
-      return;
-    }
-    m_cache_state->present = true;
-    m_cache_state->clean = true;
-    m_cache_state->empty = true;
-    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
-
-    /* new pool, calculate and store metadata */
-    size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE);
-    size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogPmemEntry);
-    uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
-    if (num_small_writes > MAX_LOG_ENTRIES) {
-      num_small_writes = MAX_LOG_ENTRIES;
-    }
-    if (num_small_writes <= 2) {
-      lderr(cct) << "num_small_writes needs to > 2" << dendl;
-      on_finish->complete(-EINVAL);
-      return;
-    }
-    m_log_pool_actual_size = m_log_pool_config_size;
-    m_bytes_allocated_cap = effective_pool_size;
-    /* Log ring empty */
-    m_first_free_entry = 0;
-    m_first_valid_entry = 0;
-    TX_BEGIN(m_log_pool) {
-      TX_ADD(pool_root);
-      D_RW(pool_root)->header.layout_version = RWL_POOL_VERSION;
-      D_RW(pool_root)->log_entries =
-        TX_ZALLOC(struct WriteLogPmemEntry,
-                  sizeof(struct WriteLogPmemEntry) * num_small_writes);
-      D_RW(pool_root)->pool_size = m_log_pool_actual_size;
-      D_RW(pool_root)->flushed_sync_gen = m_flushed_sync_gen;
-      D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
-      D_RW(pool_root)->num_log_entries = num_small_writes;
-      D_RW(pool_root)->first_free_entry = m_first_free_entry;
-      D_RW(pool_root)->first_valid_entry = m_first_valid_entry;
-    } TX_ONCOMMIT {
-      m_total_log_entries = D_RO(pool_root)->num_log_entries;
-      m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free
-    } TX_ONABORT {
-      m_total_log_entries = 0;
-      m_free_log_entries = 0;
-      lderr(cct) << "failed to initialize pool (" << m_log_pool_name << ")" << dendl;
-      on_finish->complete(-pmemobj_tx_errno());
-      return;
-    } TX_FINALLY {
-    } TX_END;
-  } else {
-    m_cache_state->present = true;
-    /* Open existing pool */
-    if ((m_log_pool =
-         pmemobj_open(m_log_pool_name.c_str(),
-                      m_pwl_pool_layout_name)) == NULL) {
-      lderr(cct) << "failed to open pool (" << m_log_pool_name << "): "
-                 << pmemobj_errormsg() << dendl;
-      on_finish->complete(-errno);
-      return;
-    }
-    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
-    if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) {
-      // TODO: will handle upgrading version in the future
-      lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version
-                 << " expected " << RWL_POOL_VERSION << dendl;
-      on_finish->complete(-EINVAL);
-      return;
-    }
-    if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
-      lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size
-                 << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
-      on_finish->complete(-EINVAL);
-      return;
-    }
-    m_log_pool_actual_size = D_RO(pool_root)->pool_size;
-    m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
-    m_total_log_entries = D_RO(pool_root)->num_log_entries;
-    m_first_free_entry = D_RO(pool_root)->first_free_entry;
-    m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
-    if (m_first_free_entry < m_first_valid_entry) {
-      /* Valid entries wrap around the end of the ring, so first_free is lower
-       * than first_valid.  If first_valid was == first_free+1, the entry at
-       * first_free would be empty. The last entry is never used, so in
-       * that case there would be zero free log entries. */
-      m_free_log_entries = m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1;
-    } else {
-      /* first_valid is <= first_free. If they are == we have zero valid log
-       * entries, and n-1 free log entries */
-      m_free_log_entries = m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1;
-    }
-    size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE);
-    m_bytes_allocated_cap = effective_pool_size;
-    load_existing_entries(later);
-    m_cache_state->clean = m_dirty_log_entries.empty();
-    m_cache_state->empty = m_log_entries.empty();
-  }
+  initialize_pool(on_finish, later);
 
   ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
                << " log entries, " << m_free_log_entries << " of which are free."
@@ -770,31 +604,9 @@ void AbstractWriteLog<I>::shut_down(Context *on_finish) {
         m_wake_up_enabled = false;
         m_cache_state->clean = true;
         m_log_entries.clear();
-        if (m_log_pool) {
-          ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
-          pmemobj_close(m_log_pool);
-        }
-        if (m_cache_state->clean) {
-          if (m_log_is_poolset) {
-            ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
-          } else {
-            ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << m_log_pool_name << dendl;
-            if (remove(m_log_pool_name.c_str()) != 0) {
-              lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << m_log_pool_name << "\": "
-                                     << pmemobj_errormsg() << dendl;
-            } else {
-              m_cache_state->clean = true;
-              m_cache_state->empty = true;
-              m_cache_state->present = false;
-            }
-          }
-        } else {
-          if (m_log_is_poolset) {
-            ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
-          } else {
-            ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << m_log_pool_name << dendl;
-          }
-        }
+
+        remove_pool_file();
+
         if (m_perfcounter) {
           perf_stop();
         }
@@ -1338,62 +1150,40 @@ void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell)
   ldout(cct, 20) << "exit" << dendl;
 }
 
-/*
- * Performs the log event append operation for all of the scheduled
- * events.
- */
 template <typename I>
-void AbstractWriteLog<I>::append_scheduled_ops(void)
+void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain,
+                                         bool &appending, bool isRWL)
 {
-  GenericLogOperations ops;
-  int append_result = 0;
-  bool ops_remain = false;
-  bool appending = false; /* true if we set m_appending */
-  ldout(m_image_ctx.cct, 20) << dendl;
-  do {
-    ops.clear();
-
-    {
-      std::lock_guard locker(m_lock);
-      if (!appending && m_appending) {
-        /* Another thread is appending */
-        ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
-        return;
+  const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION
+    : MAX_WRITES_PER_SYNC_POINT;
+  {
+    std::lock_guard locker(m_lock);
+    if (!appending && m_appending) {
+      /* Another thread is appending */
+      ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
+      return;
+    }
+    if (m_ops_to_append.size()) {
+      appending = true;
+      m_appending = true;
+      auto last_in_batch = m_ops_to_append.begin();
+      unsigned int ops_to_append = m_ops_to_append.size();
+      if (ops_to_append > OPS_APPENDED) {
+        ops_to_append = OPS_APPENDED;
       }
-      if (m_ops_to_append.size()) {
-        appending = true;
-        m_appending = true;
-        auto last_in_batch = m_ops_to_append.begin();
-        unsigned int ops_to_append = m_ops_to_append.size();
-        if (ops_to_append > OPS_APPENDED_TOGETHER) {
-          ops_to_append = OPS_APPENDED_TOGETHER;
-        }
-        std::advance(last_in_batch, ops_to_append);
-        ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
-        ops_remain = true; /* Always check again before leaving */
-        ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
-                                   << m_ops_to_append.size() << " remain" << dendl;
-      } else {
-        ops_remain = false;
-        if (appending) {
-          appending = false;
-          m_appending = false;
-        }
+      std::advance(last_in_batch, ops_to_append);
+      ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
+      ops_remain = true; /* Always check again before leaving */
+      ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
+                                 << m_ops_to_append.size() << " remain" << dendl;
+    } else if (isRWL) {
+      ops_remain = false;
+      if (appending) {
+        appending = false;
+        m_appending = false;
       }
     }
-
-    if (ops.size()) {
-      std::lock_guard locker(m_log_append_lock);
-      alloc_op_log_entries(ops);
-      append_result = append_op_log_entries(ops);
-    }
-
-    int num_ops = ops.size();
-    if (num_ops) {
-      /* New entries may be flushable. Completion will wake up flusher. */
-      complete_op_log_entries(std::move(ops), append_result);
-    }
-  } while (ops_remain);
+  }
 }
 
 template <typename I>
@@ -1409,40 +1199,12 @@ void AbstractWriteLog<I>::enlist_op_appender()
   m_work_queue.queue(append_ctx);
 }
 
-/*
- * Takes custody of ops. They'll all get their log entries appended,
- * and have their on_write_persist contexts completed once they and
- * all prior log entries are persisted everywhere.
- */
-template <typename I>
-void AbstractWriteLog<I>::schedule_append(GenericLogOperations &ops)
-{
-  bool need_finisher;
-  GenericLogOperationsVector appending;
-
-  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
-  {
-    std::lock_guard locker(m_lock);
-
-    need_finisher = m_ops_to_append.empty() && !m_appending;
-    m_ops_to_append.splice(m_ops_to_append.end(), ops);
-  }
-
-  if (need_finisher) {
-    enlist_op_appender();
-  }
-
-  for (auto &op : appending) {
-    op->appending();
-  }
-}
-
 template <typename I>
 void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops)
 {
   GenericLogOperations to_append(ops.begin(), ops.end());
 
-  schedule_append(to_append);
+  schedule_append_ops(to_append);
 }
 
 template <typename I>
@@ -1450,263 +1212,7 @@ void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op)
 {
   GenericLogOperations to_append { op };
 
-  schedule_append(to_append);
-}
-
-const unsigned long int ops_flushed_together = 4;
-/*
- * Performs the pmem buffer flush on all scheduled ops, then schedules
- * the log event append operation for all of them.
- */
-template <typename I>
-void AbstractWriteLog<I>::flush_then_append_scheduled_ops(void)
-{
-  GenericLogOperations ops;
-  bool ops_remain = false;
-  ldout(m_image_ctx.cct, 20) << dendl;
-  do {
-    {
-      ops.clear();
-      std::lock_guard locker(m_lock);
-      if (m_ops_to_flush.size()) {
-        auto last_in_batch = m_ops_to_flush.begin();
-        unsigned int ops_to_flush = m_ops_to_flush.size();
-        if (ops_to_flush > ops_flushed_together) {
-          ops_to_flush = ops_flushed_together;
-        }
-        ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
-        std::advance(last_in_batch, ops_to_flush);
-        ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
-        ops_remain = !m_ops_to_flush.empty();
-        ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
-                                   << m_ops_to_flush.size() << " remain" << dendl;
-      } else {
-        ops_remain = false;
-      }
-    }
-    if (ops_remain) {
-      enlist_op_flusher();
-    }
-
-    /* Ops subsequently scheduled for flush may finish before these,
-     * which is fine. We're unconcerned with completion order until we
-     * get to the log message append step. */
-    if (ops.size()) {
-      flush_pmem_buffer(ops);
-      schedule_append(ops);
-    }
-  } while (ops_remain);
-  append_scheduled_ops();
-}
-
-template <typename I>
-void AbstractWriteLog<I>::enlist_op_flusher()
-{
-  m_async_flush_ops++;
-  m_async_op_tracker.start_op();
-  Context *flush_ctx = new LambdaContext([this](int r) {
-      flush_then_append_scheduled_ops();
-      m_async_flush_ops--;
-      m_async_op_tracker.finish_op();
-    });
-  m_work_queue.queue(flush_ctx);
-}
-
-/*
- * Takes custody of ops. They'll all get their pmem blocks flushed,
- * then get their log entries appended.
- */
-template <typename I>
-void AbstractWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
-{
-  GenericLogOperations to_flush(ops.begin(), ops.end());
-  bool need_finisher;
-  ldout(m_image_ctx.cct, 20) << dendl;
-  {
-    std::lock_guard locker(m_lock);
-
-    need_finisher = m_ops_to_flush.empty();
-    m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
-  }
-
-  if (need_finisher) {
-    enlist_op_flusher();
-  }
-}
-
-/*
- * Flush the pmem regions for the data blocks of a set of operations
- *
- * V is expected to be GenericLogOperations<I>, or GenericLogOperationsVector<I>
- */
-template <typename I>
-template <typename V>
-void AbstractWriteLog<I>::flush_pmem_buffer(V& ops)
-{
-  for (auto &operation : ops) {
-    operation->flush_pmem_buf_to_cache(m_log_pool);
-  }
-
-  /* Drain once for all */
-  pmemobj_drain(m_log_pool);
-
-  utime_t now = ceph_clock_now();
-  for (auto &operation : ops) {
-    if (operation->reserved_allocated()) {
-      operation->buf_persist_comp_time = now;
-    } else {
-      ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
-    }
-  }
-}
-
-/*
- * Allocate the (already reserved) write log entries for a set of operations.
- *
- * Locking:
- * Acquires lock
- */
-template <typename I>
-void AbstractWriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
-{
-  TOID(struct WriteLogPoolRoot) pool_root;
-  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
-  struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
-
-  ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock));
-
-  /* Allocate the (already reserved) log entries */
-  std::lock_guard locker(m_lock);
-
-  for (auto &operation : ops) {
-    uint32_t entry_index = m_first_free_entry;
-    m_first_free_entry = (m_first_free_entry + 1) % m_total_log_entries;
-    auto &log_entry = operation->get_log_entry();
-    log_entry->log_entry_index = entry_index;
-    log_entry->ram_entry.entry_index = entry_index;
-    log_entry->pmem_entry = &pmem_log_entries[entry_index];
-    log_entry->ram_entry.entry_valid = 1;
-    m_log_entries.push_back(log_entry);
-    ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
-  }
-}
-
-/*
- * Flush the persistent write log entries set of ops. The entries must
- * be contiguous in persistent memory.
- */
-template <typename I>
-void AbstractWriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
-{
-  if (ops.empty()) {
-    return;
-  }
-
-  if (ops.size() > 1) {
-    ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry);
-  }
-
-  ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
-                             << "start address="
-                             << ops.front()->get_log_entry()->pmem_entry << " "
-                             << "bytes="
-                             << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
-                             << dendl;
-  pmemobj_flush(m_log_pool,
-                ops.front()->get_log_entry()->pmem_entry,
-                ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)));
-}
-
-/*
- * Write and persist the (already allocated) write log entries and
- * data buffer allocations for a set of ops. The data buffer for each
- * of these must already have been persisted to its reserved area.
- */
-template <typename I>
-int AbstractWriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
-{
-  CephContext *cct = m_image_ctx.cct;
-  GenericLogOperationsVector entries_to_flush;
-  TOID(struct WriteLogPoolRoot) pool_root;
-  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
-  int ret = 0;
-
-  ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock));
-
-  if (ops.empty()) {
-    return 0;
-  }
-  entries_to_flush.reserve(OPS_APPENDED_TOGETHER);
-
-  /* Write log entries to ring and persist */
-  utime_t now = ceph_clock_now();
-  for (auto &operation : ops) {
-    if (!entries_to_flush.empty()) {
-      /* Flush these and reset the list if the current entry wraps to the
-       * tail of the ring */
-      if (entries_to_flush.back()->get_log_entry()->log_entry_index >
-          operation->get_log_entry()->log_entry_index) {
-        ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
-                                   << "operation=[" << *operation << "]" << dendl;
-        flush_op_log_entries(entries_to_flush);
-        entries_to_flush.clear();
-        now = ceph_clock_now();
-      }
-    }
-    ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
-                               << operation->get_log_entry()->log_entry_index << " "
-                               << "from " << &operation->get_log_entry()->ram_entry << " "
-                               << "to " << operation->get_log_entry()->pmem_entry << " "
-                               << "operation=[" << *operation << "]" << dendl;
-    ldout(m_image_ctx.cct, 05) << "APPENDING: index="
-                               << operation->get_log_entry()->log_entry_index << " "
-                               << "operation=[" << *operation << "]" << dendl;
-    operation->log_append_time = now;
-    *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry;
-    ldout(m_image_ctx.cct, 20) << "APPENDING: index="
-                               << operation->get_log_entry()->log_entry_index << " "
-                               << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
-                               << "]" << dendl;
-    entries_to_flush.push_back(operation);
-  }
-  flush_op_log_entries(entries_to_flush);
-
-  /* Drain once for all */
-  pmemobj_drain(m_log_pool);
-
-  /*
-   * Atomically advance the log head pointer and publish the
-   * allocations for all the data buffers they refer to.
-   */
-  utime_t tx_start = ceph_clock_now();
-  TX_BEGIN(m_log_pool) {
-    D_RW(pool_root)->first_free_entry = m_first_free_entry;
-    for (auto &operation : ops) {
-      if (operation->reserved_allocated()) {
-        auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
-        pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
-      } else {
-        ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
-      }
-    }
-  } TX_ONCOMMIT {
-  } TX_ONABORT {
-    lderr(cct) << "failed to commit " << ops.size()
-               << " log entries (" << m_log_pool_name << ")" << dendl;
-    ceph_assert(false);
-    ret = -EIO;
-  } TX_FINALLY {
-  } TX_END;
-
-  utime_t tx_end = ceph_clock_now();
-  m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start);
-  m_perfcounter->hinc(
-    l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
-  for (auto &operation : ops) {
-    operation->log_append_comp_time = tx_end;
-  }
-
-  return ret;
+  schedule_append_ops(to_append);
 }
 
 /*
@@ -1888,20 +1394,12 @@ void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
 }
 
 template <typename I>
-bool AbstractWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
+bool AbstractWriteLog<I>::check_allocation(C_BlockIORequestT *req,
+      uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+      uint64_t &num_lanes, uint64_t &num_log_entries,
+      uint64_t &num_unpublished_reserves, uint64_t bytes_allocated_cap){
   bool alloc_succeeds = true;
   bool no_space = false;
-  uint64_t bytes_allocated = 0;
-  uint64_t bytes_cached = 0;
-  uint64_t bytes_dirtied = 0;
-  uint64_t num_lanes = 0;
-  uint64_t num_unpublished_reserves = 0;
-  uint64_t num_log_entries = 0;
-
-  // Setup buffer, and get all the number of required resources
-  req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated,
-                              num_lanes, num_log_entries, num_unpublished_reserves);
-
   {
     std::lock_guard locker(m_lock);
     if (m_free_lanes < num_lanes) {
@@ -1923,11 +1421,11 @@ bool AbstractWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
       no_space = true; /* Entries must be retired */
     }
     /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
-    if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
+    if (m_bytes_allocated + bytes_allocated > bytes_allocated_cap) {
       if (!req->has_io_waited_for_buffers()) {
         req->set_io_waited_for_entries(true);
         ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap="
-                                  << m_bytes_allocated_cap
+                                  << bytes_allocated_cap
                                   << ", allocated=" << m_bytes_allocated
                                   << ") in write [" << *req << "]" << dendl;
       }
@@ -1936,32 +1434,8 @@ bool AbstractWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
     }
   }
 
-  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
   if (alloc_succeeds) {
-    for (auto &buffer : buffers) {
-      utime_t before_reserve = ceph_clock_now();
-      buffer.buffer_oid = pmemobj_reserve(m_log_pool,
-                                          &buffer.buffer_alloc_action,
-                                          buffer.allocation_size,
-                                          0 /* Object type */);
-      buffer.allocation_lat = ceph_clock_now() - before_reserve;
-      if (TOID_IS_NULL(buffer.buffer_oid)) {
-        if (!req->has_io_waited_for_buffers()) {
-          req->set_io_waited_for_entries(true);
-        }
-        ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
-                                  << pmemobj_errormsg() << ". "
-                                  << *req << dendl;
-        alloc_succeeds = false;
-        no_space = true; /* Entries need to be retired */
-        break;
-      } else {
-        buffer.allocated = true;
-      }
-      ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
-                                 << "." << buffer.buffer_oid.oid.off
-                                 << ", size=" << buffer.allocation_size << dendl;
-    }
+    reserve_pmem(req, alloc_succeeds, no_space);
   }
 
   if (alloc_succeeds) {
@@ -1981,23 +1455,13 @@ bool AbstractWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
     }
   }
 
-  if (!alloc_succeeds) {
-    /* On alloc failure, free any buffers we did allocate */
-    for (auto &buffer : buffers) {
-      if (buffer.allocated) {
-        pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
-      }
-    }
-    if (no_space) {
-      /* Expedite flushing and/or retiring */
-      std::lock_guard locker(m_lock);
-      m_alloc_failed_since_retire = true;
-      m_last_alloc_fail = ceph_clock_now();
-    }
+  if (!alloc_succeeds && no_space) {
+    /* Expedite flushing and/or retiring */
+    std::lock_guard locker(m_lock);
+    m_alloc_failed_since_retire = true;
+    m_last_alloc_fail = ceph_clock_now();
   }
 
-  req->set_allocated(alloc_succeeds);
-
   return alloc_succeeds;
 }
 
@@ -2047,73 +1511,6 @@ void AbstractWriteLog<I>::wake_up() {
     }), 0);
 }
 
-template <typename I>
-void AbstractWriteLog<I>::process_work() {
-  CephContext *cct = m_image_ctx.cct;
-  int max_iterations = 4;
-  bool wake_up_requested = false;
-  uint64_t aggressive_high_water_bytes = m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
-  uint64_t high_water_bytes = m_bytes_allocated_cap * RETIRE_HIGH_WATER;
-  uint64_t low_water_bytes = m_bytes_allocated_cap * RETIRE_LOW_WATER;
-  uint64_t aggressive_high_water_entries = m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
-  uint64_t high_water_entries = m_total_log_entries * RETIRE_HIGH_WATER;
-  uint64_t low_water_entries = m_total_log_entries * RETIRE_LOW_WATER;
-
-  ldout(cct, 20) << dendl;
-
-  do {
-    {
-      std::lock_guard locker(m_lock);
-      m_wake_up_requested = false;
-    }
-    if (m_alloc_failed_since_retire || m_invalidating ||
-        m_bytes_allocated > high_water_bytes ||
-        (m_log_entries.size() > high_water_entries)) {
-      int retired = 0;
-      utime_t started = ceph_clock_now();
-      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << m_alloc_failed_since_retire
-                                 << ", allocated > high_water="
-                                 << (m_bytes_allocated > high_water_bytes)
-                                 << ", allocated_entries > high_water="
-                                 << (m_log_entries.size() > high_water_entries)
-                                 << dendl;
-      while (m_alloc_failed_since_retire || m_invalidating ||
-            (m_bytes_allocated > high_water_bytes) ||
-            (m_log_entries.size() > high_water_entries) ||
-            (((m_bytes_allocated > low_water_bytes) || (m_log_entries.size() > low_water_entries)) &&
-            (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
-        if (!retire_entries((m_shutting_down || m_invalidating ||
-           (m_bytes_allocated > aggressive_high_water_bytes) ||
-           (m_log_entries.size() > aggressive_high_water_entries))
-            ? MAX_ALLOC_PER_TRANSACTION
-            : MAX_FREE_PER_TRANSACTION)) {
-          break;
-        }
-        retired++;
-        dispatch_deferred_writes();
-        process_writeback_dirty_entries();
-      }
-      ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
-    }
-    dispatch_deferred_writes();
-    process_writeback_dirty_entries();
-
-    {
-      std::lock_guard locker(m_lock);
-      wake_up_requested = m_wake_up_requested;
-    }
-  } while (wake_up_requested && --max_iterations > 0);
-
-  {
-    std::lock_guard locker(m_lock);
-    m_wake_up_scheduled = false;
-    /* Reschedule if it's still requested */
-    if (m_wake_up_requested) {
-      wake_up();
-    }
-  }
-}
-
 template <typename I>
 bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
   CephContext *cct = m_image_ctx.cct;
@@ -2155,9 +1552,9 @@ bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_e
 }
 
 template <typename I>
-Context* AbstractWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<GenericLogEntry> log_entry) {
+Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
+                                                      bool invalidating) {
   CephContext *cct = m_image_ctx.cct;
-  bool invalidating = m_invalidating; // snapshot so we behave consistently
 
   ldout(cct, 20) << "" << dendl;
   ceph_assert(m_entry_reader_lock.is_locked());
@@ -2167,7 +1564,7 @@ Context* AbstractWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<GenericL
     m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
   }
   m_flush_ops_in_flight += 1;
-  /* For write same this is the bytes affected bt the flush op, not the bytes transferred */
+  /* For write same this is the bytes affected by the flush op, not the bytes transferred */
   m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
 
   /* Flush write completion action */
@@ -2204,19 +1601,7 @@ Context* AbstractWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<GenericL
         m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
       }
     });
-
-  if (invalidating) {
-    return ctx;
-  }
-  return new LambdaContext(
-    [this, log_entry, ctx](int r) {
-      m_image_ctx.op_work_queue->queue(new LambdaContext(
-        [this, log_entry, ctx](int r) {
-          ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                     << " " << *log_entry << dendl;
-          log_entry->writeback(m_image_writeback, ctx);
-        }), 0);
-    });
+  return ctx;
 }
 
 template <typename I>
@@ -2267,37 +1652,6 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
   }
 }
 
-/**
- * Update/persist the last flushed sync point in the log
- */
-template <typename I>
-void AbstractWriteLog<I>::persist_last_flushed_sync_gen()
-{
-  TOID(struct WriteLogPoolRoot) pool_root;
-  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
-  uint64_t flushed_sync_gen;
-
-  std::lock_guard append_locker(m_log_append_lock);
-  {
-    std::lock_guard locker(m_lock);
-    flushed_sync_gen = m_flushed_sync_gen;
-  }
-
-  if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
-    ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
-                               << D_RO(pool_root)->flushed_sync_gen << " to "
-                               << flushed_sync_gen << dendl;
-    TX_BEGIN(m_log_pool) {
-      D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
-    } TX_ONCOMMIT {
-    } TX_ONABORT {
-      lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
-      ceph_assert(false);
-    } TX_FINALLY {
-    } TX_END;
-  }
-}
-
 /* Returns true if the specified SyncPointLogEntry is considered flushed, and
  * the log will be updated to reflect this. */
 template <typename I>
@@ -2643,129 +1997,8 @@ bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_
   return log_entry->can_retire();
 }
 
-/**
- * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
- * that are eligible to be retired. Returns true if anything was
- * retired.
- */
-template <typename I>
-bool AbstractWriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
-  CephContext *cct = m_image_ctx.cct;
-  GenericLogEntriesVector retiring_entries;
-  uint32_t initial_first_valid_entry;
-  uint32_t first_valid_entry;
-
-  std::lock_guard retire_locker(m_log_retire_lock);
-  ldout(cct, 20) << "Look for entries to retire" << dendl;
-  {
-    /* Entry readers can't be added while we hold m_entry_reader_lock */
-    RWLock::WLocker entry_reader_locker(m_entry_reader_lock);
-    std::lock_guard locker(m_lock);
-    initial_first_valid_entry = m_first_valid_entry;
-    first_valid_entry = m_first_valid_entry;
-    auto entry = m_log_entries.front();
-    while (!m_log_entries.empty() &&
-           retiring_entries.size() < frees_per_tx &&
-           can_retire_entry(entry)) {
-      if (entry->log_entry_index != first_valid_entry) {
-        lderr(cct) << "Retiring entry index (" << entry->log_entry_index
-                   << ") and first valid log entry index (" << first_valid_entry
-                   << ") must be ==." << dendl;
-      }
-      ceph_assert(entry->log_entry_index == first_valid_entry);
-      first_valid_entry = (first_valid_entry + 1) % m_total_log_entries;
-      m_log_entries.pop_front();
-      retiring_entries.push_back(entry);
-      /* Remove entry from map so there will be no more readers */
-      if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
-        auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
-        if (gen_write_entry) {
-          m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
-        }
-      }
-      entry = m_log_entries.front();
-    }
-  }
-
-  if (retiring_entries.size()) {
-    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
-    TOID(struct WriteLogPoolRoot) pool_root;
-    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
-
-    utime_t tx_start;
-    utime_t tx_end;
-    /* Advance first valid entry and release buffers */
-    {
-      uint64_t flushed_sync_gen;
-      std::lock_guard append_locker(m_log_append_lock);
-      {
-        std::lock_guard locker(m_lock);
-        flushed_sync_gen = m_flushed_sync_gen;
-      }
-
-      tx_start = ceph_clock_now();
-      TX_BEGIN(m_log_pool) {
-        if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
-          ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
-                                     << D_RO(pool_root)->flushed_sync_gen << " to "
-                                     << flushed_sync_gen << dendl;
-          D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
-        }
-        D_RW(pool_root)->first_valid_entry = first_valid_entry;
-        for (auto &entry: retiring_entries) {
-          if (entry->write_bytes()) {
-            ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
-                           << "." << entry->ram_entry.write_data.oid.off << dendl;
-            TX_FREE(entry->ram_entry.write_data);
-          } else {
-            ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
-          }
-        }
-      } TX_ONCOMMIT {
-      } TX_ONABORT {
-        lderr(cct) << "failed to commit free of" << retiring_entries.size() << " log entries (" << m_log_pool_name << ")" << dendl;
-        ceph_assert(false);
-      } TX_FINALLY {
-      } TX_END;
-      tx_end = ceph_clock_now();
-    }
-    m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start);
-    m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), retiring_entries.size());
-
-    /* Update runtime copy of first_valid, and free entries counts */
-    {
-      std::lock_guard locker(m_lock);
-
-      ceph_assert(m_first_valid_entry == initial_first_valid_entry);
-      m_first_valid_entry = first_valid_entry;
-      m_free_log_entries += retiring_entries.size();
-      for (auto &entry: retiring_entries) {
-        if (entry->write_bytes()) {
-          ceph_assert(m_bytes_cached >= entry->write_bytes());
-          m_bytes_cached -= entry->write_bytes();
-          uint64_t entry_allocation_size = entry->write_bytes();
-          if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
-            entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
-          }
-          ceph_assert(m_bytes_allocated >= entry_allocation_size);
-          m_bytes_allocated -= entry_allocation_size;
-        }
-      }
-      m_alloc_failed_since_retire = false;
-      wake_up();
-    }
-  } else {
-    ldout(cct, 20) << "Nothing to retire" << dendl;
-    return false;
-  }
-  return true;
-}
-
 } // namespace pwl
 } // namespace cache
 } // namespace librbd
 
 template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>;
-template void librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>:: \
-  flush_pmem_buffer(std::vector<std::shared_ptr< \
-    librbd::cache::pwl::GenericLogOperation>>&);
index d47980cc2fd43ba94dcc0be185fd2fc1d9a48eb4..5796e1b9df389346fbda77e93b6bacca095a6220 100644 (file)
@@ -15,6 +15,7 @@
 #include "librbd/cache/pwl/LogOperation.h"
 #include "librbd/cache/pwl/Request.h"
 #include "librbd/cache/pwl/LogMap.h"
+#include "librbd/cache/pwl/Types.h"
 #include <functional>
 #include <list>
 
@@ -29,10 +30,11 @@ namespace cache {
 
 namespace pwl {
 
-class SyncPointLogEntry;
+class GenericLogEntry;
 class GenericWriteLogEntry;
+class SyncPointLogEntry;
 class WriteLogEntry;
-class GenericLogEntry;
+struct WriteLogPmemEntry;
 
 typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
 typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
@@ -65,24 +67,30 @@ public:
   typedef io::Extents Extents; 
 
   AbstractWriteLog(ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state);
-  ~AbstractWriteLog();
+  virtual ~AbstractWriteLog();
   AbstractWriteLog(const AbstractWriteLog&) = delete;
   AbstractWriteLog &operator=(const AbstractWriteLog&) = delete;
 
   /// IO methods
-  void read(Extents&& image_extents, ceph::bufferlist *bl,
+  void read(
+      Extents&& image_extents, ceph::bufferlist *bl,
       int fadvise_flags, Context *on_finish);
-  void write(Extents&& image_extents, ceph::bufferlist&& bl,
+  void write(
+      Extents&& image_extents, ceph::bufferlist&& bl,
       int fadvise_flags,
       Context *on_finish);
-  void discard(uint64_t offset, uint64_t length,
+  void discard(
+      uint64_t offset, uint64_t length,
       uint32_t discard_granularity_bytes,
       Context *on_finish);
-  void flush(io::FlushSource flush_source, Context *on_finish);
-  void writesame(uint64_t offset, uint64_t length,
+  void flush(
+      io::FlushSource flush_source, Context *on_finish);
+  void writesame(
+      uint64_t offset, uint64_t length,
       ceph::bufferlist&& bl,
       int fadvise_flags, Context *on_finish);
-  void compare_and_write(Extents&& image_extents,
+  void compare_and_write(
+      Extents&& image_extents,
       ceph::bufferlist&& cmp_bl, ceph::bufferlist&& bl,
       uint64_t *mismatch_offset,int fadvise_flags,
       Context *on_finish);
@@ -104,13 +112,13 @@ public:
   CephContext * get_context();
   void release_guarded_request(BlockGuardCell *cell);
   void release_write_lanes(C_BlockIORequestT *req);
-  bool alloc_resources(C_BlockIORequestT *req);
-  template <typename V>
-  void flush_pmem_buffer(V& ops);
+  virtual bool alloc_resources(C_BlockIORequestT *req) = 0;
+  virtual void setup_schedule_append(
+      pwl::GenericLogOperationsVector &ops, bool do_early_flush) = 0;
   void schedule_append(pwl::GenericLogOperationsVector &ops);
   void schedule_append(pwl::GenericLogOperationSharedPtr op);
-  void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops);
   void flush_new_sync_point(C_FlushRequestT *flush_req, pwl::DeferredContexts &later);
+
   std::shared_ptr<pwl::SyncPoint> get_current_sync_point() {
     return m_current_sync_point;
   }
@@ -134,9 +142,67 @@ public:
     return m_free_log_entries;
   }
   void add_into_log_map(pwl::GenericWriteLogEntries &log_entries);
-protected:
-  typedef std::list<pwl::C_WriteRequest<This> *> C_WriteRequests;
-  typedef std::list<pwl::C_BlockIORequest<This> *> C_BlockIORequests;
+
+private:
+ typedef std::list<pwl::C_WriteRequest<This> *> C_WriteRequests;
+ typedef std::list<pwl::C_BlockIORequest<This> *> C_BlockIORequests;
+
+ std::atomic<bool> m_initialized = {false};
+
+  uint64_t m_bytes_dirty = 0;     /* Total bytes yet to flush to RBD */
+  utime_t m_last_alloc_fail;      /* Entry or buffer allocation fail seen */
+
+  pwl::WriteLogGuard m_write_log_guard;
+
+  /* Starts at 0 for a new write log. Incremented on every flush. */
+  uint64_t m_current_sync_gen = 0;
+  /* Starts at 0 on each sync gen increase. Incremented before applied
+     to an operation */
+  uint64_t m_last_op_sequence_num = 0;
+
+  bool m_persist_on_write_until_flush = true;
+
+ /* Debug counters for the places m_async_op_tracker is used */
+  std::atomic<int> m_async_append_ops = {0};
+  std::atomic<int> m_async_complete_ops = {0};
+  std::atomic<int> m_async_null_flush_finish = {0};
+  std::atomic<int> m_async_process_work = {0};
+
+  /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
+  mutable ceph::mutex m_deferred_dispatch_lock;
+
+  /* Used in release/detain to make BlockGuard preserve submission order */
+  mutable ceph::mutex m_blockguard_lock;
+
+  /* Use m_blockguard_lock for the following 3 things */
+  bool m_barrier_in_progress = false;
+  BlockGuardCell *m_barrier_cell = nullptr;
+
+  bool m_wake_up_enabled = true;
+
+  Contexts m_flush_complete_contexts;
+
+  std::shared_ptr<pwl::SyncPoint> m_current_sync_point = nullptr;
+  bool m_persist_on_flush = false; /* If false, persist each write before completion */
+
+  int m_flush_ops_in_flight = 0;
+  int m_flush_bytes_in_flight = 0;
+  uint64_t m_lowest_flushing_sync_gen = 0;
+
+  /* Writes that have left the block guard, but are waiting for resources */
+  C_BlockIORequests m_deferred_ios;
+  /* Throttle writes concurrently allocating & replicating */
+  unsigned int m_free_lanes = pwl::MAX_CONCURRENT_WRITES;
+
+  /* Initialized from config, then set false during shutdown */
+  std::atomic<bool> m_periodic_stats_enabled = {false};
+  SafeTimer *m_timer = nullptr; /* Used with m_timer_lock */
+  mutable ceph::mutex *m_timer_lock = nullptr; /* Used with and by m_timer */
+  Context *m_timer_ctx = nullptr;
+
+  ThreadPool m_thread_pool;
+
+  uint32_t m_discard_granularity_bytes;
 
   BlockGuardCell* detain_guarded_request_helper(pwl::GuardedRequest &req);
   BlockGuardCell* detain_guarded_request_barrier_helper(pwl::GuardedRequest &req);
@@ -144,12 +210,34 @@ protected:
                               pwl::GuardedRequestFunctionContext *guarded_ctx,
                               bool is_barrier);
 
+  void perf_start(const std::string name);
+  void perf_stop();
+  void log_perf();
+  void periodic_stats();
+  void arm_periodic_stats();
+
+  void pwl_init(Context *on_finish, pwl::DeferredContexts &later);
+  void update_image_cache_state(Context *on_finish);
+
+  void flush_dirty_entries(Context *on_finish);
+  bool can_flush_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
+  bool handle_flushed_sync_point(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
+  void sync_point_writer_flushed(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
+
+  void init_flush_new_sync_point(pwl::DeferredContexts &later);
+  void new_sync_point(pwl::DeferredContexts &later);
+  pwl::C_FlushRequest<AbstractWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
+  void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, pwl::DeferredContexts &later);
+
+  void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
+  void schedule_complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
+  void internal_flush(bool invalidate, Context *on_finish);
+
+protected:
   librbd::cache::pwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
 
-  std::atomic<bool> m_initialized = {false};
   std::atomic<bool> m_shutting_down = {false};
   std::atomic<bool> m_invalidating = {false};
-  PMEMobjpool *m_log_pool = nullptr;
   const char* m_pwl_pool_layout_name;
 
   ImageCtxT &m_image_ctx;
@@ -164,14 +252,11 @@ protected:
 
   std::atomic<uint64_t> m_bytes_allocated = {0}; /* Total bytes allocated in write buffers */
   uint64_t m_bytes_cached = 0;    /* Total bytes used in write buffers */
-  uint64_t m_bytes_dirty = 0;     /* Total bytes yet to flush to RBD */
   uint64_t m_bytes_allocated_cap = 0;
 
-  utime_t m_last_alloc_fail;      /* Entry or buffer allocation fail seen */
   std::atomic<bool> m_alloc_failed_since_retire = {false};
 
   ImageWriteback<ImageCtxT> m_image_writeback;
-  pwl::WriteLogGuard m_write_log_guard;
   /*
    * When m_first_free_entry == m_first_valid_entry, the log is
    * empty. There is always at least one free entry, which can't be
@@ -180,23 +265,12 @@ protected:
   uint64_t m_first_free_entry = 0;  /* Entries from here to m_first_valid_entry-1 are free */
   uint64_t m_first_valid_entry = 0; /* Entries from here to m_first_free_entry-1 are valid */
 
-  /* Starts at 0 for a new write log. Incremented on every flush. */
-  uint64_t m_current_sync_gen = 0;
-  /* Starts at 0 on each sync gen increase. Incremented before applied
-     to an operation */
-  uint64_t m_last_op_sequence_num = 0;
   /* All writes bearing this and all prior sync gen numbers are flushed */
   uint64_t m_flushed_sync_gen = 0;
 
-  bool m_persist_on_write_until_flush = true;
-
   AsyncOpTracker m_async_op_tracker;
   /* Debug counters for the places m_async_op_tracker is used */
   std::atomic<int> m_async_flush_ops = {0};
-  std::atomic<int> m_async_append_ops = {0};
-  std::atomic<int> m_async_complete_ops = {0};
-  std::atomic<int> m_async_null_flush_finish = {0};
-  std::atomic<int> m_async_process_work = {0};
 
   /* Acquire locks in order declared here */
 
@@ -205,29 +279,19 @@ protected:
    * bufs. Hold a write lock to prevent readers from being added (e.g. when
    * removing log entrys from the map). No lock required to remove readers. */
   mutable RWLock m_entry_reader_lock;
-  /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
-  mutable ceph::mutex m_deferred_dispatch_lock;
   /* Hold m_log_append_lock while appending or retiring log entries. */
   mutable ceph::mutex m_log_append_lock;
   /* Used for most synchronization */
   mutable ceph::mutex m_lock;
 
-  /* Used in release/detain to make BlockGuard preserve submission order */
-  mutable ceph::mutex m_blockguard_lock;
-
   /* Use m_blockguard_lock for the following 3 things */
   pwl::WriteLogGuard::BlockOperations m_awaiting_barrier;
-  bool m_barrier_in_progress = false;
-  BlockGuardCell *m_barrier_cell = nullptr;
 
   bool m_wake_up_requested = false;
   bool m_wake_up_scheduled = false;
-  bool m_wake_up_enabled = true;
   bool m_appending = false;
   bool m_dispatching_deferred_ops = false;
 
-  Contexts m_flush_complete_contexts;
-
   pwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */
   pwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */
 
@@ -239,70 +303,53 @@ protected:
 
   PerfCounters *m_perfcounter = nullptr;
 
-  std::shared_ptr<pwl::SyncPoint> m_current_sync_point = nullptr;
-  bool m_persist_on_flush = false; /* If false, persist each write before completion */
-
-  int m_flush_ops_in_flight = 0;
-  int m_flush_bytes_in_flight = 0;
-  uint64_t m_lowest_flushing_sync_gen = 0;
-
-  /* Writes that have left the block guard, but are waiting for resources */
-  C_BlockIORequests m_deferred_ios;
-  /* Throttle writes concurrently allocating & replicating */
-  unsigned int m_free_lanes = pwl::MAX_CONCURRENT_WRITES;
   unsigned int m_unpublished_reserves = 0;
 
-  /* Initialized from config, then set false during shutdown */
-  std::atomic<bool> m_periodic_stats_enabled = {false};
-  SafeTimer *m_timer = nullptr; /* Used with m_timer_lock */
-  mutable ceph::mutex *m_timer_lock = nullptr; /* Used with and by m_timer */
-  Context *m_timer_ctx = nullptr;
-
-  ThreadPool m_thread_pool;
   ContextWQ m_work_queue;
 
-  uint32_t m_discard_granularity_bytes;
-
-  void perf_start(const std::string name);
-  void perf_stop();
-  void log_perf();
-  void periodic_stats();
-  void arm_periodic_stats();
-
-  void pwl_init(Context *on_finish, pwl::DeferredContexts &later);
-  void update_image_cache_state(Context *on_finish);
-  void load_existing_entries(pwl::DeferredContexts &later);
   void wake_up();
-  void process_work();
 
-  void flush_dirty_entries(Context *on_finish);
-  bool can_flush_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
-  Context *construct_flush_entry_ctx(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
-  void persist_last_flushed_sync_gen();
-  bool handle_flushed_sync_point(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
-  void sync_point_writer_flushed(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
+  void update_entries(
+      std::shared_ptr<pwl::GenericLogEntry> log_entry,
+      pwl::WriteLogPmemEntry *pmem_entry, std::map<uint64_t, bool> &missing_sync_points,
+      std::map<uint64_t, std::shared_ptr<pwl::SyncPointLogEntry>> &sync_point_entries,
+      int entry_index);
+  void update_sync_points(
+      std::map<uint64_t, bool> &missing_sync_points,
+      std::map<uint64_t, std::shared_ptr<pwl::SyncPointLogEntry>> &sync_point_entries,
+      pwl::DeferredContexts &later);
+  Context *construct_flush_entry(
+      const std::shared_ptr<pwl::GenericLogEntry> log_entry, bool invalidating);
   void process_writeback_dirty_entries();
   bool can_retire_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
-  bool retire_entries(const unsigned long int frees_per_tx);
-
-  void init_flush_new_sync_point(pwl::DeferredContexts &later);
-  void new_sync_point(pwl::DeferredContexts &later);
-  pwl::C_FlushRequest<AbstractWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
-  void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, pwl::DeferredContexts &later);
 
   void dispatch_deferred_writes(void);
-  void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
-  void append_scheduled_ops(void);
   void enlist_op_appender();
-  void schedule_append(pwl::GenericLogOperations &ops);
-  void flush_then_append_scheduled_ops(void);
-  void enlist_op_flusher();
-  void alloc_op_log_entries(pwl::GenericLogOperations &ops);
-  void flush_op_log_entries(pwl::GenericLogOperationsVector &ops);
-  int append_op_log_entries(pwl::GenericLogOperations &ops);
   void complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
-  void schedule_complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
-  void internal_flush(bool invalidate, Context *on_finish);
+
+  bool check_allocation(
+      C_BlockIORequestT *req,
+      uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+      uint64_t &num_lanes, uint64_t &num_log_entries,
+      uint64_t &num_unpublished_reserves, uint64_t bytes_allocated_cap);
+  void append_scheduled(
+      pwl::GenericLogOperations &ops, bool &ops_remain, bool &appending, bool isRWL=false);
+  
+  virtual void process_work() = 0;
+  virtual void append_scheduled_ops(void) = 0;
+  virtual void schedule_append_ops(pwl::GenericLogOperations &ops) = 0;
+  virtual void remove_pool_file() = 0;
+  virtual void initialize_pool(Context *on_finish, pwl::DeferredContexts &later) = 0;
+  virtual void write_data_to_buffer(
+      std::shared_ptr<pwl::WriteLogEntry> ws_entry, pwl::WriteLogPmemEntry *pmem_entry) {}
+  virtual void get_pool_name(const std::string log_poolset_name) {}
+  virtual void alloc_op_log_entries(pwl::GenericLogOperations &ops) {}
+  virtual bool retire_entries(const unsigned long int frees_per_tx) {return false;}
+  virtual void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops) {}
+  virtual void persist_last_flushed_sync_gen() {}
+  virtual void reserve_pmem(C_BlockIORequestT *req, bool &alloc_succeeds, bool &no_space) {}
+  virtual Context *construct_flush_entry_ctx(
+      const std::shared_ptr<pwl::GenericLogEntry> log_entry) {return nullptr;}
 };
 
 } // namespace pwl
index 5db62a5fd50f15723e61c76034d24a0b015d5063..c094cafff21cc0d892d294e5b7a0cc2cd60b9654 100644 (file)
@@ -1,6 +1,7 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// // vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab
 
+#include <libpmemobj.h>
 #include "ReplicatedWriteLog.h"
 #include "include/buffer.h"
 #include "include/Context.h"
 #include "common/Timer.h"
 #include "common/perf_counters.h"
 #include "librbd/ImageCtx.h"
+#include "librbd/asio/ContextWQ.h"
 #include "librbd/cache/pwl/ImageCacheState.h"
 #include "librbd/cache/pwl/LogEntry.h"
+#include "librbd/cache/pwl/Types.h"
 #include <map>
 #include <vector>
 
 namespace librbd {
 namespace cache {
 namespace pwl {
+
 using namespace librbd::cache::pwl;
 
+const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;
+
 template <typename I>
-ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state)
+ReplicatedWriteLog<I>::ReplicatedWriteLog(
+    I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state)
 : AbstractWriteLog<I>(image_ctx, cache_state)
 { 
 }
 
+template <typename I>
+ReplicatedWriteLog<I>::~ReplicatedWriteLog() {
+  m_log_pool = nullptr;
+}
+
+/*
+ * Allocate the (already reserved) write log entries for a set of operations.
+ *
+ * Locking:
+ * Acquires lock
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
+{
+  TOID(struct WriteLogPoolRoot) pool_root;
+  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+  struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+  
+  ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));
+  
+  /* Allocate the (already reserved) log entries */
+  std::lock_guard locker(m_lock);
+  
+  for (auto &operation : ops) {
+    uint32_t entry_index = this->m_first_free_entry;
+    this->m_first_free_entry = (this->m_first_free_entry + 1) % this->m_total_log_entries;
+    auto &log_entry = operation->get_log_entry();
+    log_entry->log_entry_index = entry_index;
+    log_entry->ram_entry.entry_index = entry_index;
+    log_entry->pmem_entry = &pmem_log_entries[entry_index];
+    log_entry->ram_entry.entry_valid = 1;
+    m_log_entries.push_back(log_entry);
+    ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
+  } 
+}
+
+/*
+ * Write and persist the (already allocated) write log entries and
+ * data buffer allocations for a set of ops. The data buffer for each
+ * of these must already have been persisted to its reserved area.
+ */
+template <typename I>
+int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
+{
+  CephContext *cct = m_image_ctx.cct;
+  GenericLogOperationsVector entries_to_flush;
+  TOID(struct WriteLogPoolRoot) pool_root;
+  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+  int ret = 0;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));
+
+  if (ops.empty()) {
+    return 0;
+  }
+  entries_to_flush.reserve(OPS_APPENDED_TOGETHER);
+
+  /* Write log entries to ring and persist */
+  utime_t now = ceph_clock_now();
+  for (auto &operation : ops) {
+    if (!entries_to_flush.empty()) {
+      /* Flush these and reset the list if the current entry wraps to the
+       * tail of the ring */
+      if (entries_to_flush.back()->get_log_entry()->log_entry_index >
+          operation->get_log_entry()->log_entry_index) {
+        ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
+                                   << "operation=[" << *operation << "]" << dendl;
+        flush_op_log_entries(entries_to_flush);
+        entries_to_flush.clear();
+        now = ceph_clock_now();
+      } 
+    } 
+    ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
+                               << operation->get_log_entry()->log_entry_index << " "
+                               << "from " << &operation->get_log_entry()->ram_entry << " "
+                               << "to " << operation->get_log_entry()->pmem_entry << " "
+                               << "operation=[" << *operation << "]" << dendl;
+    ldout(m_image_ctx.cct, 05) << "APPENDING: index="
+                               << operation->get_log_entry()->log_entry_index << " "
+                               << "operation=[" << *operation << "]" << dendl;
+    operation->log_append_time = now;
+    *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry;
+    ldout(m_image_ctx.cct, 20) << "APPENDING: index="
+                               << operation->get_log_entry()->log_entry_index << " "
+                               << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
+                               << "]" << dendl;
+    entries_to_flush.push_back(operation);
+  } 
+  flush_op_log_entries(entries_to_flush);
+
+  /* Drain once for all */
+  pmemobj_drain(m_log_pool);
+
+  /*
+   * Atomically advance the log head pointer and publish the
+   * allocations for all the data buffers they refer to.
+   */
+  utime_t tx_start = ceph_clock_now();
+  TX_BEGIN(m_log_pool) {
+    D_RW(pool_root)->first_free_entry = this->m_first_free_entry;
+    for (auto &operation : ops) {
+      if (operation->reserved_allocated()) {
+        auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
+        pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
+      } else {
+        ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+      } 
+    } 
+  } TX_ONCOMMIT {
+  } TX_ONABORT {
+    lderr(cct) << "failed to commit " << ops.size()
+               << " log entries (" << this->m_log_pool_name << ")" << dendl;
+    ceph_assert(false);
+    ret = -EIO;
+  } TX_FINALLY {
+  } TX_END;
+
+  utime_t tx_end = ceph_clock_now();
+  m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start);
+  m_perfcounter->hinc(
+    l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
+  for (auto &operation : ops) {
+    operation->log_append_comp_time = tx_end;
+  } 
+
+  return ret;
+}
+
+/*
+ * Flush the persistent write log entries set of ops. The entries must
+ * be contiguous in persistent memory.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
+{
+  if (ops.empty()) {
+    return;
+  } 
+
+  if (ops.size() > 1) {
+    ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry);
+  } 
+
+  ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
+                             << "start address="
+                             << ops.front()->get_log_entry()->pmem_entry << " "
+                             << "bytes="
+                             << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
+                             << dendl;
+  pmemobj_flush(m_log_pool,  
+                ops.front()->get_log_entry()->pmem_entry,
+                ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)));
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::get_pool_name(const std::string log_poolset_name) {
+  CephContext *cct = m_image_ctx.cct;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  if (access(log_poolset_name.c_str(), F_OK) == 0) {
+    this->m_log_pool_name = log_poolset_name;
+    this->m_log_is_poolset = true;
+  } else {
+    ldout(cct, 5) << "Poolset file " << log_poolset_name
+                  << " not present (or can't open). Using unreplicated pool" << dendl;
+  }               
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::remove_pool_file() {
+  if (m_log_pool) {
+    ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
+    pmemobj_close(m_log_pool);
+  } 
+  if (m_cache_state->clean) {
+    if (this->m_log_is_poolset) {
+      ldout(m_image_ctx.cct, 5) << "Not removing poolset " << this->m_log_pool_name << dendl;
+    } else {
+      ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << this->m_log_pool_name << dendl;
+      if (remove(this->m_log_pool_name.c_str()) != 0) {
+        lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << this->m_log_pool_name << "\": "
+          << pmemobj_errormsg() << dendl;
+      } else {
+        m_cache_state->clean = true;
+        m_cache_state->empty = true;
+        m_cache_state->present = false;
+      } 
+    } 
+  } else {
+    if (this->m_log_is_poolset) {
+      ldout(m_image_ctx.cct, 5) << "Not removing poolset " << this->m_log_pool_name << dendl;
+    } else {
+      ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << this->m_log_pool_name << dendl;
+    } 
+  } 
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) {
+  CephContext *cct = m_image_ctx.cct;
+  TOID(struct WriteLogPoolRoot) pool_root;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
+    if ((m_log_pool =
+         pmemobj_create(this->m_log_pool_name.c_str(),
+                        this->m_pwl_pool_layout_name,
+                        this->m_log_pool_config_size,
+                        (S_IWUSR | S_IRUSR))) == NULL) {
+      lderr(cct) << "failed to create pool (" << this->m_log_pool_name << ")"
+                 << pmemobj_errormsg() << dendl;
+      m_cache_state->present = false;
+      m_cache_state->clean = true;
+      m_cache_state->empty = true;
+      /* TODO: filter/replace errnos that are meaningless to the caller */
+      on_finish->complete(-errno);
+      return;
+    } 
+    m_cache_state->present = true;
+    m_cache_state->clean = true;
+    m_cache_state->empty = true;
+    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+
+    /* new pool, calculate and store metadata */
+    size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE);
+    size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogPmemEntry);
+    uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
+    if (num_small_writes > MAX_LOG_ENTRIES) {
+      num_small_writes = MAX_LOG_ENTRIES;
+    }
+    if (num_small_writes <= 2) {
+      lderr(cct) << "num_small_writes needs to > 2" << dendl;
+      on_finish->complete(-EINVAL);
+      return;
+    } 
+    this->m_log_pool_actual_size = this->m_log_pool_config_size;
+    this->m_bytes_allocated_cap = effective_pool_size;
+    /* Log ring empty */
+    m_first_free_entry = 0;
+    m_first_valid_entry = 0;
+    TX_BEGIN(m_log_pool) {
+      TX_ADD(pool_root);
+      D_RW(pool_root)->header.layout_version = RWL_POOL_VERSION;
+      D_RW(pool_root)->log_entries =
+        TX_ZALLOC(struct WriteLogPmemEntry,
+                  sizeof(struct WriteLogPmemEntry) * num_small_writes);
+      D_RW(pool_root)->pool_size = this->m_log_pool_actual_size;
+      D_RW(pool_root)->flushed_sync_gen = this->m_flushed_sync_gen;
+      D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
+      D_RW(pool_root)->num_log_entries = num_small_writes;
+      D_RW(pool_root)->first_free_entry = m_first_free_entry;
+      D_RW(pool_root)->first_valid_entry = m_first_valid_entry;
+    } TX_ONCOMMIT {
+      this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
+      this->m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free
+    } TX_ONABORT {
+      this->m_total_log_entries = 0;
+      this->m_free_log_entries = 0;
+      lderr(cct) << "failed to initialize pool (" << this->m_log_pool_name << ")" << dendl;
+      on_finish->complete(-pmemobj_tx_errno());
+      return;
+    } TX_FINALLY {
+    } TX_END;
+  } else {
+    m_cache_state->present = true;
+    /* Open existing pool */
+    if ((m_log_pool =
+         pmemobj_open(this->m_log_pool_name.c_str(),
+                      this->m_pwl_pool_layout_name)) == NULL) {
+      lderr(cct) << "failed to open pool (" << this->m_log_pool_name << "): "
+                 << pmemobj_errormsg() << dendl;
+      on_finish->complete(-errno);
+      return;
+    }
+    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+    if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) {
+      // TODO: will handle upgrading version in the future
+      lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version
+                 << " expected " << RWL_POOL_VERSION << dendl;
+      on_finish->complete(-EINVAL);
+      return;
+    }
+    if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
+      lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size
+                 << " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
+      on_finish->complete(-EINVAL);
+      return;
+    }
+    this->m_log_pool_actual_size = D_RO(pool_root)->pool_size;
+    this->m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
+    this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
+    m_first_free_entry = D_RO(pool_root)->first_free_entry;
+    m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
+    if (m_first_free_entry < m_first_valid_entry) {
+      /* Valid entries wrap around the end of the ring, so first_free is lower
+       * than first_valid.  If first_valid was == first_free+1, the entry at
+       * first_free would be empty. The last entry is never used, so in
+       * that case there would be zero free log entries. */
+     this->m_free_log_entries = this->m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1;
+    } else {
+      /* first_valid is <= first_free. If they are == we have zero valid log
+       * entries, and n-1 free log entries */
+      this->m_free_log_entries = this->m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1;
+    } 
+    size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE);
+    this->m_bytes_allocated_cap = effective_pool_size;
+    load_existing_entries(later);
+    m_cache_state->clean = this->m_dirty_log_entries.empty();
+    m_cache_state->empty = m_log_entries.empty();
+  }
+}
+
+/*
+ * Loads the log entries from an existing log.
+ *
+ * Creates the in-memory structures to represent the state of the
+ * re-opened log.
+ *
+ * Finds the last appended sync point, and any sync points referred to
+ * in log entries, but missing from the log. These missing sync points
+ * are created and scheduled for append. Some rudimentary consistency
+ * checking is done.
+ *    
+ * Rebuilds the m_blocks_to_log_entries map, to make log entries
+ * readable.
+ *  
+ * Places all writes on the dirty entries list, which causes them all
+ * to be flushed.
+ *  
+ */
+
+template <typename I>
+void ReplicatedWriteLog<I>::load_existing_entries(DeferredContexts &later) {
+  TOID(struct WriteLogPoolRoot) pool_root;
+  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+  struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+  uint64_t entry_index = m_first_valid_entry;
+  /* The map below allows us to find sync point log entries by sync
+   * gen number, which is necessary so write entries can be linked to
+   * their sync points. */
+  std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
+  /* The map below tracks sync points referred to in writes but not
+   * appearing in the sync_point_entries map.  We'll use this to
+   * determine which sync points are missing and need to be
+   * created. */
+  std::map<uint64_t, bool> missing_sync_points;
+
+  /*
+   * Read the existing log entries. Construct an in-memory log entry
+   * object of the appropriate type for each. Add these to the global
+   * log entries list.
+   *
+   * Write entries will not link to their sync points yet. We'll do
+   * that in the next pass. Here we'll accumulate a map of sync point
+   * gen numbers that are referred to in writes but do not appearing in
+   * the log.
+   */
+  while (entry_index != m_first_free_entry) {
+    WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index];
+    std::shared_ptr<GenericLogEntry> log_entry = nullptr;
+    ceph_assert(pmem_entry->entry_index == entry_index);
+
+    this->update_entries(log_entry, pmem_entry, missing_sync_points,
+        sync_point_entries, entry_index);
+
+    log_entry->ram_entry = *pmem_entry;
+    log_entry->pmem_entry = pmem_entry;
+    log_entry->log_entry_index = entry_index;
+    log_entry->completed = true;
+
+    m_log_entries.push_back(log_entry);
+    
+    entry_index = (entry_index + 1) % this->m_total_log_entries;
+  }
+
+  this->update_sync_points(missing_sync_points, sync_point_entries, later);
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::write_data_to_buffer(std::shared_ptr<WriteLogEntry> ws_entry,
+    WriteLogPmemEntry *pmem_entry) {
+  ws_entry->pmem_buffer = D_RW(pmem_entry->write_data);
+} 
+
+/**
+ * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
+ * that are eligible to be retired. Returns true if anything was
+ * retired.
+ */
+template <typename I>
+bool ReplicatedWriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
+  CephContext *cct = m_image_ctx.cct;
+  GenericLogEntriesVector retiring_entries;
+  uint32_t initial_first_valid_entry;
+  uint32_t first_valid_entry;
+
+  std::lock_guard retire_locker(this->m_log_retire_lock);
+  ldout(cct, 20) << "Look for entries to retire" << dendl;
+  {
+    /* Entry readers can't be added while we hold m_entry_reader_lock */
+    RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
+    std::lock_guard locker(m_lock);
+    initial_first_valid_entry = this->m_first_valid_entry;
+    first_valid_entry = this->m_first_valid_entry;
+    auto entry = m_log_entries.front();
+    while (!m_log_entries.empty() &&
+           retiring_entries.size() < frees_per_tx &&
+           this->can_retire_entry(entry)) {
+      if (entry->log_entry_index != first_valid_entry) {
+        lderr(cct) << "Retiring entry index (" << entry->log_entry_index
+                   << ") and first valid log entry index (" << first_valid_entry
+                   << ") must be ==." << dendl;
+      }
+      ceph_assert(entry->log_entry_index == first_valid_entry);
+      first_valid_entry = (first_valid_entry + 1) % this->m_total_log_entries;
+      m_log_entries.pop_front();
+      retiring_entries.push_back(entry);
+      /* Remove entry from map so there will be no more readers */
+      if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
+        auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
+        if (gen_write_entry) {
+          this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
+        }
+      }
+      entry = m_log_entries.front();
+    }
+  }
+
+  if (retiring_entries.size()) {
+    ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
+    TOID(struct WriteLogPoolRoot) pool_root;
+    pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+
+    utime_t tx_start;
+    utime_t tx_end;
+    /* Advance first valid entry and release buffers */
+    {
+      uint64_t flushed_sync_gen;
+      std::lock_guard append_locker(this->m_log_append_lock);
+      {
+        std::lock_guard locker(m_lock);
+        flushed_sync_gen = this->m_flushed_sync_gen;
+      }
+
+      tx_start = ceph_clock_now();
+      TX_BEGIN(m_log_pool) {
+        if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+          ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
+                                     << D_RO(pool_root)->flushed_sync_gen << " to "
+                                     << flushed_sync_gen << dendl;
+          D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+        } 
+        D_RW(pool_root)->first_valid_entry = first_valid_entry;
+        for (auto &entry: retiring_entries) {
+          if (entry->write_bytes()) {
+            ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
+                           << "." << entry->ram_entry.write_data.oid.off << dendl;
+            TX_FREE(entry->ram_entry.write_data);
+          } else {
+            ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
+          }
+        }
+      } TX_ONCOMMIT {
+      } TX_ONABORT {
+        lderr(cct) << "failed to commit free of" << retiring_entries.size()
+                   << " log entries (" << this->m_log_pool_name << ")" << dendl;
+        ceph_assert(false);
+      } TX_FINALLY {
+      } TX_END;
+      tx_end = ceph_clock_now();
+    }
+    m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start);
+    m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(),
+        retiring_entries.size());
+
+    /* Update runtime copy of first_valid, and free entries counts */
+    {
+      std::lock_guard locker(m_lock);
+
+      ceph_assert(this->m_first_valid_entry == initial_first_valid_entry);
+      this->m_first_valid_entry = first_valid_entry;
+      this->m_free_log_entries += retiring_entries.size();
+      for (auto &entry: retiring_entries) {
+        if (entry->write_bytes()) {
+          ceph_assert(this->m_bytes_cached >= entry->write_bytes());
+          this->m_bytes_cached -= entry->write_bytes();
+          uint64_t entry_allocation_size = entry->write_bytes();
+          if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
+            entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
+          }
+          ceph_assert(this->m_bytes_allocated >= entry_allocation_size);
+          this->m_bytes_allocated -= entry_allocation_size;
+        }
+      } 
+      this->m_alloc_failed_since_retire = false;
+      this->wake_up();
+    }
+  } else {
+    ldout(cct, 20) << "Nothing to retire" << dendl;
+    return false;
+  }
+  return true;
+}
+
+template <typename I>
+Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(
+    std::shared_ptr<GenericLogEntry> log_entry) {
+  bool invalidating = this->m_invalidating; // snapshot so we behave consistently
+  Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+  
+  if (invalidating) {
+    return ctx;
+  }
+  return new LambdaContext(
+    [this, log_entry, ctx](int r) {
+      m_image_ctx.op_work_queue->queue(new LambdaContext(
+        [this, log_entry, ctx](int r) {
+          ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                     << " " << *log_entry << dendl;
+          log_entry->writeback(this->m_image_writeback, ctx);
+        }), 0);
+    }); 
+}
+
+const unsigned long int ops_flushed_together = 4;
+/*
+ * Performs the pmem buffer flush on all scheduled ops, then schedules
+ * the log event append operation for all of them.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
+{
+  GenericLogOperations ops;
+  bool ops_remain = false;
+  ldout(m_image_ctx.cct, 20) << dendl;
+  do {
+    {
+      ops.clear();
+      std::lock_guard locker(m_lock);
+      if (m_ops_to_flush.size()) {
+        auto last_in_batch = m_ops_to_flush.begin();
+        unsigned int ops_to_flush = m_ops_to_flush.size();
+        if (ops_to_flush > ops_flushed_together) {
+          ops_to_flush = ops_flushed_together;
+        }
+        ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
+        std::advance(last_in_batch, ops_to_flush);
+        ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
+        ops_remain = !m_ops_to_flush.empty();
+        ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
+                                   << m_ops_to_flush.size() << " remain" << dendl;
+      } else {
+        ops_remain = false;
+      }
+    } 
+    if (ops_remain) {
+      enlist_op_flusher();
+    }
+
+    /* Ops subsequently scheduled for flush may finish before these,
+     * which is fine. We're unconcerned with completion order until we
+     * get to the log message append step. */
+    if (ops.size()) {
+      flush_pmem_buffer(ops);
+      schedule_append_ops(ops);
+    } 
+  } while (ops_remain);
+  append_scheduled_ops();
+}
+
+/*
+ * Performs the log event append operation for all of the scheduled
+ * events.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::append_scheduled_ops(void) {
+  GenericLogOperations ops;
+  int append_result = 0;
+  bool ops_remain = false;
+  bool appending = false; /* true if we set m_appending */
+  ldout(m_image_ctx.cct, 20) << dendl;
+  do {
+    ops.clear();
+    this->append_scheduled(ops, ops_remain, appending, true);
+    
+    if (ops.size()) {
+      std::lock_guard locker(this->m_log_append_lock);
+      alloc_op_log_entries(ops);
+      append_result = append_op_log_entries(ops);
+    } 
+    
+    int num_ops = ops.size();
+    if (num_ops) {
+      /* New entries may be flushable. Completion will wake up flusher. */
+      this->complete_op_log_entries(std::move(ops), append_result);
+    } 
+  } while (ops_remain);
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::enlist_op_flusher()
+{
+  this->m_async_flush_ops++;
+  this->m_async_op_tracker.start_op();
+  Context *flush_ctx = new LambdaContext([this](int r) {
+      flush_then_append_scheduled_ops();
+      this->m_async_flush_ops--;
+      this->m_async_op_tracker.finish_op();
+    });
+  this->m_work_queue.queue(flush_ctx);
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::setup_schedule_append(
+    pwl::GenericLogOperationsVector &ops, bool do_early_flush) {
+  if (do_early_flush) {                           
+    /* This caller is waiting for persist, so we'll use their thread to
+     * expedite it */
+    flush_pmem_buffer(ops);
+    this->schedule_append(ops);
+  } else {
+    /* This is probably not still the caller's thread, so do the payload
+     * flushing/replicating later. */
+    schedule_flush_and_append(ops);
+  } 
+}
+
+/*
+ * Takes custody of ops. They'll all get their log entries appended,
+ * and have their on_write_persist contexts completed once they and
+ * all prior log entries are persisted everywhere.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_append_ops(GenericLogOperations &ops)
+{
+  bool need_finisher;
+  GenericLogOperationsVector appending;
+  
+  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
+  {
+    std::lock_guard locker(m_lock);
+  
+    need_finisher = this->m_ops_to_append.empty() && !this->m_appending;
+    this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);
+  } 
+  
+  if (need_finisher) {
+    this->enlist_op_appender();
+  }
+  
+  for (auto &op : appending) {
+    op->appending();
+  }
+}
+
+/*
+ * Takes custody of ops. They'll all get their pmem blocks flushed,
+ * then get their log entries appended.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
+{
+  GenericLogOperations to_flush(ops.begin(), ops.end());
+  bool need_finisher;
+  ldout(m_image_ctx.cct, 20) << dendl;
+  {
+    std::lock_guard locker(m_lock);
+    
+    need_finisher = m_ops_to_flush.empty();
+    m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
+  } 
+  
+  if (need_finisher) {
+    enlist_op_flusher();
+  } 
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::process_work() {
+  CephContext *cct = m_image_ctx.cct;
+  int max_iterations = 4;
+  bool wake_up_requested = false;
+  uint64_t aggressive_high_water_bytes = this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
+  uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
+  uint64_t low_water_bytes = this->m_bytes_allocated_cap * RETIRE_LOW_WATER;
+  uint64_t aggressive_high_water_entries = this->m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
+  uint64_t high_water_entries = this->m_total_log_entries * RETIRE_HIGH_WATER;
+  uint64_t low_water_entries = this->m_total_log_entries * RETIRE_LOW_WATER;
+
+  ldout(cct, 20) << dendl;
+
+  do {
+    {
+      std::lock_guard locker(m_lock);
+      this->m_wake_up_requested = false;
+    }
+    if (this->m_alloc_failed_since_retire || this->m_invalidating ||
+        this->m_bytes_allocated > high_water_bytes ||
+        (m_log_entries.size() > high_water_entries)) {
+      int retired = 0;
+      utime_t started = ceph_clock_now();
+      ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
+                                 << ", allocated > high_water="
+                                 << (this->m_bytes_allocated > high_water_bytes)
+                                 << ", allocated_entries > high_water="
+                                 << (m_log_entries.size() > high_water_entries)
+                                 << dendl;
+      while (this->m_alloc_failed_since_retire || this->m_invalidating ||
+            (this->m_bytes_allocated > high_water_bytes) ||
+            (m_log_entries.size() > high_water_entries) ||
+            (((this->m_bytes_allocated > low_water_bytes) ||
+              (m_log_entries.size() > low_water_entries)) &&
+            (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
+        if (!retire_entries((this->m_shutting_down || this->m_invalidating ||
+           (this->m_bytes_allocated > aggressive_high_water_bytes) ||
+           (m_log_entries.size() > aggressive_high_water_entries))
+            ? MAX_ALLOC_PER_TRANSACTION
+            : MAX_FREE_PER_TRANSACTION)) {
+          break;
+        }
+        retired++;
+        this->dispatch_deferred_writes();
+        this->process_writeback_dirty_entries();
+      }
+      ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
+    }
+    this->dispatch_deferred_writes();
+    this->process_writeback_dirty_entries();
+
+    {
+      std::lock_guard locker(m_lock);
+      wake_up_requested = this->m_wake_up_requested;
+    }
+  } while (wake_up_requested && --max_iterations > 0);
+
+  {
+    std::lock_guard locker(m_lock);
+    this->m_wake_up_scheduled = false;
+    /* Reschedule if it's still requested */
+    if (this->m_wake_up_requested) {
+      this->wake_up();
+    } 
+  }
+}
+
+/*
+ * Flush the pmem regions for the data blocks of a set of operations
+ *
+ * V is expected to be GenericLogOperations<I>, or GenericLogOperationsVector<I>
+ */
+template <typename I>
+template <typename V>
+void ReplicatedWriteLog<I>::flush_pmem_buffer(V& ops)
+{
+  for (auto &operation : ops) {
+    operation->flush_pmem_buf_to_cache(m_log_pool);
+  } 
+  
+  /* Drain once for all */
+  pmemobj_drain(m_log_pool);
+  
+  utime_t now = ceph_clock_now();
+  for (auto &operation : ops) {
+    if (operation->reserved_allocated()) {
+      operation->buf_persist_comp_time = now;
+    } else {
+      ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+    } 
+  } 
+}
+
+/**
+ * Update/persist the last flushed sync point in the log
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
+{
+  TOID(struct WriteLogPoolRoot) pool_root;
+  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+  uint64_t flushed_sync_gen;
+
+  std::lock_guard append_locker(this->m_log_append_lock);
+  {
+    std::lock_guard locker(m_lock);
+    flushed_sync_gen = this->m_flushed_sync_gen;
+  }
+  
+  if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
+    ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
+                               << D_RO(pool_root)->flushed_sync_gen << " to "
+                               << flushed_sync_gen << dendl;
+    TX_BEGIN(m_log_pool) {     
+      D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
+    } TX_ONCOMMIT {
+    } TX_ONABORT {
+      lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
+      ceph_assert(false);
+    } TX_FINALLY {
+    } TX_END;
+  }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::reserve_pmem(C_BlockIORequestT *req,
+                                         bool &alloc_succeeds, bool &no_space) {
+  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
+  for (auto &buffer : buffers) {
+    utime_t before_reserve = ceph_clock_now();
+    buffer.buffer_oid = pmemobj_reserve(m_log_pool,
+                                        &buffer.buffer_alloc_action,
+                                        buffer.allocation_size,
+                                        0 /* Object type */);
+    buffer.allocation_lat = ceph_clock_now() - before_reserve;
+    if (TOID_IS_NULL(buffer.buffer_oid)) {
+      if (!req->has_io_waited_for_buffers()) {
+        req->set_io_waited_for_entries(true);
+      } 
+      ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
+                                << pmemobj_errormsg() << ". "
+                                << *req << dendl;
+      alloc_succeeds = false;   
+      no_space = true; /* Entries need to be retired */
+      break;
+    } else {
+      buffer.allocated = true;
+    } 
+    ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
+                               << "." << buffer.buffer_oid.oid.off
+                               << ", size=" << buffer.allocation_size << dendl;
+  }                            
+}
+
+template <typename I>
+bool ReplicatedWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
+  bool alloc_succeeds = true;
+  uint64_t bytes_allocated = 0;
+  uint64_t bytes_cached = 0;
+  uint64_t bytes_dirtied = 0;
+  uint64_t num_lanes = 0;
+  uint64_t num_unpublished_reserves = 0;
+  uint64_t num_log_entries = 0;
+  
+  ldout(m_image_ctx.cct, 20) << dendl;
+  // Setup buffer, and get all the number of required resources
+  req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated,
+                              num_lanes, num_log_entries, num_unpublished_reserves);
+                              
+  alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied, bytes_allocated,
+                              num_lanes, num_log_entries, num_unpublished_reserves,
+                              this->m_bytes_allocated_cap);
+                              
+  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
+  if (!alloc_succeeds) {
+    /* On alloc failure, free any buffers we did allocate */
+    for (auto &buffer : buffers) {
+      if (buffer.allocated) {
+        pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
+      }
+    } 
+  } 
+  
+  req->set_allocated(alloc_succeeds);
+  return alloc_succeeds;
+}
+
 } // namespace pwl
 } // namespace cache
 } // namespace librbd
index dc1a46a4547c1e9db609aa0e9997757a9e4c5363..339264b15ca6ad3322ecbc5109df1d7e6d49ca0a 100644 (file)
@@ -33,10 +33,8 @@ namespace pwl {
 template <typename ImageCtxT>
 class ReplicatedWriteLog : public AbstractWriteLog<ImageCtxT> {
 public:
-  typedef io::Extent Extent;
-  typedef io::Extents Extents;
-
-  ReplicatedWriteLog(ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state);
+  ReplicatedWriteLog(
+      ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state);
   ~ReplicatedWriteLog();
   ReplicatedWriteLog(const ReplicatedWriteLog&) = delete;
   ReplicatedWriteLog &operator=(const ReplicatedWriteLog&) = delete;
@@ -50,6 +48,45 @@ private:
   using C_WriteSameRequestT = pwl::C_WriteSameRequest<This>;
   using C_CompAndWriteRequestT = pwl::C_CompAndWriteRequest<This>;
 
+  PMEMobjpool *m_log_pool = nullptr;
+
+  void remove_pool_file();
+  void load_existing_entries(pwl::DeferredContexts &later);
+  void alloc_op_log_entries(pwl::GenericLogOperations &ops);
+  int append_op_log_entries(pwl::GenericLogOperations &ops);
+  void flush_then_append_scheduled_ops(void);
+  void enlist_op_flusher();
+  void flush_op_log_entries(pwl::GenericLogOperationsVector &ops);
+  template <typename V>
+  void flush_pmem_buffer(V& ops);
+
+protected:
+  using AbstractWriteLog<ImageCtxT>::m_lock;
+  using AbstractWriteLog<ImageCtxT>::m_log_entries;
+  using AbstractWriteLog<ImageCtxT>::m_image_ctx;
+  using AbstractWriteLog<ImageCtxT>::m_perfcounter;
+  using AbstractWriteLog<ImageCtxT>::m_ops_to_flush;
+  using AbstractWriteLog<ImageCtxT>::m_cache_state;
+  using AbstractWriteLog<ImageCtxT>::m_first_free_entry;
+  using AbstractWriteLog<ImageCtxT>::m_first_valid_entry;
+
+  void process_work() override;
+  void schedule_append_ops(pwl::GenericLogOperations &ops) override;
+  void append_scheduled_ops(void) override;
+  void reserve_pmem(C_BlockIORequestT *req, bool &alloc_succeeds, bool &no_space) override;
+  bool retire_entries(const unsigned long int frees_per_tx) override;
+  void persist_last_flushed_sync_gen() override;
+  bool alloc_resources(C_BlockIORequestT *req) override;
+  void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops) override;
+  void setup_schedule_append(
+      pwl::GenericLogOperationsVector &ops, bool do_early_flush) override;
+  Context *construct_flush_entry_ctx(
+        const std::shared_ptr<pwl::GenericLogEntry> log_entry) override;
+  void get_pool_name(const std::string log_poolset_name) override;
+  void initialize_pool(Context *on_finish, pwl::DeferredContexts &later) override;
+  void write_data_to_buffer(
+      std::shared_ptr<pwl::WriteLogEntry> ws_entry,
+      pwl::WriteLogPmemEntry *pmem_entry) override;
 };
 
 } // namespace pwl
index c30fb2203cbe96abef95d868e6049374bdb69267..aecf3b6a3f53ef532c542e91405e52eb5a415262 100644 (file)
@@ -275,16 +275,7 @@ bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_poi
 template <typename T>
 void C_WriteRequest<T>::schedule_append() {
   ceph_assert(++m_appended == 1);
-  if (m_do_early_flush) {
-    /* This caller is waiting for persist, so we'll use their thread to
-     * expedite it */
-    pwl.flush_pmem_buffer(this->op_set->operations);
-    pwl.schedule_append(this->op_set->operations);
-  } else {
-    /* This is probably not still the caller's thread, so do the payload
-     * flushing/replicating later. */
-    pwl.schedule_flush_and_append(this->op_set->operations);
-  }
+  pwl.setup_schedule_append(this->op_set->operations, m_do_early_flush);
 }
 
 /**