librbd: add aio_write tests
author Yuan Lu <yuan.y.lu@intel.com>
Thu, 21 Nov 2019 09:27:48 +0000 (17:27 +0800)
committer Yuan Lu <yuan.y.lu@intel.com>
Thu, 20 Feb 2020 13:18:38 +0000 (21:18 +0800)
Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
15 files changed:
src/common/subsys.h
src/librbd/cache/ReplicatedWriteLog.cc
src/librbd/cache/ReplicatedWriteLog.h
src/librbd/cache/rwl/ImageCacheState.cc
src/librbd/cache/rwl/LogEntry.cc
src/librbd/cache/rwl/LogEntry.h
src/librbd/cache/rwl/LogOperation.cc
src/librbd/cache/rwl/LogOperation.h
src/librbd/cache/rwl/Request.cc
src/librbd/cache/rwl/Request.h
src/librbd/cache/rwl/SyncPoint.cc
src/librbd/cache/rwl/SyncPoint.h
src/librbd/cache/rwl/Types.cc
src/librbd/cache/rwl/Types.h
src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc

src/common/subsys.h
index 2e134c38bd6fc07b5febcc84499fd7972e8a1493..e61b478bfae959c738871366df5c637d5281a69d 100644 (file)
@@ -37,6 +37,7 @@ SUBSYS(rados, 0, 5)
 SUBSYS(rbd, 0, 5)
 SUBSYS(rbd_mirror, 0, 5)
 SUBSYS(rbd_replay, 0, 5)
+SUBSYS(rbd_rwl, 0, 5)
 SUBSYS(journaler, 0, 5)
 SUBSYS(objectcacher, 0, 5)
 SUBSYS(immutable_obj_cache, 0, 5)
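
Each SUBSYS(name, log_level, gather_level) line registers a debug_<name> config option, so with this change RWL logging can be tuned independently of debug rbd. A minimal sketch of how a translation unit opts into the new subsystem (the function below is hypothetical; the macro pair matches the ReplicatedWriteLog.cc and ImageCacheState.cc hunks that follow):

    #include "common/dout.h"

    #define dout_subsys ceph_subsys_rbd_rwl
    #undef dout_prefix
    #define dout_prefix *_dout << "librbd::cache::rwl: "

    // Hypothetical example: emitted only when debug_rbd_rwl is 20 or higher.
    void log_example(CephContext *cct) {
      ldout(cct, 20) << "rwl-specific message" << dendl;
    }
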
src/librbd/cache/ReplicatedWriteLog.cc
index 343790a41c44b313e51dd80db524e4a07a4a9776..51d789e31f1faeed3004bf1c1cff4b18bd737113 100644 (file)
@@ -20,7 +20,7 @@
 #include <map>
 #include <vector>
 
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::cache::ReplicatedWriteLog: " << this << " " \
                            <<  __func__ << ": "
@@ -33,7 +33,7 @@ using namespace librbd::cache::rwl;
 typedef ReplicatedWriteLog<ImageCtx>::Extent Extent;
 typedef ReplicatedWriteLog<ImageCtx>::Extents Extents;
 
-const unsigned long int ops_appended_together = MAX_ALLOC_PER_TRANSACTION;
+const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;
 
 template <typename I>
 ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::ImageCacheState<I>* cache_state)
@@ -50,8 +50,6 @@ ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::Imag
       "librbd::cache::ReplicatedWriteLog::m_lock", this))),
     m_blockguard_lock(ceph::make_mutex(util::unique_lock_name(
       "librbd::cache::ReplicatedWriteLog::m_blockguard_lock", this))),
-    m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name(
-      "librbd::cache::ReplicatedWriteLog::m_entry_bl_lock", this))),
     m_thread_pool(image_ctx.cct, "librbd::cache::ReplicatedWriteLog::thread_pool", "tp_rwl",
                   4,
                   ""),
@@ -132,43 +130,43 @@ void ReplicatedWriteLog<I>::perf_start(std::string name) {
   plb.add_u64_avg(l_librbd_rwl_log_op_bytes, "log_op_bytes", "Average log append bytes");
 
   plb.add_time_avg(
-      l_librbd_rwl_req_arr_to_all_t, "req_arr_to_all_t",
-      "Average arrival to allocation time (time deferred for overlap)");
+    l_librbd_rwl_req_arr_to_all_t, "req_arr_to_all_t",
+    "Average arrival to allocation time (time deferred for overlap)");
   plb.add_time_avg(
-      l_librbd_rwl_req_arr_to_dis_t, "req_arr_to_dis_t",
-      "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
+    l_librbd_rwl_req_arr_to_dis_t, "req_arr_to_dis_t",
+    "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
   plb.add_time_avg(
-      l_librbd_rwl_req_all_to_dis_t, "req_all_to_dis_t",
-      "Average allocation to dispatch time (time deferred for log resources)");
+    l_librbd_rwl_req_all_to_dis_t, "req_all_to_dis_t",
+    "Average allocation to dispatch time (time deferred for log resources)");
   plb.add_time_avg(
-      l_librbd_rwl_wr_latency, "wr_latency",
-      "Latency of writes (persistent completion)");
+    l_librbd_rwl_wr_latency, "wr_latency",
+    "Latency of writes (persistent completion)");
   plb.add_u64_counter_histogram(
     l_librbd_rwl_wr_latency_hist, "wr_latency_bytes_histogram",
     op_hist_x_axis_config, op_hist_y_axis_config,
     "Histogram of write request latency (nanoseconds) vs. bytes written");
   plb.add_time_avg(
-      l_librbd_rwl_wr_caller_latency, "caller_wr_latency",
-      "Latency of write completion to caller");
+    l_librbd_rwl_wr_caller_latency, "caller_wr_latency",
+    "Latency of write completion to caller");
   plb.add_time_avg(
-      l_librbd_rwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
-      "Average arrival to allocation time (time deferred for overlap)");
+    l_librbd_rwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
+    "Average arrival to allocation time (time deferred for overlap)");
   plb.add_time_avg(
-      l_librbd_rwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
-      "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
+    l_librbd_rwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
+    "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
   plb.add_time_avg(
-      l_librbd_rwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
-      "Average allocation to dispatch time (time deferred for log resources)");
+    l_librbd_rwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
+    "Average allocation to dispatch time (time deferred for log resources)");
   plb.add_time_avg(
-      l_librbd_rwl_nowait_wr_latency, "wr_latency_nw",
-      "Latency of writes (persistent completion) not deferred for free space");
+    l_librbd_rwl_nowait_wr_latency, "wr_latency_nw",
+    "Latency of writes (persistent completion) not deferred for free space");
   plb.add_u64_counter_histogram(
     l_librbd_rwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram",
     op_hist_x_axis_config, op_hist_y_axis_config,
     "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space");
   plb.add_time_avg(
-      l_librbd_rwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
-      "Latency of write completion to callerfor writes not deferred for free space");
+    l_librbd_rwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
+    "Latency of write completion to caller for writes not deferred for free space");
   plb.add_time_avg(l_librbd_rwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time");
   plb.add_u64_counter_histogram(
     l_librbd_rwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram",
@@ -183,21 +181,21 @@ void ReplicatedWriteLog<I>::perf_start(std::string name) {
     "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written");
 
   plb.add_time_avg(
-      l_librbd_rwl_log_op_buf_to_app_t, "op_buf_to_app_t",
-      "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
+    l_librbd_rwl_log_op_buf_to_app_t, "op_buf_to_app_t",
+    "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
   plb.add_time_avg(
-      l_librbd_rwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
-      "Average buffer persist time (write data persist/replicate time)");
+    l_librbd_rwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
+    "Average buffer persist time (write data persist/replicate time)");
   plb.add_u64_counter_histogram(
     l_librbd_rwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram",
     op_hist_x_axis_config, op_hist_y_axis_config,
     "Histogram of write buffer persist time (nanoseconds) vs. bytes written");
   plb.add_time_avg(
-      l_librbd_rwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
-      "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
+    l_librbd_rwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
+    "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
   plb.add_time_avg(
-      l_librbd_rwl_log_op_app_to_appc_t, "op_app_to_appc_t",
-      "Average log append to persist complete time (log entry append/replicate time)");
+    l_librbd_rwl_log_op_app_to_appc_t, "op_app_to_appc_t",
+    "Average log append to persist complete time (log entry append/replicate time)");
   plb.add_u64_counter_histogram(
     l_librbd_rwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram",
     op_hist_x_axis_config, op_hist_y_axis_config,
@@ -409,7 +407,7 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
                << " first_valid=" << m_first_valid_entry
                << ", first_free=" << m_first_free_entry
                << ", flushed_sync_gen=" << m_flushed_sync_gen
-               << ", current_sync_gen=" << m_current_sync_gen << dendl;
+               << ", m_current_sync_gen=" << m_current_sync_gen << dendl;
   if (m_first_free_entry == m_first_valid_entry) {
     ldout(cct,1) << "write log is empty" << dendl;
     m_cache_state->empty = true;
@@ -418,15 +416,15 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
   // TODO: Will init sync point, this will be covered in later PR.
   //  init_flush_new_sync_point(later);
   ++m_current_sync_gen;
-  auto new_sync_point = std::make_shared<SyncPointT>(*this, m_current_sync_gen);
-  m_current_sync_point = new_sync_point;
+  m_current_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen,
+                                                     this->m_image_ctx.cct);
 
   m_initialized = true;
   // Start the thread
   m_thread_pool.start();
 
   m_periodic_stats_enabled = m_cache_state->log_periodic_stats;
-  /* Do these after we drop m_lock */
+  /* Do these after we drop the lock */
   later.add(new LambdaContext([this](int r) {
         if (m_periodic_stats_enabled) {
           /* Log stats for the first time */
@@ -467,7 +465,23 @@ void ReplicatedWriteLog<I>::init(Context *on_finish) {
 
 template <typename I>
 void ReplicatedWriteLog<I>::shut_down(Context *on_finish) {
-  // TODO: This is cover in later PR.
+  // Here we only close the pmem pool and remove the pool file.
+  // TODO: We'll continue to update this part in later PRs.
+  if (m_log_pool) {
+    ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
+    pmemobj_close(m_log_pool);
+  }
+  if (m_log_is_poolset) {
+    ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
+  } else {
+    ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
+                              << m_log_pool_name << dendl;
+    if (remove(m_log_pool_name.c_str()) != 0) {
+      lderr(m_image_ctx.cct) << "failed to remove empty pool \""
+                             << m_log_pool_name << "\": "
+                             << pmemobj_errormsg() << dendl;
+    }
+  }
   on_finish->complete(0);
 }
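
This teardown is the tail of the libpmemobj pool lifecycle. A minimal sketch of that lifecycle, assuming an illustrative path, layout name, and size (a plain pool file; poolsets are left in place, as the code above notes):

    #include <libpmemobj.h>
    #include <cstdio>

    int pool_lifecycle_demo() {
      // Create (or later pmemobj_open()) the backing pool file.
      PMEMobjpool *pool = pmemobj_create("/tmp/rwl.pool", "rwl_layout",
                                         PMEMOBJ_MIN_POOL, 0600);
      if (pool == nullptr) {
        std::fprintf(stderr, "create failed: %s\n", pmemobj_errormsg());
        return -1;
      }
      /* ... reserve, write, and persist log entries ... */
      pmemobj_close(pool);                   // flush and unmap the pool
      return std::remove("/tmp/rwl.pool");   // delete the backing file
    }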
 
@@ -483,28 +497,17 @@ void ReplicatedWriteLog<I>::aio_write(Extents &&image_extents,
                                       int fadvise_flags,
                                       Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(cct, 20) << "aio_write" << dendl;
-  }
+
+  ldout(cct, 20) << "aio_write" << dendl;
+
   utime_t now = ceph_clock_now();
   m_perfcounter->inc(l_librbd_rwl_wr_req, 1);
 
   ceph_assert(m_initialized);
-  {
-    std::shared_lock image_locker(m_image_ctx.image_lock);
-    if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
-      on_finish->complete(-EROFS);
-      return;
-    }
-  }
-
-  if (ExtentsSummary<Extents>(image_extents).total_bytes == 0) {
-    on_finish->complete(0);
-    return;
-  }
 
   auto *write_req =
-    new C_WriteRequestT(*this, now, std::move(image_extents), std::move(bl), fadvise_flags, on_finish);
+    new C_WriteRequestT(*this, now, std::move(image_extents), std::move(bl), fadvise_flags,
+                        m_lock, m_perfcounter, on_finish);
   m_perfcounter->inc(l_librbd_rwl_wr_bytes, write_req->image_extents_summary.total_bytes);
 
   /* The lambda below will be called when the block guard for all
@@ -515,13 +518,13 @@ void ReplicatedWriteLog<I>::aio_write(Extents &&image_extents,
       alloc_and_dispatch_io_req(write_req);
     });
 
-  detain_guarded_request(GuardedRequest(write_req->image_extents_summary.block_extent(),
-                                        guarded_ctx));
+  detain_guarded_request(write_req, guarded_ctx);
 }
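
The write path above hands admission control to the block guard: the request only allocates and dispatches once its extents are clear of in-flight overlaps. A sketch of the full shape of the guarded context created above (the blockguard_acquired() helper is an assumption about Request.h, which this hunk does not show):

    // Runs once detain_guarded_request() grants the extent range.
    auto *guarded_ctx = new GuardedRequestFunctionContext(
      [this, write_req](GuardedRequestFunctionContext &guard_ctx) {
        // Record the guard cell so it can be released when the IO retires,
        // then move on to resource allocation and dispatch.
        write_req->blockguard_acquired(guard_ctx);  // assumed helper
        alloc_and_dispatch_io_req(write_req);
      });
    detain_guarded_request(write_req, guarded_ctx);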
 
 template <typename I>
 void ReplicatedWriteLog<I>::aio_discard(uint64_t offset, uint64_t length,
-                                        uint32_t discard_granularity_bytes, Context *on_finish) {
+                                        uint32_t discard_granularity_bytes,
+                                        Context *on_finish) {
 }
 
 template <typename I>
@@ -556,6 +559,11 @@ template <typename I>
 void ReplicatedWriteLog<I>::invalidate(Context *on_finish) {
 }
 
+template <typename I>
+CephContext *ReplicatedWriteLog<I>::get_context() {
+  return m_image_ctx.cct;
+}
+
 template <typename I>
 BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
 {
@@ -563,44 +571,37 @@ BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_helper(GuardedRequ
   BlockGuardCell *cell;
 
   ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(cct, 20) << dendl;
-  }
+  ldout(cct, 20) << dendl;
 
   int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
   ceph_assert(r>=0);
   if (r > 0) {
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
-                     << "req=" << req << dendl;
-    }
+    ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
+                   << "req=" << req << dendl;
     return nullptr;
   }
 
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
-  }
+  ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
   return cell;
 }
 
 template <typename I>
-BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_barrier_helper(GuardedRequest &req)
+BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_barrier_helper(
+  GuardedRequest &req)
 {
   BlockGuardCell *cell = nullptr;
 
   ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(m_image_ctx.cct, 20) << dendl;
-  }
+  ldout(m_image_ctx.cct, 20) << dendl;
 
   if (m_barrier_in_progress) {
-    req.guard_ctx->m_state.queued = true;
+    req.guard_ctx->state.queued = true;
     m_awaiting_barrier.push_back(req);
   } else {
-    bool barrier = req.guard_ctx->m_state.barrier;
+    bool barrier = req.guard_ctx->state.barrier;
     if (barrier) {
       m_barrier_in_progress = true;
-      req.guard_ctx->m_state.current_barrier = true;
+      req.guard_ctx->state.current_barrier = true;
     }
     cell = detain_guarded_request_helper(req);
     if (barrier) {
@@ -613,19 +614,20 @@ BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_barrier_helper(Gua
 }
 
 template <typename I>
-void ReplicatedWriteLog<I>::detain_guarded_request(GuardedRequest &&req)
+void ReplicatedWriteLog<I>::detain_guarded_request(
+  C_BlockIORequestT *request, GuardedRequestFunctionContext *guarded_ctx)
 {
+  // TODO: add is_barrier for flush requests in later PRs
+  auto req = GuardedRequest(request->image_extents_summary.block_extent(), guarded_ctx);
   BlockGuardCell *cell = nullptr;
 
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(m_image_ctx.cct, 20) << dendl;
-  }
+  ldout(m_image_ctx.cct, 20) << dendl;
   {
     std::lock_guard locker(m_blockguard_lock);
     cell = detain_guarded_request_barrier_helper(req);
   }
   if (cell) {
-    req.guard_ctx->m_cell = cell;
+    req.guard_ctx->cell = cell;
     req.guard_ctx->complete(0);
   }
 }
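
detain_guarded_request() and its helpers drive WriteLogGuard (librbd::BlockGuard<GuardedRequest>), which serializes overlapping extents. A condensed sketch of the detain/release contract as these functions use it:

    BlockGuardCell *cell = nullptr;
    // r == 0: range was free; cell is valid and must be released later.
    // r > 0:  request was queued behind an in-flight overlap; it surfaces
    //         again via release() on the blocking cell.
    int r = m_write_log_guard.detain(req.block_extent, &req, &cell);

    // On IO completion, releasing the cell hands back whatever was queued:
    WriteLogGuard::BlockOperations unblocked;
    m_write_log_guard.release(cell, &unblocked);
    // ... re-dispatch each request in 'unblocked' ...
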
@@ -635,47 +637,39 @@ void ReplicatedWriteLog<I>::release_guarded_request(BlockGuardCell *released_cel
 {
   CephContext *cct = m_image_ctx.cct;
   WriteLogGuard::BlockOperations block_reqs;
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(cct, 20) << "released_cell=" << released_cell << dendl;
-  }
+  ldout(cct, 20) << "released_cell=" << released_cell << dendl;
 
   {
     std::lock_guard locker(m_blockguard_lock);
     m_write_log_guard.release(released_cell, &block_reqs);
 
     for (auto &req : block_reqs) {
-      req.guard_ctx->m_state.detained = true;
+      req.guard_ctx->state.detained = true;
       BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
       if (detained_cell) {
-        if (req.guard_ctx->m_state.current_barrier) {
+        if (req.guard_ctx->state.current_barrier) {
           /* The current barrier is acquiring the block guard, so now we know its cell */
           m_barrier_cell = detained_cell;
           /* detained_cell could be == released_cell here */
-          if (RWL_VERBOSE_LOGGING) {
-            ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
-          }
+          ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
         }
-        req.guard_ctx->m_cell = detained_cell;
+        req.guard_ctx->cell = detained_cell;
         m_work_queue.queue(req.guard_ctx);
       }
     }
 
     if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
-      }
+      ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
       /* The released cell is the current barrier request */
       m_barrier_in_progress = false;
       m_barrier_cell = nullptr;
       /* Move waiting requests into the blockguard. Stop if there's another barrier */
       while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
         auto &req = m_awaiting_barrier.front();
-        if (RWL_VERBOSE_LOGGING) {
-          ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
-        }
+        ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
         BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
         if (detained_cell) {
-          req.guard_ctx->m_cell = detained_cell;
+          req.guard_ctx->cell = detained_cell;
           m_work_queue.queue(req.guard_ctx);
         }
         m_awaiting_barrier.pop_front();
@@ -683,9 +677,7 @@ void ReplicatedWriteLog<I>::release_guarded_request(BlockGuardCell *released_cel
     }
   }
 
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(cct, 20) << "exit" << dendl;
-  }
+  ldout(cct, 20) << "exit" << dendl;
 }
 
 /*
@@ -695,13 +687,11 @@ void ReplicatedWriteLog<I>::release_guarded_request(BlockGuardCell *released_cel
 template <typename I>
 void ReplicatedWriteLog<I>::append_scheduled_ops(void)
 {
-  GenericLogOperationsT ops;
+  GenericLogOperations ops;
   int append_result = 0;
   bool ops_remain = false;
   bool appending = false; /* true if we set m_appending */
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(m_image_ctx.cct, 20) << dendl;
-  }
+  ldout(m_image_ctx.cct, 20) << dendl;
   do {
     ops.clear();
 
@@ -709,9 +699,7 @@ void ReplicatedWriteLog<I>::append_scheduled_ops(void)
       std::lock_guard locker(m_lock);
       if (!appending && m_appending) {
         /* Another thread is appending */
-        if (RWL_VERBOSE_LOGGING) {
-          ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
-        }
+        ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
         return;
       }
       if (m_ops_to_append.size()) {
@@ -719,15 +707,14 @@ void ReplicatedWriteLog<I>::append_scheduled_ops(void)
         m_appending = true;
         auto last_in_batch = m_ops_to_append.begin();
         unsigned int ops_to_append = m_ops_to_append.size();
-        if (ops_to_append > ops_appended_together) {
-          ops_to_append = ops_appended_together;
+        if (ops_to_append > OPS_APPENDED_TOGETHER) {
+          ops_to_append = OPS_APPENDED_TOGETHER;
         }
         std::advance(last_in_batch, ops_to_append);
         ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
         ops_remain = true; /* Always check again before leaving */
-        if (RWL_VERBOSE_LOGGING) {
-          ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " << m_ops_to_append.size() << " remain" << dendl;
-        }
+        ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
+                                   << m_ops_to_append.size() << " remain" << dendl;
       } else {
         ops_remain = false;
         if (appending) {
@@ -770,10 +757,10 @@ void ReplicatedWriteLog<I>::enlist_op_appender()
  * all prior log entries are persisted everywhere.
  */
 template <typename I>
-void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsT &ops)
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperations &ops)
 {
   bool need_finisher;
-  GenericLogOperationsVectorT appending;
+  GenericLogOperationsVector appending;
 
   std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
   {
@@ -793,9 +780,9 @@ void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsT &ops)
 }
 
 template <typename I>
-void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsVectorT &ops)
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsVector &ops)
 {
-  GenericLogOperationsT to_append(ops.begin(), ops.end());
+  GenericLogOperations to_append(ops.begin(), ops.end());
 
   schedule_append(to_append);
 }
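
append_scheduled_ops() above drains m_ops_to_append in batches of at most OPS_APPENDED_TOGETHER via std::list::splice, which relinks nodes in O(1) rather than copying the shared pointers. The idiom in isolation:

    #include <algorithm>
    #include <list>

    template <typename T>
    void take_batch(std::list<T> &pending, std::list<T> &batch,
                    size_t batch_max) {
      size_t n = std::min(pending.size(), batch_max);
      auto last_in_batch = pending.begin();
      std::advance(last_in_batch, n);
      // Move [begin, last_in_batch) into 'batch' without copies.
      batch.splice(batch.end(), pending, pending.begin(), last_in_batch);
    }
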
@@ -808,11 +795,9 @@ const unsigned long int ops_flushed_together = 4;
 template <typename I>
 void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
 {
-  GenericLogOperationsT ops;
+  GenericLogOperations ops;
   bool ops_remain = false;
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(m_image_ctx.cct, 20) << dendl;
-  }
+  ldout(m_image_ctx.cct, 20) << dendl;
   do {
     {
       ops.clear();
@@ -823,15 +808,12 @@ void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
         if (ops_to_flush > ops_flushed_together) {
           ops_to_flush = ops_flushed_together;
         }
-        if (RWL_VERBOSE_LOGGING) {
-          ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
-        }
+        ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
         std::advance(last_in_batch, ops_to_flush);
         ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
         ops_remain = !m_ops_to_flush.empty();
-        if (RWL_VERBOSE_LOGGING) {
-          ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", " << m_ops_to_flush.size() << " remain" << dendl;
-        }
+        ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
+                                   << m_ops_to_flush.size() << " remain" << dendl;
       } else {
         ops_remain = false;
       }
@@ -869,13 +851,11 @@ void ReplicatedWriteLog<I>::enlist_op_flusher()
  * then get their log entries appended.
  */
 template <typename I>
-void ReplicatedWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVectorT &ops)
+void ReplicatedWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
 {
-  GenericLogOperationsT to_flush(ops.begin(), ops.end());
+  GenericLogOperations to_flush(ops.begin(), ops.end());
   bool need_finisher;
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(m_image_ctx.cct, 20) << dendl;
-  }
+  ldout(m_image_ctx.cct, 20) << dendl;
   {
     std::lock_guard locker(m_lock);
 
@@ -898,12 +878,7 @@ template <typename V>
 void ReplicatedWriteLog<I>::flush_pmem_buffer(V& ops)
 {
   for (auto &operation : ops) {
-    if (operation->is_write() || operation->is_writesame()) {
-      operation->buf_persist_time = ceph_clock_now();
-      auto write_entry = operation->get_write_log_entry();
-
-      pmemobj_flush(m_log_pool, write_entry->pmem_buffer, write_entry->write_bytes());
-    }
+    operation->flush_pmem_buf_to_cache(m_log_pool);
   }
 
   /* Drain once for all */
@@ -911,12 +886,10 @@ void ReplicatedWriteLog<I>::flush_pmem_buffer(V& ops)
 
   utime_t now = ceph_clock_now();
   for (auto &operation : ops) {
-    if (operation->is_write() || operation->is_writesame()) {
+    if (operation->reserved_allocated()) {
       operation->buf_persist_comp_time = now;
     } else {
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
-      }
+      ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
     }
   }
 }
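
flush_pmem_buffer() persists in two phases: a pmemobj_flush() per write buffer to start cache writeback, then one pmemobj_drain() barrier for the whole batch. A self-contained sketch of the pattern (the buffer list is illustrative):

    #include <libpmemobj.h>
    #include <utility>
    #include <vector>

    void persist_batch(PMEMobjpool *pool,
                       const std::vector<std::pair<void *, size_t>> &bufs) {
      for (const auto &[addr, len] : bufs) {
        pmemobj_flush(pool, addr, len);  // initiate writeback, non-blocking
      }
      pmemobj_drain(pool);  // one barrier: all queued flushes are durable
    }
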
@@ -925,10 +898,10 @@ void ReplicatedWriteLog<I>::flush_pmem_buffer(V& ops)
  * Allocate the (already reserved) write log entries for a set of operations.
  *
  * Locking:
- * Acquires m_lock
+ * Acquires the lock
  */
 template <typename I>
-void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperationsT &ops)
+void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
 {
   TOID(struct WriteLogPoolRoot) pool_root;
   pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
@@ -948,9 +921,7 @@ void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperationsT &ops)
     log_entry->pmem_entry = &pmem_log_entries[entry_index];
     log_entry->ram_entry.entry_valid = 1;
     m_log_entries.push_back(log_entry);
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
-    }
+    ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
   }
 }
 
@@ -959,7 +930,7 @@ void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperationsT &ops)
  * be contiguous in persistent memory.
  */
 template <typename I>
-void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVectorT &ops)
+void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
 {
   if (ops.empty()) {
     return;
@@ -969,12 +940,12 @@ void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVectorT &op
     ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry);
   }
 
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
-                               << "start address=" << ops.front()->get_log_entry()->pmem_entry << " "
-                               << "bytes=" << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
-                               << dendl;
-  }
+  ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
+                             << "start address="
+                             << ops.front()->get_log_entry()->pmem_entry << " "
+                             << "bytes="
+                             << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
+                             << dendl;
   pmemobj_flush(m_log_pool,
                 ops.front()->get_log_entry()->pmem_entry,
                 ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)));
@@ -986,10 +957,10 @@ void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVectorT &op
  * of these must already have been persisted to its reserved area.
  */
 template <typename I>
-int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperationsT &ops)
+int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
 {
   CephContext *cct = m_image_ctx.cct;
-  GenericLogOperationsVectorT entries_to_flush;
+  GenericLogOperationsVector entries_to_flush;
   TOID(struct WriteLogPoolRoot) pool_root;
   pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
   int ret = 0;
@@ -999,7 +970,7 @@ int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperationsT &ops)
   if (ops.empty()) {
     return 0;
   }
-  entries_to_flush.reserve(ops_appended_together);
+  entries_to_flush.reserve(OPS_APPENDED_TOGETHER);
 
   /* Write log entries to ring and persist */
   utime_t now = ceph_clock_now();
@@ -1009,34 +980,27 @@ int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperationsT &ops)
        * tail of the ring */
       if (entries_to_flush.back()->get_log_entry()->log_entry_index >
           operation->get_log_entry()->log_entry_index) {
-        if (RWL_VERBOSE_LOGGING) {
-          ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
-                                     << "operation=[" << *operation << "]" << dendl;
-        }
+        ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
+                                   << "operation=[" << *operation << "]" << dendl;
         flush_op_log_entries(entries_to_flush);
         entries_to_flush.clear();
         now = ceph_clock_now();
       }
     }
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
-                                 << operation->get_log_entry()->log_entry_index << " "
-                                 << "from " << &operation->get_log_entry()->ram_entry << " "
-                                 << "to " << operation->get_log_entry()->pmem_entry << " "
-                                 << "operation=[" << *operation << "]" << dendl;
-    }
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(m_image_ctx.cct, 05) << "APPENDING: index="
-                                 << operation->get_log_entry()->log_entry_index << " "
-                                 << "operation=[" << *operation << "]" << dendl;
-    }
+    ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
+                               << operation->get_log_entry()->log_entry_index << " "
+                               << "from " << &operation->get_log_entry()->ram_entry << " "
+                               << "to " << operation->get_log_entry()->pmem_entry << " "
+                               << "operation=[" << *operation << "]" << dendl;
+    ldout(m_image_ctx.cct, 05) << "APPENDING: index="
+                               << operation->get_log_entry()->log_entry_index << " "
+                               << "operation=[" << *operation << "]" << dendl;
     operation->log_append_time = now;
     *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry;
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(m_image_ctx.cct, 20) << "APPENDING: index="
-                                 << operation->get_log_entry()->log_entry_index << " "
-                                 << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry << "]" << dendl;
-    }
+    ldout(m_image_ctx.cct, 20) << "APPENDING: index="
+                               << operation->get_log_entry()->log_entry_index << " "
+                               << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
+                               << "]" << dendl;
     entries_to_flush.push_back(operation);
   }
   flush_op_log_entries(entries_to_flush);
@@ -1052,18 +1016,17 @@ int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperationsT &ops)
   TX_BEGIN(m_log_pool) {
     D_RW(pool_root)->first_free_entry = m_first_free_entry;
     for (auto &operation : ops) {
-      if (operation->is_write() || operation->is_writesame()) {
-        auto write_op = (std::shared_ptr<WriteLogOperationT>&) operation;
+      if (operation->reserved_allocated()) {
+        auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
         pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
       } else {
-        if (RWL_VERBOSE_LOGGING) {
-          ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
-        }
+        ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
       }
     }
   } TX_ONCOMMIT {
   } TX_ONABORT {
-    lderr(cct) << "failed to commit " << ops.size() << " log entries (" << m_log_pool_name << ")" << dendl;
+    lderr(cct) << "failed to commit " << ops.size()
+               << " log entries (" << m_log_pool_name << ")" << dendl;
     ceph_assert(false);
     ret = -EIO;
   } TX_FINALLY {
@@ -1071,7 +1034,8 @@ int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperationsT &ops)
 
   utime_t tx_end = ceph_clock_now();
   m_perfcounter->tinc(l_librbd_rwl_append_tx_t, tx_end - tx_start);
-  m_perfcounter->hinc(l_librbd_rwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
+  m_perfcounter->hinc(
+    l_librbd_rwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
   for (auto &operation : ops) {
     operation->log_append_comp_time = tx_end;
   }
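
For reference, the TX_BEGIN block above follows the standard libpmemobj transaction shape; the skeleton below is a sketch with placeholder names (pool, root, new_first_free, reservation_action), not code from this PR:

    TX_BEGIN(pool) {
      // Snapshot the range before modifying it so an abort can undo it.
      pmemobj_tx_add_range_direct(&D_RW(root)->first_free_entry,
                                  sizeof(D_RW(root)->first_free_entry));
      D_RW(root)->first_free_entry = new_first_free;
      // Convert an earlier pmemobj_reserve() into a permanent allocation,
      // atomically with the rest of the transaction.
      pmemobj_tx_publish(&reservation_action, 1);
    } TX_ONCOMMIT {
      /* every store above is now persistent */
    } TX_ONABORT {
      /* none of it took effect; report -EIO */
    } TX_END
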
@@ -1083,48 +1047,32 @@ int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperationsT &ops)
  * Complete a set of write ops with the result of append_op_entries.
  */
 template <typename I>
-void ReplicatedWriteLog<I>::complete_op_log_entries(GenericLogOperationsT &&ops, const int result)
+void ReplicatedWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
+                                                    const int result)
 {
   GenericLogEntries dirty_entries;
   int published_reserves = 0;
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
-  }
+  ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
   for (auto &op : ops) {
     utime_t now = ceph_clock_now();
     auto log_entry = op->get_log_entry();
     log_entry->completed = true;
-    if (op->is_writing_op()) {
-      op->get_gen_write_op()->sync_point->log_entry->writes_completed++;
+    if (op->reserved_allocated()) {
+      op->mark_log_entry_completed();
       dirty_entries.push_back(log_entry);
-    }
-    if (op->is_write() || op->is_writesame()) {
       published_reserves++;
     }
-    if (op->is_discard()) {
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(m_image_ctx.cct, 20) << __func__ << ": completing discard" << dendl;
-      }
-    }
     op->complete(result);
-    if (op->is_write()) {
-      m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, op->buf_persist_time - op->dispatch_time);
-    }
-    m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t, op->log_append_time - op->dispatch_time);
+    m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t,
+                        op->log_append_time - op->dispatch_time);
     m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
-    m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist, utime_t(now - op->dispatch_time).to_nsec(),
+    m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist,
+                        utime_t(now - op->dispatch_time).to_nsec(),
                         log_entry->ram_entry.write_bytes);
-    if (op->is_write()) {
-      utime_t buf_lat = op->buf_persist_comp_time - op->buf_persist_time;
-      m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat);
-      m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(),
-                          log_entry->ram_entry.write_bytes);
-      m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, op->log_append_time - op->buf_persist_time);
-    }
     utime_t app_lat = op->log_append_comp_time - op->log_append_time;
     m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_appc_t, app_lat);
     m_perfcounter->hinc(l_librbd_rwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
-                        log_entry->ram_entry.write_bytes);
+                      log_entry->ram_entry.write_bytes);
     m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_cmp_t, now - op->log_append_time);
   }
 
@@ -1167,7 +1115,7 @@ void ReplicatedWriteLog<I>::dispatch_deferred_writes(void)
         std::lock_guard locker(m_lock);
         ceph_assert(m_dispatching_deferred_ops);
         if (allocated) {
-          /* On the 2..n-1 th time we get m_lock, front_req->alloc_resources() will
+          /* On the 2..(n-1)th time we get the lock, front_req->alloc_resources() will
            * have succeeded, and we'll need to pop it off the deferred ops list
            * here. */
           ceph_assert(front_req);
@@ -1229,13 +1177,11 @@ void ReplicatedWriteLog<I>::dispatch_deferred_writes(void)
  * deferred write
  */
 template <typename I>
-void ReplicatedWriteLog<I>::release_write_lanes(C_WriteRequestT *write_req)
+void ReplicatedWriteLog<I>::release_write_lanes(C_BlockIORequestT *req)
 {
   {
     std::lock_guard locker(m_lock);
-    ceph_assert(write_req->resources.allocated);
-    m_free_lanes += write_req->image_extents.size();
-    write_req->resources.allocated = false;
+    m_free_lanes += req->image_extents.size();
   }
   dispatch_deferred_writes();
 }
@@ -1259,9 +1205,7 @@ void ReplicatedWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
       dispatch_here = req->alloc_resources();
     }
     if (dispatch_here) {
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
-      }
+      ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
       req->dispatch();
     } else {
       req->deferred();
@@ -1269,17 +1213,128 @@ void ReplicatedWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
         std::lock_guard locker(m_lock);
         m_deferred_ios.push_back(req);
       }
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
-      }
+      ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
       dispatch_deferred_writes();
     }
   }
 }
+
+template <typename I>
+bool ReplicatedWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
+  bool alloc_succeeds = true;
+  bool no_space = false;
+  uint64_t bytes_allocated = 0;
+  uint64_t bytes_cached = 0;
+  uint64_t bytes_dirtied = 0;
+  uint64_t num_lanes = 0;
+  uint64_t num_unpublished_reserves = 0;
+  uint64_t num_log_entries = 0;
+
+  // Set up buffer resources and get the counts of all required resources
+  req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated,
+                              num_lanes, num_log_entries, num_unpublished_reserves);
+
+  {
+    std::lock_guard locker(m_lock);
+    if (m_free_lanes < num_lanes) {
+      req->set_io_waited_for_lanes(true);
+      ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
+                                 << num_lanes
+                                 << ", have " << m_free_lanes << ") "
+                                 << *req << dendl;
+      alloc_succeeds = false;
+      /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
+    }
+    if (m_free_log_entries < num_log_entries) {
+      req->set_io_waited_for_entries(true);
+      ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
+                                 << num_log_entries
+                                 << ", have " << m_free_log_entries << ") "
+                                 << *req << dendl;
+      alloc_succeeds = false;
+      no_space = true; /* Entries must be retired */
+    }
+    /* Don't attempt buffer allocation if we've exceeded the "full" threshold */
+    if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
+      if (!req->has_io_waited_for_buffers()) {
+        req->set_io_waited_for_buffers(true);
+        ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap="
+                                  << m_bytes_allocated_cap
+                                  << ", allocated=" << m_bytes_allocated
+                                  << ") in write [" << *req << "]" << dendl;
+      }
+      alloc_succeeds = false;
+      no_space = true; /* Entries must be retired */
+    }
+  }
+
+  std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
+  if (alloc_succeeds) {
+    for (auto &buffer : buffers) {
+      utime_t before_reserve = ceph_clock_now();
+      buffer.buffer_oid = pmemobj_reserve(m_log_pool,
+                                          &buffer.buffer_alloc_action,
+                                          buffer.allocation_size,
+                                          0 /* Object type */);
+      buffer.allocation_lat = ceph_clock_now() - before_reserve;
+      if (TOID_IS_NULL(buffer.buffer_oid)) {
+        if (!req->has_io_waited_for_buffers()) {
+          req->set_io_waited_for_buffers(true);
+        }
+        ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
+                                  << pmemobj_errormsg() << ". "
+                                  << *req << dendl;
+        alloc_succeeds = false;
+        no_space = true; /* Entries need to be retired */
+        break;
+      } else {
+        buffer.allocated = true;
+      }
+      ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
+                                 << "." << buffer.buffer_oid.oid.off
+                                 << ", size=" << buffer.allocation_size << dendl;
+    }
+  }
+
+  if (alloc_succeeds) {
+    std::lock_guard locker(m_lock);
+    /* We need one free log entry per extent (each is a separate entry), and
+     * one free "lane" for remote replication. */
+    if ((m_free_lanes >= num_lanes) &&
+        (m_free_log_entries >= num_log_entries)) {
+      m_free_lanes -= num_lanes;
+      m_free_log_entries -= num_log_entries;
+      m_unpublished_reserves += num_unpublished_reserves;
+      m_bytes_allocated += bytes_allocated;
+      m_bytes_cached += bytes_cached;
+      m_bytes_dirty += bytes_dirtied;
+    } else {
+      alloc_succeeds = false;
+    }
+  }
+
+  if (!alloc_succeeds) {
+    /* On alloc failure, free any buffers we did allocate */
+    for (auto &buffer : buffers) {
+      if (buffer.allocated) {
+        pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
+      }
+    }
+    if (no_space) {
+      /* Expedite flushing and/or retiring */
+      std::lock_guard locker(m_lock);
+      m_alloc_failed_since_retire = true;
+      m_last_alloc_fail = ceph_clock_now();
+    }
+  }
+
+  req->set_allocated(alloc_succeeds);
+
+  return alloc_succeeds;
+}
+
 } // namespace cache
 } // namespace librbd
 
-#ifndef TEST_F
 template class librbd::cache::ReplicatedWriteLog<librbd::ImageCtx>;
 template class librbd::cache::ImageCache<librbd::ImageCtx>;
-#endif
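
alloc_resources() is the reserve half of libpmemobj's reserve/publish/cancel protocol: buffers are reserved outside any transaction, then either published inside the commit transaction in append_op_log_entries() or cancelled on failure. The protocol in isolation (names are placeholders):

    struct pobj_action act;
    PMEMoid oid = pmemobj_reserve(pool, &act, buf_len, 0 /* type_num */);
    if (OID_IS_NULL(oid)) {
      // No space: defer the IO and expedite flushing/retiring.
    } else {
      // Later, exactly one of:
      //   pmemobj_tx_publish(&act, 1);    // inside TX_BEGIN: make durable
      //   pmemobj_cancel(pool, &act, 1);  // request failed: give it back
    }
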
src/librbd/cache/ReplicatedWriteLog.h
index adbdddca9bdfd160af4a951f7d1b31d0f1f39fa2..dea992f77cbe76f29a4aae96d5c280e770091e63 100644 (file)
@@ -29,11 +29,10 @@ namespace cache {
 namespace rwl {
 
 class SyncPointLogEntry;
-class GeneralWriteLogEntry;
+class GenericWriteLogEntry;
 class WriteLogEntry;
 class GenericLogEntry;
 
-typedef std::list<std::shared_ptr<GeneralWriteLogEntry>> GeneralWriteLogEntries;
 typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
 typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
 
@@ -41,9 +40,6 @@ typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
 
 typedef librbd::BlockGuard<GuardedRequest> WriteLogGuard;
 
-template <typename T>
-struct C_GuardedBlockIORequest;
-
 class DeferredContexts;
 template <typename> class ImageCacheState;
 
@@ -53,8 +49,7 @@ struct C_BlockIORequest;
 template <typename T>
 struct C_WriteRequest;
 
-template <typename T>
-using GenericLogOperations = std::list<GenericLogOperationSharedPtr<T>>;
+using GenericLogOperations = std::list<GenericLogOperationSharedPtr>;
 
 } // namespace rwl
 
@@ -93,35 +88,46 @@ public:
   void invalidate(Context *on_finish);
   void flush(Context *on_finish) override;
 
-private:
   using This = ReplicatedWriteLog<ImageCtxT>;
-  using SyncPointT = rwl::SyncPoint<This>;
-  using GenericLogOperationT = rwl::GenericLogOperation<This>;
-  using GenericLogOperationSharedPtrT = rwl::GenericLogOperationSharedPtr<This>;
-  using WriteLogOperationT = rwl::WriteLogOperation<This>;
-  using WriteLogOperationSetT = rwl::WriteLogOperationSet<This>;
-  using SyncPointLogOperationT = rwl::SyncPointLogOperation<This>;
-  using GenericLogOperationsT = rwl::GenericLogOperations<This>;
-  using GenericLogOperationsVectorT = rwl::GenericLogOperationsVector<This>;
-  using C_BlockIORequestT = rwl::C_BlockIORequest<This>;
   using C_WriteRequestT = rwl::C_WriteRequest<This>;
-
-  friend class rwl::SyncPoint<This>;
-  friend class rwl::GenericLogOperation<This>;
-  friend class rwl::GeneralWriteLogOperation<This>;
-  friend class rwl::WriteLogOperation<This>;
-  friend class rwl::WriteLogOperationSet<This>;
-  friend class rwl::SyncPointLogOperation<This>;
-  friend struct rwl::C_GuardedBlockIORequest<This>;
-  friend struct rwl::C_BlockIORequest<This>;
-  friend struct rwl::C_WriteRequest<This>;
+  using C_BlockIORequestT = rwl::C_BlockIORequest<This>;
+  CephContext * get_context();
+  void release_guarded_request(BlockGuardCell *cell);
+  void release_write_lanes(C_BlockIORequestT *req);
+  bool alloc_resources(C_BlockIORequestT *req);
+  template <typename V>
+  void flush_pmem_buffer(V& ops);
+  void schedule_append(rwl::GenericLogOperationsVector &ops);
+  void schedule_flush_and_append(rwl::GenericLogOperationsVector &ops);
+  std::shared_ptr<rwl::SyncPoint> get_current_sync_point() {
+    return m_current_sync_point;
+  }
+  bool get_persist_on_flush() {
+    return m_persist_on_flush;
+  }
+  void inc_last_op_sequence_num() {
+    m_perfcounter->inc(l_librbd_rwl_log_ops, 1);
+    ++m_last_op_sequence_num;
+  }
+  uint64_t get_last_op_sequence_num() {
+    return m_last_op_sequence_num;
+  }
+  uint64_t get_current_sync_gen() {
+    return m_current_sync_gen;
+  }
+  unsigned int get_free_lanes() {
+    return m_free_lanes;
+  }
+  uint32_t get_free_log_entries() {
+    return m_free_log_entries;
+  }
+private:
   typedef std::list<rwl::C_WriteRequest<This> *> C_WriteRequests;
   typedef std::list<rwl::C_BlockIORequest<This> *> C_BlockIORequests;
 
   BlockGuardCell* detain_guarded_request_helper(rwl::GuardedRequest &req);
   BlockGuardCell* detain_guarded_request_barrier_helper(rwl::GuardedRequest &req);
-  void detain_guarded_request(rwl::GuardedRequest &&req);
-  void release_guarded_request(BlockGuardCell *cell);
+  void detain_guarded_request(C_BlockIORequestT *request, rwl::GuardedRequestFunctionContext *guarded_ctx);
 
   librbd::cache::rwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
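
With these accessors public, rwl:: helper classes (Request, LogOperation, SyncPoint) can call back into the cache without the friend declarations removed above. A purely hypothetical caller to illustrate the intended shape:

    template <typename CacheT>
    void start_op(CacheT &rwl) {  // hypothetical helper, not in this PR
      auto sync_point = rwl.get_current_sync_point();
      if (!rwl.get_persist_on_flush()) {
        // Persist-on-write ops get an explicit sequence number.
        rwl.inc_last_op_sequence_num();
      }
      uint64_t seq = rwl.get_last_op_sequence_num();
      /* ... build a log operation against sync_point / seq ... */
    }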
 
@@ -159,7 +165,6 @@ private:
 
   /* Starts at 0 for a new write log. Incremented on every flush. */
   uint64_t m_current_sync_gen = 0;
-  std::shared_ptr<SyncPointT> m_current_sync_point = nullptr;
   /* Starts at 0 on each sync gen increase. Incremented before applied
      to an operation */
   uint64_t m_last_op_sequence_num = 0;
@@ -167,12 +172,6 @@ private:
   uint64_t m_flushed_sync_gen = 0;
 
   bool m_persist_on_write_until_flush = true;
-  /* True if it's safe to complete a user request in persist-on-flush
-   * mode before the write is persisted. This is only true if there is
-   * a local copy of the write data, or if local write failure always
-   * causes local node failure. */
-  bool m_persist_on_flush_early_user_comp = false; /* Assume local write failure does not cause node failure */
-  bool m_persist_on_flush = false; /* If false, persist each write before completion */
   bool m_flush_seen = false;
 
   AsyncOpTracker m_async_op_tracker;
@@ -187,13 +186,11 @@ private:
   mutable ceph::mutex m_deferred_dispatch_lock;
   /* Hold m_log_append_lock while appending or retiring log entries. */
   mutable ceph::mutex m_log_append_lock;
-
   /* Used for most synchronization */
   mutable ceph::mutex m_lock;
+
   /* Used in release/detain to make BlockGuard preserve submission order */
   mutable ceph::mutex m_blockguard_lock;
-  /* Used in WriteLogEntry::get_pmem_bl() to syncronize between threads making entries readable */
-  mutable ceph::mutex m_entry_bl_lock;
 
   /* Use m_blockguard_lock for the following 3 things */
   rwl::WriteLogGuard::BlockOperations m_awaiting_barrier;
@@ -203,19 +200,23 @@ private:
   bool m_appending = false;
   bool m_dispatching_deferred_ops = false;
 
-  GenericLogOperationsT m_ops_to_flush; /* Write ops needing flush in local log */
-  GenericLogOperationsT m_ops_to_append; /* Write ops needing event append in local log */
+  rwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */
+  rwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */
 
   /* New entries are at the back. Oldest at the front */
   rwl::GenericLogEntries m_log_entries;
   rwl::GenericLogEntries m_dirty_log_entries;
 
+  PerfCounters *m_perfcounter = nullptr;
+
+  std::shared_ptr<rwl::SyncPoint> m_current_sync_point = nullptr;
+  bool m_persist_on_flush = false; /* If false, persist each write before completion */
+
   /* Writes that have left the block guard, but are waiting for resources */
   C_BlockIORequests m_deferred_ios;
   /* Throttle writes concurrently allocating & replicating */
-  unsigned int m_free_lanes = MAX_CONCURRENT_WRITES;
+  unsigned int m_free_lanes = rwl::MAX_CONCURRENT_WRITES;
   unsigned int m_unpublished_reserves = 0;
-  PerfCounters *m_perfcounter = nullptr;
 
   /* Initialized from config, then set false during shutdown */
   std::atomic<bool> m_periodic_stats_enabled = {false};
@@ -234,26 +235,20 @@ private:
 
   void rwl_init(Context *on_finish, rwl::DeferredContexts &later);
   void update_image_cache_state(Context *on_finish);
-  void start_workers();
   void wake_up();
 
   void dispatch_deferred_writes(void);
-  void release_write_lanes(C_WriteRequestT *write_req);
   void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
   void append_scheduled_ops(void);
   void enlist_op_appender();
-  void schedule_append(GenericLogOperationsVectorT &ops);
-  void schedule_append(GenericLogOperationsT &ops);
+  void schedule_append(rwl::GenericLogOperations &ops);
   void flush_then_append_scheduled_ops(void);
   void enlist_op_flusher();
-  void schedule_flush_and_append(GenericLogOperationsVectorT &ops);
-  template <typename V>
-  void flush_pmem_buffer(V& ops);
-  void alloc_op_log_entries(GenericLogOperationsT &ops);
-  void flush_op_log_entries(GenericLogOperationsVectorT &ops);
-  int append_op_log_entries(GenericLogOperationsT &ops);
-  void complete_op_log_entries(GenericLogOperationsT &&ops, const int r);
-  void schedule_complete_op_log_entries(GenericLogOperationsT &&ops, const int r);
+  void alloc_op_log_entries(rwl::GenericLogOperations &ops);
+  void flush_op_log_entries(rwl::GenericLogOperationsVector &ops);
+  int append_op_log_entries(rwl::GenericLogOperations &ops);
+  void complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
+  void schedule_complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
 };
 
 } // namespace cache
src/librbd/cache/rwl/ImageCacheState.cc
index cc5819a34f275facf27eaaa51b47f26c391e318d..dcf1d7e7901be6355f7fc1e977203998d9278752 100644 (file)
@@ -9,7 +9,7 @@
 #include "common/config_proxy.h"
 #include "common/ceph_json.h"
 
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::cache::rwl::ImageCacheState: " << this << " " \
                            <<  __func__ << ": "
src/librbd/cache/rwl/LogEntry.cc
index 50bf5a9a1c025f6703d01479f34a01c031e9debb..16858b5053764b5f9ec0b374e14ed80c22925e61 100644 (file)
@@ -4,7 +4,7 @@
 #include <iostream>
 #include "LogEntry.h"
 
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::cache::rwl::LogEntry: " << this << " " \
                            <<  __func__ << ": "
@@ -15,26 +15,6 @@ namespace cache {
 
 namespace rwl {
 
-bool GenericLogEntry::is_sync_point() {
-  return ram_entry.is_sync_point();
-}
-
-bool GenericLogEntry::is_discard() {
-  return ram_entry.is_discard();
-}
-
-bool GenericLogEntry::is_writesame() {
-  return ram_entry.is_writesame();
-}
-
-bool GenericLogEntry::is_write() {
-  return ram_entry.is_write();
-}
-
-bool GenericLogEntry::is_writer() {
-  return ram_entry.is_writer();
-}
-
 std::ostream& GenericLogEntry::format(std::ostream &os) const {
   os << "ram_entry=[" << ram_entry << "], "
      << "pmem_entry=" << (void*)pmem_entry << ", "
@@ -66,7 +46,7 @@ std::ostream &operator<<(std::ostream &os,
   return entry.format(os);
 }
 
-std::ostream& GeneralWriteLogEntry::format(std::ostream &os) const {
+std::ostream& GenericWriteLogEntry::format(std::ostream &os) const {
   GenericLogEntry::format(os);
   os << ", "
      << "sync_point_entry=[";
@@ -83,19 +63,38 @@ std::ostream& GeneralWriteLogEntry::format(std::ostream &os) const {
 };
 
 std::ostream &operator<<(std::ostream &os,
-                         const GeneralWriteLogEntry &entry) {
+                         const GenericWriteLogEntry &entry) {
   return entry.format(os);
 }
 
+void WriteLogEntry::init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation,
+                         uint64_t current_sync_gen, uint64_t last_op_sequence_num, bool persist_on_flush) {
+  ram_entry.has_data = 1;
+  ram_entry.write_data = allocation->buffer_oid;
+  ceph_assert(!TOID_IS_NULL(ram_entry.write_data));
+  pmem_buffer = D_RW(ram_entry.write_data);
+  ram_entry.sync_gen_number = current_sync_gen;
+  if (persist_on_flush) {
+    /* Persist on flush. Sequence #0 is never used. */
+    ram_entry.write_sequence_number = 0;
+  } else {
+    /* Persist on write */
+    ram_entry.write_sequence_number = last_op_sequence_num;
+    ram_entry.sequenced = 1;
+  }
+  ram_entry.sync_point = 0;
+  ram_entry.discard = 0;
+}
+
 void WriteLogEntry::init_pmem_bp() {
-  assert(!pmem_bp.have_raw());
+  ceph_assert(!pmem_bp.have_raw());
   pmem_bp = buffer::ptr(buffer::create_static(this->write_bytes(), (char*)pmem_buffer));
 }
 
 void WriteLogEntry::init_pmem_bl() {
   pmem_bl.clear();
   init_pmem_bp();
-  assert(pmem_bp.have_raw());
+  ceph_assert(pmem_bp.have_raw());
   int before_bl = pmem_bp.raw_nref();
   this->init_bl(pmem_bp, pmem_bl);
   int after_bl = pmem_bp.raw_nref();
@@ -111,9 +110,9 @@ unsigned int WriteLogEntry::reader_count() {
 }
 
 /* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */
-buffer::list& WriteLogEntry::get_pmem_bl(ceph::mutex &entry_bl_lock) {
+buffer::list& WriteLogEntry::get_pmem_bl() {
   if (0 == bl_refs) {
-    std::lock_guard locker(entry_bl_lock);
+    std::lock_guard locker(m_entry_bl_lock);
     if (0 == bl_refs) {
       init_pmem_bl();
     }
@@ -123,8 +122,8 @@ buffer::list& WriteLogEntry::get_pmem_bl(ceph::mutex &entry_bl_lock) {
 };
 
 /* Constructs a new bl containing copies of pmem_bp */
-void WriteLogEntry::copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl) {
-  this->get_pmem_bl(entry_bl_lock);
+void WriteLogEntry::copy_pmem_bl(bufferlist *out_bl) {
+  this->get_pmem_bl();
   /* pmem_bp is now initialized */
   buffer::ptr cloned_bp(pmem_bp.clone());
   out_bl->clear();
@@ -133,7 +132,7 @@ void WriteLogEntry::copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl)
 
 std::ostream& WriteLogEntry::format(std::ostream &os) const {
   os << "(Write) ";
-  GeneralWriteLogEntry::format(os);
+  GenericWriteLogEntry::format(os);
   os << ", "
      << "pmem_buffer=" << (void*)pmem_buffer << ", ";
   os << "pmem_bp=" << pmem_bp << ", ";
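
With m_entry_bl_lock moved from the cache into each WriteLogEntry, get_pmem_bl() is a double-checked lock: bl_refs is atomic, so already-initialized entries skip the mutex. The general shape of the pattern:

    #include <atomic>
    #include <mutex>

    struct LazyPmemBl {
      std::atomic<int> bl_refs{0};
      std::mutex entry_bl_lock;

      void ensure_init() {
        if (bl_refs.load() == 0) {        // fast path: no lock once built
          std::lock_guard<std::mutex> locker(entry_bl_lock);
          if (bl_refs.load() == 0) {      // re-check under the lock
            /* ... build pmem_bl from the pmem buffer ... */
            bl_refs.store(1);             // publish
          }
        }
      }
    };
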
src/librbd/cache/rwl/LogEntry.h
index b6d73b3f535ca0fe6ec4ec255b123c7915f5fcfb..b26fddb366f412c794c9022e03bd06d7766b093e 100644 (file)
@@ -4,6 +4,7 @@
 #ifndef CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H
 #define CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H
 
+#include "common/ceph_mutex.h"
 #include "librbd/Utils.h"
 #include "librbd/cache/rwl/Types.h"
 #include <atomic>
@@ -14,7 +15,7 @@ namespace cache {
 namespace rwl {
 
 class SyncPointLogEntry;
-class GeneralWriteLogEntry;
+class GenericWriteLogEntry;
 class WriteLogEntry;
 
 class GenericLogEntry {
@@ -29,22 +30,6 @@ public:
   virtual ~GenericLogEntry() { };
   GenericLogEntry(const GenericLogEntry&) = delete;
   GenericLogEntry &operator=(const GenericLogEntry&) = delete;
-  virtual unsigned int write_bytes() = 0;
-  bool is_sync_point();
-  bool is_discard();
-  bool is_writesame();
-  bool is_write();
-  bool is_writer();
-  virtual const GenericLogEntry* get_log_entry() = 0;
-  virtual const SyncPointLogEntry* get_sync_point_log_entry() {
-    return nullptr;
-  }
-  virtual const GeneralWriteLogEntry* get_gen_write_log_entry() {
-    return nullptr;
-  }
-  virtual const WriteLogEntry* get_write_log_entry() {
-    return nullptr;
-  }
   virtual std::ostream& format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const GenericLogEntry &entry);
@@ -67,36 +52,29 @@ public:
     ram_entry.sync_gen_number = sync_gen_number;
     ram_entry.sync_point = 1;
   };
+  ~SyncPointLogEntry() override {};
   SyncPointLogEntry(const SyncPointLogEntry&) = delete;
   SyncPointLogEntry &operator=(const SyncPointLogEntry&) = delete;
-  virtual inline unsigned int write_bytes() {
-    return 0;
-  }
-  const GenericLogEntry* get_log_entry() override {
-    return get_sync_point_log_entry();
-  }
-  const SyncPointLogEntry* get_sync_point_log_entry() override {
-    return this;
-  }
   std::ostream& format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const SyncPointLogEntry &entry);
 };
 
-class GeneralWriteLogEntry : public GenericLogEntry {
+class GenericWriteLogEntry : public GenericLogEntry {
 public:
   uint32_t referring_map_entries = 0;
   bool flushing = false;
   bool flushed = false; /* or invalidated */
   std::shared_ptr<SyncPointLogEntry> sync_point_entry;
-  GeneralWriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
+  GenericWriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
                        const uint64_t image_offset_bytes, const uint64_t write_bytes)
     : GenericLogEntry(image_offset_bytes, write_bytes), sync_point_entry(sync_point_entry) { }
-  GeneralWriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes)
+  GenericWriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes)
     : GenericLogEntry(image_offset_bytes, write_bytes), sync_point_entry(nullptr) { }
-  GeneralWriteLogEntry(const GeneralWriteLogEntry&) = delete;
-  GeneralWriteLogEntry &operator=(const GeneralWriteLogEntry&) = delete;
-  virtual inline unsigned int write_bytes() {
+  ~GenericWriteLogEntry() override {};
+  GenericWriteLogEntry(const GenericWriteLogEntry&) = delete;
+  GenericWriteLogEntry &operator=(const GenericWriteLogEntry&) = delete;
+  inline unsigned int write_bytes() {
    /* The valid bytes in this op's data buffer. Discard and WS override. */
     return ram_entry.write_bytes;
   };
@@ -104,15 +82,9 @@ public:
     /* The bytes in the image this op makes dirty. Discard and WS override. */
     return write_bytes();
   };
-  const BlockExtent block_extent() {
+  BlockExtent block_extent() {
     return ram_entry.block_extent();
   }
-  const GenericLogEntry* get_log_entry() override {
-    return get_gen_write_log_entry();
-  }
-  const GeneralWriteLogEntry* get_gen_write_log_entry() override {
-    return this;
-  }
   uint32_t get_map_ref() {
     return(referring_map_entries);
   }
@@ -120,14 +92,16 @@ public:
   void dec_map_ref() { referring_map_entries--; }
   std::ostream &format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
-                                  const GeneralWriteLogEntry &entry);
+                                  const GenericWriteLogEntry &entry);
 };
 
-class WriteLogEntry : public GeneralWriteLogEntry {
+class WriteLogEntry : public GenericWriteLogEntry {
 protected:
   buffer::ptr pmem_bp;
   buffer::list pmem_bl;
   std::atomic<int> bl_refs = {0}; /* The refs held on pmem_bp by pmem_bl */
+  /* Used in WriteLogEntry::get_pmem_bl() to synchronize between threads making entries readable */
+  mutable ceph::mutex m_entry_bl_lock;
 
   void init_pmem_bp();
 
@@ -142,30 +116,33 @@ public:
   uint8_t *pmem_buffer = nullptr;
   WriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
                 const uint64_t image_offset_bytes, const uint64_t write_bytes)
-    : GeneralWriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes) { }
+    : GenericWriteLogEntry(sync_point_entry, image_offset_bytes, write_bytes),
+      m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name(
+        "librbd::cache::rwl::WriteLogEntry::m_entry_bl_lock", this)))
+  { }
   WriteLogEntry(const uint64_t image_offset_bytes, const uint64_t write_bytes)
-    : GeneralWriteLogEntry(nullptr, image_offset_bytes, write_bytes) { }
+    : GenericWriteLogEntry(nullptr, image_offset_bytes, write_bytes),
+      m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name(
+        "librbd::cache::rwl::WriteLogEntry::m_entry_bl_lock", this)))
+  { }
+  ~WriteLogEntry() override {};
   WriteLogEntry(const WriteLogEntry&) = delete;
   WriteLogEntry &operator=(const WriteLogEntry&) = delete;
-  const BlockExtent block_extent();
+  void init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation,
+            uint64_t current_sync_gen, uint64_t last_op_sequence_num, bool persist_on_flush);
+  BlockExtent block_extent();
   unsigned int reader_count();
   /* Returns a ref to a bl containing bufferptrs to the entry pmem buffer */
-  buffer::list &get_pmem_bl(ceph::mutex &entry_bl_lock);
+  buffer::list &get_pmem_bl();
   /* Constructs a new bl containing copies of pmem_bp */
-  void copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl);
-  virtual const GenericLogEntry* get_log_entry() override {
-    return get_write_log_entry();
-  }
-  const WriteLogEntry* get_write_log_entry() override {
-    return this;
-  }
+  void copy_pmem_bl(bufferlist *out_bl);
   std::ostream &format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const WriteLogEntry &entry);
 };
 
-} // namespace rwl 
-} // namespace cache 
-} // namespace librbd 
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
 
 #endif // CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H
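
The WriteLogEntry constructors above illustrate the per-object lock pattern this header now relies on: each instance builds its own ceph::mutex whose lockdep name embeds the object address via util::unique_lock_name(). A sketch of the same pattern on a hypothetical class:

    #include "common/ceph_mutex.h"
    #include "librbd/Utils.h"

    class Example {                       // hypothetical class, illustration only
      mutable ceph::mutex m_lock;
    public:
      Example()
        : m_lock(ceph::make_mutex(librbd::util::unique_lock_name(
            "librbd::cache::rwl::Example::m_lock", this))) {}
    };
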
index a7cb581b536f43b5aa3ad77badebe87ffa0e86db..c0c50a50b93f79b5148a63e2547ca4598e803031 100644 (file)
@@ -5,7 +5,7 @@
 #include "LogOperation.h"
 #include "librbd/cache/rwl/Types.h"
 
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::cache::rwl::LogOperation: " << this << " " \
                            <<  __func__ << ": "
@@ -16,13 +16,11 @@ namespace cache {
 
 namespace rwl {
 
-template <typename T>
-GenericLogOperation<T>::GenericLogOperation(T &rwl, const utime_t dispatch_time)
-  : rwl(rwl), dispatch_time(dispatch_time) {
+GenericLogOperation::GenericLogOperation(const utime_t dispatch_time, PerfCounters *perfcounter)
+  : m_perfcounter(perfcounter), dispatch_time(dispatch_time) {
 }
 
-template <typename T>
-std::ostream& GenericLogOperation<T>::format(std::ostream &os) const {
+std::ostream& GenericLogOperation::format(std::ostream &os) const {
   os << "dispatch_time=[" << dispatch_time << "], "
      << "buf_persist_time=[" << buf_persist_time << "], "
      << "buf_persist_comp_time=[" << buf_persist_comp_time << "], "
@@ -31,161 +29,144 @@ std::ostream& GenericLogOperation<T>::format(std::ostream &os) const {
   return os;
 };
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                         const GenericLogOperation<T> &op) {
+                         const GenericLogOperation &op) {
   return op.format(os);
 }
 
-template <typename T>
-SyncPointLogOperation<T>::SyncPointLogOperation(T &rwl,
-                                                std::shared_ptr<SyncPoint<T>> sync_point,
-                                                const utime_t dispatch_time)
-  : GenericLogOperation<T>(rwl, dispatch_time), sync_point(sync_point) {
+SyncPointLogOperation::SyncPointLogOperation(ceph::mutex &lock,
+                                             std::shared_ptr<SyncPoint> sync_point,
+                                             const utime_t dispatch_time,
+                                             PerfCounters *perfcounter,
+                                             CephContext *cct)
+  : GenericLogOperation(dispatch_time, perfcounter), m_cct(cct), m_lock(lock), sync_point(sync_point) {
 }
 
-template <typename T>
-SyncPointLogOperation<T>::~SyncPointLogOperation() { }
+SyncPointLogOperation::~SyncPointLogOperation() { }
 
-template <typename T>
-std::ostream &SyncPointLogOperation<T>::format(std::ostream &os) const {
+std::ostream &SyncPointLogOperation::format(std::ostream &os) const {
   os << "(Sync Point) ";
-  GenericLogOperation<T>::format(os);
+  GenericLogOperation::format(os);
   os << ", "
      << "sync_point=[" << *sync_point << "]";
   return os;
 };
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                         const SyncPointLogOperation<T> &op) {
+                         const SyncPointLogOperation &op) {
   return op.format(os);
 }
 
-template <typename T>
-void SyncPointLogOperation<T>::appending() {
+std::vector<Context*> SyncPointLogOperation::append_sync_point() {
   std::vector<Context*> appending_contexts;
+  std::lock_guard locker(m_lock);
+  if (!sync_point->appending) {
+    sync_point->appending = true;
+  }
+  appending_contexts.swap(sync_point->on_sync_point_appending);
+  return appending_contexts;
+}
+
+void SyncPointLogOperation::clear_earlier_sync_point() {
+  std::lock_guard locker(m_lock);
+  ceph_assert(sync_point->later_sync_point);
+  ceph_assert(sync_point->later_sync_point->earlier_sync_point ==
+              sync_point);
+  sync_point->later_sync_point->earlier_sync_point = nullptr;
+}
 
+std::vector<Context*> SyncPointLogOperation::swap_on_sync_point_persisted() {
+  std::lock_guard locker(m_lock);
+  std::vector<Context*> persisted_contexts;
+  persisted_contexts.swap(sync_point->on_sync_point_persisted);
+  return persisted_contexts;
+}
+
+void SyncPointLogOperation::appending() {
   ceph_assert(sync_point);
-  {
-    std::lock_guard locker(rwl.m_lock);
-    if (!sync_point->m_appending) {
-      ldout(rwl.m_image_ctx.cct, 20) << "Sync point op=[" << *this
-                                     << "] appending" << dendl;
-      sync_point->m_appending = true;
-    }
-    appending_contexts.swap(sync_point->m_on_sync_point_appending);
-  }
+  ldout(m_cct, 20) << "Sync point op=[" << *this
+                   << "] appending" << dendl;
+  auto appending_contexts = append_sync_point();
   for (auto &ctx : appending_contexts) {
     ctx->complete(0);
   }
 }
 
-template <typename T>
-void SyncPointLogOperation<T>::complete(int result) {
-  std::vector<Context*> persisted_contexts;
-
+void SyncPointLogOperation::complete(int result) {
   ceph_assert(sync_point);
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << "Sync point op =[" << *this
-                                   << "] completed" << dendl;
-  }
-  {
-    std::lock_guard locker(rwl.m_lock);
-    /* Remove link from next sync point */
-    ceph_assert(sync_point->later_sync_point);
-    ceph_assert(sync_point->later_sync_point->earlier_sync_point ==
-           sync_point);
-    sync_point->later_sync_point->earlier_sync_point = nullptr;
-  }
+  ldout(m_cct, 20) << "Sync point op =[" << *this
+                   << "] completed" << dendl;
+  clear_earlier_sync_point();
 
   /* Do append now in case completion occurred before the
    * normal append callback executed, and to handle
    * on_append work that was queued after the sync point
    * entered the appending state. */
   appending();
-
-  {
-    std::lock_guard locker(rwl.m_lock);
-    /* The flush request that scheduled this op will be one of these
-     * contexts */
-    persisted_contexts.swap(sync_point->m_on_sync_point_persisted);
-    // TODO handle flushed sync point in later PRs
-  }
+  auto persisted_contexts = swap_on_sync_point_persisted();
   for (auto &ctx : persisted_contexts) {
     ctx->complete(result);
   }
 }
 
-template <typename T>
-GeneralWriteLogOperation<T>::GeneralWriteLogOperation(T &rwl,
-                                                      std::shared_ptr<SyncPoint<T>> sync_point,
-                                                      const utime_t dispatch_time)
-  : GenericLogOperation<T>(rwl, dispatch_time),
+GenericWriteLogOperation::GenericWriteLogOperation(std::shared_ptr<SyncPoint> sync_point,
+                                                   const utime_t dispatch_time,
+                                                   PerfCounters *perfcounter,
+                                                   CephContext *cct)
+  : GenericLogOperation(dispatch_time, perfcounter),
   m_lock(ceph::make_mutex(util::unique_lock_name(
-    "librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))), sync_point(sync_point) {
+    "librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))),
+  m_cct(cct),
+  sync_point(sync_point) {
 }
 
-template <typename T>
-GeneralWriteLogOperation<T>::~GeneralWriteLogOperation() { }
+GenericWriteLogOperation::~GenericWriteLogOperation() { }
 
-template <typename T>
-std::ostream &GeneralWriteLogOperation<T>::format(std::ostream &os) const {
-  GenericLogOperation<T>::format(os);
+std::ostream &GenericWriteLogOperation::format(std::ostream &os) const {
+  GenericLogOperation::format(os);
   return os;
 };
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                         const GeneralWriteLogOperation<T> &op) {
+                         const GenericWriteLogOperation &op) {
   return op.format(os);
 }
 
 /* Called when the write log operation is appending and its log position is guaranteed */
-template <typename T>
-void GeneralWriteLogOperation<T>::appending() {
+void GenericWriteLogOperation::appending() {
   Context *on_append = nullptr;
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << dendl;
-  }
+  ldout(m_cct, 20) << __func__ << " " << this << dendl;
   {
     std::lock_guard locker(m_lock);
     on_append = on_write_append;
     on_write_append = nullptr;
   }
   if (on_append) {
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << " on_append=" << on_append << dendl;
-    }
+    ldout(m_cct, 20) << __func__ << " " << this << " on_append=" << on_append << dendl;
     on_append->complete(0);
   }
 }
 
 /* Called when the write log operation is completed in all log replicas */
-template <typename T>
-void GeneralWriteLogOperation<T>::complete(int result) {
+void GenericWriteLogOperation::complete(int result) {
   appending();
   Context *on_persist = nullptr;
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << dendl;
-  }
+  ldout(m_cct, 20) << __func__ << " " << this << dendl;
   {
     std::lock_guard locker(m_lock);
     on_persist = on_write_persist;
     on_write_persist = nullptr;
   }
   if (on_persist) {
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(rwl.m_image_ctx.cct, 20) << __func__ << " " << this << " on_persist=" << on_persist << dendl;
-    }
+    ldout(m_cct, 20) << __func__ << " " << this << " on_persist=" << on_persist << dendl;
     on_persist->complete(result);
   }
 }
 
-template <typename T>
-WriteLogOperation<T>::WriteLogOperation(WriteLogOperationSet<T> &set,
-                                        uint64_t image_offset_bytes, uint64_t write_bytes)
-  : GeneralWriteLogOperation<T>(set.rwl, set.sync_point, set.dispatch_time),
+WriteLogOperation::WriteLogOperation(WriteLogOperationSet &set,
+                                     uint64_t image_offset_bytes, uint64_t write_bytes,
+                                     CephContext *cct)
+  : GenericWriteLogOperation(set.sync_point, set.dispatch_time, set.perfcounter, cct),
     log_entry(std::make_shared<WriteLogEntry>(set.sync_point->log_entry, image_offset_bytes, write_bytes)) {
   on_write_append = set.extent_ops_appending->new_sub();
   on_write_persist = set.extent_ops_persist->new_sub();
@@ -193,13 +174,20 @@ WriteLogOperation<T>::WriteLogOperation(WriteLogOperationSet<T> &set,
   log_entry->sync_point_entry->bytes += write_bytes;
 }
 
-template <typename T>
-WriteLogOperation<T>::~WriteLogOperation() { }
+WriteLogOperation::~WriteLogOperation() { }
 
-template <typename T>
-std::ostream &WriteLogOperation<T>::format(std::ostream &os) const {
+void WriteLogOperation::init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation, uint64_t current_sync_gen,
+                             uint64_t last_op_sequence_num, bufferlist &write_req_bl, uint64_t buffer_offset,
+                             bool persist_on_flush) {
+  log_entry->init(has_data, allocation, current_sync_gen, last_op_sequence_num, persist_on_flush);
+  buffer_alloc = &(*allocation);
+  bl.substr_of(write_req_bl, buffer_offset,
+               log_entry->write_bytes());
+}
+
+std::ostream &WriteLogOperation::format(std::ostream &os) const {
   os << "(Write) ";
-  GeneralWriteLogOperation<T>::format(os);
+  GenericWriteLogOperation::format(os);
   os << ", ";
   if (log_entry) {
     os << "log_entry=[" << *log_entry << "], ";
@@ -211,26 +199,48 @@ std::ostream &WriteLogOperation<T>::format(std::ostream &os) const {
   return os;
 };
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                         const WriteLogOperation<T> &op) {
+                         const WriteLogOperation &op) {
   return op.format(os);
 }
 
 
-template <typename T>
-WriteLogOperationSet<T>::WriteLogOperationSet(T &rwl, utime_t dispatched, std::shared_ptr<SyncPoint<T>> sync_point,
-                                              bool persist_on_flush, Context *on_finish)
-  : m_on_finish(on_finish), rwl(rwl),
-    persist_on_flush(persist_on_flush), dispatch_time(dispatched), sync_point(sync_point) {
+void WriteLogOperation::complete(int result) {
+  GenericWriteLogOperation::complete(result);
+  m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, buf_persist_time - dispatch_time);
+  utime_t buf_lat = buf_persist_comp_time - buf_persist_time;
+  m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat);
+  m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(),
+                      log_entry->ram_entry.write_bytes);
+  m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, log_append_time - buf_persist_time);
+}
+
+void WriteLogOperation::copy_bl_to_pmem_buffer() {
+  /* This operation is held by a shared_ptr; the caller must keep it in scope for the duration of the copy */
+  bufferlist::iterator i(&bl);
+  m_perfcounter->inc(l_librbd_rwl_log_op_bytes, log_entry->write_bytes());
+  ldout(m_cct, 20) << bl << dendl;
+  i.copy((unsigned)log_entry->write_bytes(), (char*)log_entry->pmem_buffer);
+}
+
+void WriteLogOperation::flush_pmem_buf_to_cache(PMEMobjpool *log_pool) {
+  buf_persist_time = ceph_clock_now();
+  pmemobj_flush(log_pool, log_entry->pmem_buffer, log_entry->write_bytes());
+}
+
+WriteLogOperationSet::WriteLogOperationSet(utime_t dispatched, PerfCounters *perfcounter, std::shared_ptr<SyncPoint> sync_point,
+                                           bool persist_on_flush, CephContext *cct, Context *on_finish)
+  : m_cct(cct), m_on_finish(on_finish),
+    persist_on_flush(persist_on_flush),
+    dispatch_time(dispatched),
+    perfcounter(perfcounter),
+    sync_point(sync_point) {
   on_ops_appending = sync_point->prior_log_entries_persisted->new_sub();
   on_ops_persist = nullptr;
   extent_ops_persist =
-    new C_Gather(rwl.m_image_ctx.cct,
+    new C_Gather(m_cct,
                  new LambdaContext( [this](int r) {
-                     if (RWL_VERBOSE_LOGGING) {
-                       ldout(this->rwl.m_image_ctx.cct,20) << __func__ << " " << this << " m_extent_ops_persist completed" << dendl;
-                     }
+                     ldout(this->m_cct, 20) << __func__ << " " << this << " extent_ops_persist completed" << dendl;
                      if (on_ops_persist) {
                        on_ops_persist->complete(r);
                      }
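
copy_bl_to_pmem_buffer() and flush_pmem_buf_to_cache() above split persistence into two steps: the payload is first copied into the pmem-backed buffer, then pmemobj_flush() initiates the cache-line flush for that range (durability is completed by a later drain, not shown in this diff). A sketch of the two steps, assuming libpmemobj; persist_payload is a hypothetical helper:

    #include <libpmemobj.h>
    #include <cstring>

    void persist_payload(PMEMobjpool *pool, void *pmem_buf,
                         const void *src, size_t len) {
      std::memcpy(pmem_buf, src, len);      // stage the payload in the pmem buffer
      pmemobj_flush(pool, pmem_buf, len);   // flush the cache lines covering it
    }
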
@@ -238,22 +248,18 @@ WriteLogOperationSet<T>::WriteLogOperationSet(T &rwl, utime_t dispatched, std::s
                    }));
   auto appending_persist_sub = extent_ops_persist->new_sub();
   extent_ops_appending =
-    new C_Gather(rwl.m_image_ctx.cct,
+    new C_Gather(m_cct,
                  new LambdaContext( [this, appending_persist_sub](int r) {
-                     if (RWL_VERBOSE_LOGGING) {
-                       ldout(this->rwl.m_image_ctx.cct, 20) << __func__ << " " << this << " m_extent_ops_appending completed" << dendl;
-                     }
+                     ldout(this->m_cct, 20) << __func__ << " " << this << " extent_ops_appending completed" << dendl;
                      on_ops_appending->complete(r);
                      appending_persist_sub->complete(r);
                    }));
 }
 
-template <typename T>
-WriteLogOperationSet<T>::~WriteLogOperationSet() { }
+WriteLogOperationSet::~WriteLogOperationSet() { }
 
-template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                                const WriteLogOperationSet<T> &s) {
+                         const WriteLogOperationSet &s) {
   os << "cell=" << (void*)s.cell << ", "
      << "extent_ops_appending=[" << s.extent_ops_appending << ", "
      << "extent_ops_persist=[" << s.extent_ops_persist << "]";
index dc0dd547356fb1599c1bfbdb1d862b18b576e671..1197e7be798a37ec589d3cd418221f79d751aacf 100644 (file)
@@ -13,200 +13,161 @@ namespace cache {
 namespace rwl {
 struct WriteBufferAllocation;
 
-template <typename T>
 class WriteLogOperationSet;
 
-template <typename T>
 class WriteLogOperation;
 
-template <typename T>
-class GeneralWriteLogOperation;
+class GenericWriteLogOperation;
 
-template <typename T>
 class SyncPointLogOperation;
 
-template <typename T>
 class GenericLogOperation;
 
-template <typename T>
-using GenericLogOperationSharedPtr = std::shared_ptr<GenericLogOperation<T>>;
+using GenericLogOperationSharedPtr = std::shared_ptr<GenericLogOperation>;
 
-template <typename T>
-using GenericLogOperationsVector = std::vector<GenericLogOperationSharedPtr<T>>;
+using GenericLogOperationsVector = std::vector<GenericLogOperationSharedPtr>;
 
-template <typename T>
 class GenericLogOperation {
+protected:
+  PerfCounters *m_perfcounter = nullptr;
 public:
-  T &rwl;
   utime_t dispatch_time;         // When op created
   utime_t buf_persist_time;      // When buffer persist begins
   utime_t buf_persist_comp_time; // When buffer persist completes
   utime_t log_append_time;       // When log append begins
   utime_t log_append_comp_time;  // When log append completes
-  GenericLogOperation(T &rwl, const utime_t dispatch_time);
+  GenericLogOperation(const utime_t dispatch_time, PerfCounters *perfcounter);
   virtual ~GenericLogOperation() { };
   GenericLogOperation(const GenericLogOperation&) = delete;
   GenericLogOperation &operator=(const GenericLogOperation&) = delete;
   virtual std::ostream &format(std::ostream &os) const;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const GenericLogOperation<U> &op);
+                                  const GenericLogOperation &op);
   virtual const std::shared_ptr<GenericLogEntry> get_log_entry() = 0;
-  virtual const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() {
-    return nullptr;
-  }
-  virtual const std::shared_ptr<GeneralWriteLogEntry> get_gen_write_log_entry() {
-    return nullptr;
-  }
-  virtual const std::shared_ptr<WriteLogEntry> get_write_log_entry() {
-    return nullptr;
-  }
   virtual void appending() = 0;
   virtual void complete(int r) = 0;
-  virtual bool is_write() {
-    return false;
-  }
-  virtual bool is_sync_point() {
+  virtual void mark_log_entry_completed() {};
+  virtual bool reserved_allocated() {
     return false;
   }
-  virtual bool is_discard() {
-    return false;
-  }
-  virtual bool is_writesame() {
-    return false;
-  }
-  virtual bool is_writing_op() {
-    return false;
-  }
-  virtual GeneralWriteLogOperation<T> *get_gen_write_op() {
-    return nullptr;
-  };
-  virtual WriteLogOperation<T> *get_write_op() {
-    return nullptr;
-  };
+  virtual void copy_bl_to_pmem_buffer() {};
+  virtual void flush_pmem_buf_to_cache(PMEMobjpool *log_pool) {};
 };
 
-template <typename T>
-class SyncPointLogOperation : public GenericLogOperation<T> {
+class SyncPointLogOperation : public GenericLogOperation {
+private:
+  CephContext *m_cct;
+  ceph::mutex &m_lock;
+  std::vector<Context*> append_sync_point();
+  void clear_earlier_sync_point();
+  std::vector<Context*> swap_on_sync_point_persisted();
 public:
-  using GenericLogOperation<T>::rwl;
-  std::shared_ptr<SyncPoint<T>> sync_point;
-  SyncPointLogOperation(T &rwl,
-                        std::shared_ptr<SyncPoint<T>> sync_point,
-                        const utime_t dispatch_time);
-  ~SyncPointLogOperation();
+  std::shared_ptr<SyncPoint> sync_point;
+  SyncPointLogOperation(ceph::mutex &lock,
+                        std::shared_ptr<SyncPoint> sync_point,
+                        const utime_t dispatch_time,
+                        PerfCounters *perfcounter,
+                        CephContext *cct);
+  ~SyncPointLogOperation() override;
   SyncPointLogOperation(const SyncPointLogOperation&) = delete;
   SyncPointLogOperation &operator=(const SyncPointLogOperation&) = delete;
   std::ostream &format(std::ostream &os) const;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const SyncPointLogOperation<U> &op);
-  const std::shared_ptr<GenericLogEntry> get_log_entry() {
-    return get_sync_point_log_entry();
-  }
-  const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() {
+                                  const SyncPointLogOperation &op);
+  const std::shared_ptr<GenericLogEntry> get_log_entry() override {
     return sync_point->log_entry;
   }
-  bool is_sync_point() {
-    return true;
-  }
-  void appending();
-  void complete(int r);
+  void appending() override;
+  void complete(int r) override;
 };
 
-template <typename T>
-class GeneralWriteLogOperation : public GenericLogOperation<T> {
+class GenericWriteLogOperation : public GenericLogOperation {
 protected:
   ceph::mutex m_lock;
+  CephContext *m_cct;
 public:
-  using GenericLogOperation<T>::rwl;
-  std::shared_ptr<SyncPoint<T>> sync_point;
+  std::shared_ptr<SyncPoint> sync_point;
   Context *on_write_append = nullptr;  /* Completion for things waiting on this
                                         * write's position in the log to be
                                         * guaranteed */
   Context *on_write_persist = nullptr; /* Completion for things waiting on this
                                         * write to persist */
-  GeneralWriteLogOperation(T &rwl,
-                           std::shared_ptr<SyncPoint<T>> sync_point,
-                           const utime_t dispatch_time);
-  ~GeneralWriteLogOperation();
-  GeneralWriteLogOperation(const GeneralWriteLogOperation&) = delete;
-  GeneralWriteLogOperation &operator=(const GeneralWriteLogOperation&) = delete;
+  GenericWriteLogOperation(std::shared_ptr<SyncPoint> sync_point,
+                           const utime_t dispatch_time,
+                           PerfCounters *perfcounter,
+                           CephContext *cct);
+  ~GenericWriteLogOperation() override;
+  GenericWriteLogOperation(const GenericWriteLogOperation&) = delete;
+  GenericWriteLogOperation &operator=(const GenericWriteLogOperation&) = delete;
   std::ostream &format(std::ostream &os) const;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const GeneralWriteLogOperation<U> &op);
-  GeneralWriteLogOperation<T> *get_gen_write_op() {
-    return this;
+                                  const GenericWriteLogOperation &op);
+  void mark_log_entry_completed() override {
+    sync_point->log_entry->writes_completed++;
   }
-  bool is_writing_op() {
+  bool reserved_allocated() override {
     return true;
   }
-  void appending();
-  void complete(int r);
+  void appending() override;
+  void complete(int r) override;
 };
 
-template <typename T>
-class WriteLogOperation : public GeneralWriteLogOperation<T> {
+class WriteLogOperation : public GenericWriteLogOperation {
 public:
-  using GenericLogOperation<T>::rwl;
-  using GeneralWriteLogOperation<T>::m_lock;
-  using GeneralWriteLogOperation<T>::sync_point;
-  using GeneralWriteLogOperation<T>::on_write_append;
-  using GeneralWriteLogOperation<T>::on_write_persist;
+  using GenericWriteLogOperation::m_lock;
+  using GenericWriteLogOperation::sync_point;
+  using GenericWriteLogOperation::on_write_append;
+  using GenericWriteLogOperation::on_write_persist;
   std::shared_ptr<WriteLogEntry> log_entry;
   bufferlist bl;
   WriteBufferAllocation *buffer_alloc = nullptr;
-  WriteLogOperation(WriteLogOperationSet<T> &set, const uint64_t image_offset_bytes, const uint64_t write_bytes);
-  ~WriteLogOperation();
+  WriteLogOperation(WriteLogOperationSet &set, const uint64_t image_offset_bytes,
+                    const uint64_t write_bytes, CephContext *cct);
+  ~WriteLogOperation() override;
   WriteLogOperation(const WriteLogOperation&) = delete;
   WriteLogOperation &operator=(const WriteLogOperation&) = delete;
+  void init(bool has_data, std::vector<WriteBufferAllocation>::iterator allocation, uint64_t current_sync_gen,
+            uint64_t last_op_sequence_num, bufferlist &write_req_bl, uint64_t buffer_offset,
+            bool persist_on_flush);
   std::ostream &format(std::ostream &os) const;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const WriteLogOperation<T> &op);
-  const std::shared_ptr<GenericLogEntry> get_log_entry() {
-    return get_write_log_entry();
-  }
-  const std::shared_ptr<WriteLogEntry> get_write_log_entry() {
+                                  const WriteLogOperation &op);
+  const std::shared_ptr<GenericLogEntry> get_log_entry() override {
     return log_entry;
   }
-  WriteLogOperation<T> *get_write_op() override {
-    return this;
-  }
-  bool is_write() {
-    return true;
-  }
+
+  void complete(int r) override;
+  void copy_bl_to_pmem_buffer() override;
+  void flush_pmem_buf_to_cache(PMEMobjpool *log_pool) override;
 };
 
 
-template <typename T>
 class WriteLogOperationSet {
 private:
+  CephContext *m_cct;
   Context *m_on_finish;
 public:
-  T &rwl;
   bool persist_on_flush;
   BlockGuardCell *cell;
   C_Gather *extent_ops_appending;
   Context *on_ops_appending;
   C_Gather *extent_ops_persist;
   Context *on_ops_persist;
-  GenericLogOperationsVector<T> operations;
+  GenericLogOperationsVector operations;
   utime_t dispatch_time; /* When set created */
-  std::shared_ptr<SyncPoint<T>> sync_point;
-  WriteLogOperationSet(T &rwl, const utime_t dispatched, std::shared_ptr<SyncPoint<T>> sync_point,
-                       const bool persist_on_flush, Context *on_finish);
+  PerfCounters *perfcounter = nullptr;
+  std::shared_ptr<SyncPoint> sync_point;
+  WriteLogOperationSet(const utime_t dispatched, PerfCounters *perfcounter, std::shared_ptr<SyncPoint> sync_point,
+                       const bool persist_on_flush, CephContext *cct, Context *on_finish);
   ~WriteLogOperationSet();
   WriteLogOperationSet(const WriteLogOperationSet&) = delete;
   WriteLogOperationSet &operator=(const WriteLogOperationSet&) = delete;
-  template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
-                                  const WriteLogOperationSet<U> &s);
+                                  const WriteLogOperationSet &s);
 };
 
-} // namespace rwl 
-} // namespace cache 
-} // namespace librbd 
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
 
 #endif // CEPH_LIBRBD_CACHE_RWL_LOG_OPERATION_H
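
A note on the shape of this refactor: the removed is_write()/get_write_op() type queries forced callers to downcast; the header now exposes virtual hooks (copy_bl_to_pmem_buffer(), reserved_allocated(), mark_log_entry_completed()) with no-op defaults, so callers invoke them uniformly on every operation. A minimal sketch of the idiom, with hypothetical names:

    #include <memory>
    #include <vector>

    struct Op {
      virtual ~Op() = default;
      virtual void copy_bl_to_pmem_buffer() {}   // no-op for sync point ops
      virtual bool reserved_allocated() { return false; }
    };

    struct WriteOp : Op {
      void copy_bl_to_pmem_buffer() override { /* copy the payload */ }
      bool reserved_allocated() override { return true; }
    };

    void dispatch_all(std::vector<std::shared_ptr<Op>> &ops) {
      for (auto &op : ops) {
        op->copy_bl_to_pmem_buffer();            // no type query, no downcast
      }
    }
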
index c16f3363113a5cf57c1d8386faf97cde770e2838..6dd46fe8e07dada6212489f0e385d01e2f31c0e9 100644 (file)
@@ -5,7 +5,7 @@
 #include "librbd/BlockGuard.h"
 #include "librbd/cache/rwl/LogEntry.h"
 
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::cache::rwl::Request: " << this << " " \
                            <<  __func__ << ": "
@@ -14,129 +14,90 @@ namespace librbd {
 namespace cache {
 namespace rwl {
 
-typedef std::list<std::shared_ptr<GeneralWriteLogEntry>> GeneralWriteLogEntries;
+typedef std::list<std::shared_ptr<GenericWriteLogEntry>> GenericWriteLogEntries;
 
 template <typename T>
-C_GuardedBlockIORequest<T>::C_GuardedBlockIORequest(T &rwl)
-  : rwl(rwl) {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
-  }
+C_BlockIORequest<T>::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
+                                      bufferlist&& bl, const int fadvise_flags, Context *user_req)
+  : rwl(rwl), image_extents(std::move(extents)),
+    bl(std::move(bl)), fadvise_flags(fadvise_flags),
+    user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
+  ldout(rwl.get_context(), 99) << this << dendl;
 }
 
 template <typename T>
-C_GuardedBlockIORequest<T>::~C_GuardedBlockIORequest() {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
-  }
+C_BlockIORequest<T>::~C_BlockIORequest() {
+  ldout(rwl.get_context(), 99) << this << dendl;
   ceph_assert(m_cell_released || !m_cell);
 }
 
 template <typename T>
-void C_GuardedBlockIORequest<T>::set_cell(BlockGuardCell *cell) {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << cell << dendl;
-  }
+std::ostream &operator<<(std::ostream &os,
+                         const C_BlockIORequest<T> &req) {
+   os << "image_extents=[" << req.image_extents << "], "
+      << "image_extents_summary=[" << req.image_extents_summary << "], "
+      << "bl=" << req.bl << ", "
+      << "user_req=" << req.user_req << ", "
+      << "m_user_req_completed=" << req.m_user_req_completed << ", "
+      << "m_deferred=" << req.m_deferred << ", "
+      << "detained=" << req.detained << ", "
+      << "waited_lanes=" << req.waited_lanes << ", "
+      << "waited_entries=" << req.waited_entries << ", "
+      << "waited_buffers=" << req.waited_buffers << "";
+   return os;
+}
+
+template <typename T>
+void C_BlockIORequest<T>::set_cell(BlockGuardCell *cell) {
+  ldout(rwl.get_context(), 20) << this << " cell=" << cell << dendl;
   ceph_assert(cell);
   ceph_assert(!m_cell);
   m_cell = cell;
 }
 
 template <typename T>
-BlockGuardCell *C_GuardedBlockIORequest<T>::get_cell(void) {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl;
-  }
+BlockGuardCell *C_BlockIORequest<T>::get_cell(void) {
+  ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
   return m_cell;
 }
 
 template <typename T>
-void C_GuardedBlockIORequest<T>::release_cell() {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl;
-  }
+void C_BlockIORequest<T>::release_cell() {
+  ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
   ceph_assert(m_cell);
   bool initial = false;
   if (m_cell_released.compare_exchange_strong(initial, true)) {
     rwl.release_guarded_request(m_cell);
   } else {
-    ldout(rwl.m_image_ctx.cct, 5) << "cell " << m_cell << " already released for " << this << dendl;
-  }
-}
-
-template <typename T>
-C_BlockIORequest<T>::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
-                 bufferlist&& bl, const int fadvise_flags, Context *user_req)
-  : C_GuardedBlockIORequest<T>(rwl), image_extents(std::move(extents)),
-    bl(std::move(bl)), fadvise_flags(fadvise_flags),
-    user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
-  }
-  /* Remove zero length image extents from input */
-  for (auto it = image_extents.begin(); it != image_extents.end(); ) {
-    if (0 == it->second) {
-      it = image_extents.erase(it);
-      continue;
-    }
-    ++it;
-  }
-}
-
-template <typename T>
-C_BlockIORequest<T>::~C_BlockIORequest() {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
+    ldout(rwl.get_context(), 5) << "cell " << m_cell << " already released for " << this << dendl;
   }
 }
 
-template <typename T>
-std::ostream &operator<<(std::ostream &os,
-                                 const C_BlockIORequest<T> &req) {
-   os << "image_extents=[" << req.image_extents << "], "
-      << "image_extents_summary=[" << req.image_extents_summary << "], "
-      << "bl=" << req.bl << ", "
-      << "user_req=" << req.user_req << ", "
-      << "m_user_req_completed=" << req.m_user_req_completed << ", "
-      << "m_deferred=" << req.m_deferred << ", "
-      << "detained=" << req.detained << ", "
-      << "m_waited_lanes=" << req.m_waited_lanes << ", "
-      << "m_waited_entries=" << req.m_waited_entries << ", "
-      << "m_waited_buffers=" << req.m_waited_buffers << "";
-   return os;
- }
-
 template <typename T>
 void C_BlockIORequest<T>::complete_user_request(int r) {
   bool initial = false;
   if (m_user_req_completed.compare_exchange_strong(initial, true)) {
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(rwl.m_image_ctx.cct, 15) << this << " completing user req" << dendl;
-    }
+    ldout(rwl.get_context(), 15) << this << " completing user req" << dendl;
     m_user_req_completed_time = ceph_clock_now();
     user_req->complete(r);
    // Set user_req to null since it has been deleted
     user_req = nullptr;
   } else {
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(rwl.m_image_ctx.cct, 20) << this << " user req already completed" << dendl;
-    }
+    ldout(rwl.get_context(), 20) << this << " user req already completed" << dendl;
   }
 }
 
 template <typename T>
 void C_BlockIORequest<T>::finish(int r) {
-  ldout(rwl.m_image_ctx.cct, 20) << this << dendl;
+  ldout(rwl.get_context(), 20) << this << dendl;
 
   complete_user_request(r);
   bool initial = false;
   if (m_finish_called.compare_exchange_strong(initial, true)) {
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(rwl.m_image_ctx.cct, 15) << this << " finishing" << dendl;
-    }
+    ldout(rwl.get_context(), 15) << this << " finishing" << dendl;
     finish_req(0);
   } else {
-    ldout(rwl.m_image_ctx.cct, 20) << this << " already finished" << dendl;
+    ldout(rwl.get_context(), 20) << this << " already finished" << dendl;
     ceph_assert(0);
   }
 }
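
complete_user_request(), finish() and release_cell() above share one idempotency idiom: an atomic flag flipped with compare_exchange_strong, so exactly one caller performs the action no matter how many threads race. A minimal sketch:

    #include <atomic>

    std::atomic<bool> m_completed{false};

    void complete_once(int r) {
      bool initial = false;
      if (m_completed.compare_exchange_strong(initial, true)) {
        /* first caller: perform the completion exactly once */
      } else {
        /* lost the race: someone else already completed it */
      }
    }
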
@@ -151,93 +112,154 @@ void C_BlockIORequest<T>::deferred() {
 
 template <typename T>
 C_WriteRequest<T>::C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents,
-               bufferlist&& bl, const int fadvise_flags, Context *user_req)
-  : C_BlockIORequest<T>(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req) {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
-  }
+                                  bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+                                  PerfCounters *perfcounter, Context *user_req)
+  : C_BlockIORequest<T>(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
+    m_lock(lock), m_perfcounter(perfcounter) {
+  ldout(rwl.get_context(), 99) << this << dendl;
 }
 
 template <typename T>
 C_WriteRequest<T>::~C_WriteRequest() {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
-  }
+  ldout(rwl.get_context(), 99) << this << dendl;
 }
 
 template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                                const C_WriteRequest<T> &req) {
+                         const C_WriteRequest<T> &req) {
   os << (C_BlockIORequest<T>&)req
-     << " m_resources.allocated=" << req.resources.allocated;
-  if (req.m_op_set) {
-     os << "m_op_set=" << *req.m_op_set;
+     << " m_resources.allocated=" << req.m_resources.allocated;
+  if (req.op_set) {
+     os << "op_set=" << *req.op_set;
   }
   return os;
 };
 
 template <typename T>
 void C_WriteRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.m_cell << dendl;
-  }
+  ldout(rwl.get_context(), 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.cell << dendl;
 
-  ceph_assert(guard_ctx.m_cell);
-  this->detained = guard_ctx.m_state.detained; /* overlapped */
-  this->m_queued = guard_ctx.m_state.queued; /* queued behind at least one barrier */
-  this->set_cell(guard_ctx.m_cell);
+  ceph_assert(guard_ctx.cell);
+  this->detained = guard_ctx.state.detained; /* overlapped */
+  this->m_queued = guard_ctx.state.queued; /* queued behind at least one barrier */
+  this->set_cell(guard_ctx.cell);
 }
 
 template <typename T>
 void C_WriteRequest<T>::finish_req(int r) {
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
-  }
+  ldout(rwl.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
 
   /* Completed to caller by here (in finish(), which calls this) */
   utime_t now = ceph_clock_now();
   rwl.release_write_lanes(this);
+  ceph_assert(m_resources.allocated);
+  m_resources.allocated = false;
   this->release_cell(); /* TODO: Consider doing this in appending state */
   update_req_stats(now);
 }
 
 template <typename T>
-void C_WriteRequest<T>::setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied) {
+void C_WriteRequest<T>::setup_buffer_resources(
+    uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+    uint64_t &number_lanes, uint64_t &number_log_entries,
+    uint64_t &number_unpublished_reserves) {
+
+  ceph_assert(!m_resources.allocated);
+
+  auto image_extents_size = this->image_extents.size();
+  m_resources.buffers.reserve(image_extents_size);
+
+  bytes_cached = 0;
+  bytes_allocated = 0;
+  number_lanes = image_extents_size;
+  number_log_entries = image_extents_size;
+  number_unpublished_reserves = image_extents_size;
+
   for (auto &extent : this->image_extents) {
-    resources.buffers.emplace_back();
-    struct WriteBufferAllocation &buffer = resources.buffers.back();
+    m_resources.buffers.emplace_back();
+    struct WriteBufferAllocation &buffer = m_resources.buffers.back();
     buffer.allocation_size = MIN_WRITE_ALLOC_SIZE;
     buffer.allocated = false;
     bytes_cached += extent.second;
     if (extent.second > buffer.allocation_size) {
       buffer.allocation_size = extent.second;
     }
+    bytes_allocated += buffer.allocation_size;
   }
   bytes_dirtied = bytes_cached;
 }
 
 template <typename T>
 void C_WriteRequest<T>::setup_log_operations() {
-  for (auto &extent : this->image_extents) {
-    /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
-    auto operation =
-      std::make_shared<WriteLogOperation<T>>(*m_op_set, extent.first, extent.second);
-    m_op_set->operations.emplace_back(operation);
+  {
+    std::lock_guard locker(m_lock);
+    // TODO: Add sync point if necessary
+    std::shared_ptr<SyncPoint> current_sync_point = rwl.get_current_sync_point();
+    uint64_t current_sync_gen = rwl.get_current_sync_gen();
+    op_set =
+      make_unique<WriteLogOperationSet>(this->m_dispatched_time,
+                                        m_perfcounter,
+                                        current_sync_point,
+                                        rwl.get_persist_on_flush(),
+                                        rwl.get_context(), this);
+    ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get() << dendl;
+    ceph_assert(m_resources.allocated);
+    /* op_set->operations initialized differently for plain write or write same */
+    auto allocation = m_resources.buffers.begin();
+    uint64_t buffer_offset = 0;
+    for (auto &extent : this->image_extents) {
+      /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
+      auto operation =
+        std::make_shared<WriteLogOperation>(*op_set, extent.first, extent.second, rwl.get_context());
+      op_set->operations.emplace_back(operation);
+
+      /* A WS is also a write */
+      ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get()
+                                   << " operation=" << operation << dendl;
+      rwl.inc_last_op_sequence_num();
+      operation->init(true, allocation, current_sync_gen,
+                      rwl.get_last_op_sequence_num(), this->bl, buffer_offset, op_set->persist_on_flush);
+      buffer_offset += operation->log_entry->write_bytes();
+      ldout(rwl.get_context(), 20) << "operation=[" << *operation << "]" << dendl;
+      allocation++;
+    }
+  }
+  /* All extent ops subs created */
+  op_set->extent_ops_appending->activate();
+  op_set->extent_ops_persist->activate();
+
+  /* Write data */
+  for (auto &operation : op_set->operations) {
+    operation->copy_bl_to_pmem_buffer();
   }
 }
 
+template <typename T>
+bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_point) {
+  std::lock_guard locker(m_lock);
+  auto write_req_sp = this;
+  if (sync_point->earlier_sync_point) {
+    Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) {
+        write_req_sp->schedule_append();
+      });
+    sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
+    return true;
+  }
+  return false;
+}
+
 template <typename T>
 void C_WriteRequest<T>::schedule_append() {
   ceph_assert(++m_appended == 1);
   if (m_do_early_flush) {
     /* This caller is waiting for persist, so we'll use their thread to
      * expedite it */
-    rwl.flush_pmem_buffer(this->m_op_set->operations);
-    rwl.schedule_append(this->m_op_set->operations);
+    rwl.flush_pmem_buffer(this->op_set->operations);
+    rwl.schedule_append(this->op_set->operations);
   } else {
     /* This is probably not still the caller's thread, so do the payload
      * flushing/replicating later. */
-    rwl.schedule_flush_and_append(this->m_op_set->operations);
+    rwl.schedule_flush_and_append(this->op_set->operations);
   }
 }
 
@@ -250,274 +272,49 @@ void C_WriteRequest<T>::schedule_append() {
  * Lanes are released after the write persists via release_write_lanes()
  */
 template <typename T>
-bool C_WriteRequest<T>::alloc_resources()
-{
-  bool alloc_succeeds = true;
-  bool no_space = false;
-  utime_t alloc_start = ceph_clock_now();
-  uint64_t bytes_allocated = 0;
-  uint64_t bytes_cached = 0;
-  uint64_t bytes_dirtied = 0;
-
-  ceph_assert(!resources.allocated);
-  resources.buffers.reserve(this->image_extents.size());
-  {
-    std::lock_guard locker(rwl.m_lock);
-    if (rwl.m_free_lanes < this->image_extents.size()) {
-      this->m_waited_lanes = true;
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(rwl.m_image_ctx.cct, 20) << "not enough free lanes (need "
-                                       <<  this->image_extents.size()
-                                       << ", have " << rwl.m_free_lanes << ") "
-                                       << *this << dendl;
-      }
-      alloc_succeeds = false;
-      /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
-    }
-    if (rwl.m_free_log_entries < this->image_extents.size()) {
-      this->m_waited_entries = true;
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(rwl.m_image_ctx.cct, 20) << "not enough free entries (need "
-                                       <<  this->image_extents.size()
-                                       << ", have " << rwl.m_free_log_entries << ") "
-                                       << *this << dendl;
-      }
-      alloc_succeeds = false;
-      no_space = true; /* Entries must be retired */
-    }
-    /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
-    if (rwl.m_bytes_allocated > rwl.m_bytes_allocated_cap) {
-      if (!this->m_waited_buffers) {
-        this->m_waited_buffers = true;
-        if (RWL_VERBOSE_LOGGING) {
-          ldout(rwl.m_image_ctx.cct, 1) << "Waiting for allocation cap (cap=" << rwl.m_bytes_allocated_cap
-                                        << ", allocated=" << rwl.m_bytes_allocated
-                                        << ") in write [" << *this << "]" << dendl;
-        }
-      }
-      alloc_succeeds = false;
-      no_space = true; /* Entries must be retired */
-    }
-  }
-  if (alloc_succeeds) {
-    setup_buffer_resources(bytes_cached, bytes_dirtied);
-  }
-
-  if (alloc_succeeds) {
-    for (auto &buffer : resources.buffers) {
-      bytes_allocated += buffer.allocation_size;
-      utime_t before_reserve = ceph_clock_now();
-      buffer.buffer_oid = pmemobj_reserve(rwl.m_log_pool,
-                                          &buffer.buffer_alloc_action,
-                                          buffer.allocation_size,
-                                          0 /* Object type */);
-      buffer.allocation_lat = ceph_clock_now() - before_reserve;
-      if (TOID_IS_NULL(buffer.buffer_oid)) {
-        if (!this->m_waited_buffers) {
-          this->m_waited_buffers = true;
-        }
-        if (RWL_VERBOSE_LOGGING) {
-          ldout(rwl.m_image_ctx.cct, 5) << "can't allocate all data buffers: "
-                                        << pmemobj_errormsg() << ". "
-                                        << *this << dendl;
-        }
-        alloc_succeeds = false;
-        no_space = true; /* Entries need to be retired */
-        break;
-      } else {
-        buffer.allocated = true;
-      }
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(rwl.m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
-                                       << "." << buffer.buffer_oid.oid.off
-                                       << ", size=" << buffer.allocation_size << dendl;
-      }
-    }
-  }
-
-  if (alloc_succeeds) {
-    unsigned int num_extents = this->image_extents.size();
-    std::lock_guard locker(rwl.m_lock);
-    /* We need one free log entry per extent (each is a separate entry), and
-     * one free "lane" for remote replication. */
-    if ((rwl.m_free_lanes >= num_extents) &&
-        (rwl.m_free_log_entries >= num_extents)) {
-      rwl.m_free_lanes -= num_extents;
-      rwl.m_free_log_entries -= num_extents;
-      rwl.m_unpublished_reserves += num_extents;
-      rwl.m_bytes_allocated += bytes_allocated;
-      rwl.m_bytes_cached += bytes_cached;
-      rwl.m_bytes_dirty += bytes_dirtied;
-      resources.allocated = true;
-    } else {
-      alloc_succeeds = false;
-    }
-  }
-
-  if (!alloc_succeeds) {
-    /* On alloc failure, free any buffers we did allocate */
-    for (auto &buffer : resources.buffers) {
-      if (buffer.allocated) {
-        pmemobj_cancel(rwl.m_log_pool, &buffer.buffer_alloc_action, 1);
-      }
-    }
-    resources.buffers.clear();
-    if (no_space) {
-      /* Expedite flushing and/or retiring */
-      std::lock_guard locker(rwl.m_lock);
-      rwl.m_alloc_failed_since_retire = true;
-      rwl.m_last_alloc_fail = ceph_clock_now();
-    }
-  }
-
-  this->m_allocated_time = alloc_start;
-  return alloc_succeeds;
+bool C_WriteRequest<T>::alloc_resources() {
+  this->allocated_time = ceph_clock_now();
+  return rwl.alloc_resources(this);
 }
 
 /**
  * Takes custody of write_req. Resources must already be allocated.
  *
  * Locking:
- * Acquires m_lock
+ * Acquires lock
  */
 template <typename T>
 void C_WriteRequest<T>::dispatch()
 {
-  CephContext *cct = rwl.m_image_ctx.cct;
-  GeneralWriteLogEntries log_entries;
-  DeferredContexts on_exit;
+  CephContext *cct = rwl.get_context();
   utime_t now = ceph_clock_now();
-  auto write_req_sp = this;
   this->m_dispatched_time = now;
 
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(cct, 15) << "name: " << rwl.m_image_ctx.name << " id: " << rwl.m_image_ctx.id
-                   << "write_req=" << this << " cell=" << this->get_cell() << dendl;
-  }
-
-  {
-    uint64_t buffer_offset = 0;
-    std::lock_guard locker(rwl.m_lock);
-    Context *set_complete = this;
-    // TODO: Add sync point if necessary
-    //
-    m_op_set =
-      make_unique<WriteLogOperationSet<T>>(rwl, now, rwl.m_current_sync_point, rwl.m_persist_on_flush,
-                                           set_complete);
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get() << dendl;
-    }
-    ceph_assert(resources.allocated);
-    /* m_op_set->operations initialized differently for plain write or write same */
-    this->setup_log_operations();
-    auto allocation = resources.buffers.begin();
-    for (auto &gen_op : m_op_set->operations) {
-      /* A WS is also a write */
-      auto operation = gen_op->get_write_op();
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get()
-                       << " operation=" << operation << dendl;
-      }
-      log_entries.emplace_back(operation->log_entry);
-      rwl.m_perfcounter->inc(l_librbd_rwl_log_ops, 1);
-
-      operation->log_entry->ram_entry.has_data = 1;
-      operation->log_entry->ram_entry.write_data = allocation->buffer_oid;
-      // TODO: make std::shared_ptr
-      operation->buffer_alloc = &(*allocation);
-      ceph_assert(!TOID_IS_NULL(operation->log_entry->ram_entry.write_data));
-      operation->log_entry->pmem_buffer = D_RW(operation->log_entry->ram_entry.write_data);
-      operation->log_entry->ram_entry.sync_gen_number = rwl.m_current_sync_gen;
-      if (m_op_set->persist_on_flush) {
-        /* Persist on flush. Sequence #0 is never used. */
-        operation->log_entry->ram_entry.write_sequence_number = 0;
-      } else {
-        /* Persist on write */
-        operation->log_entry->ram_entry.write_sequence_number = ++rwl.m_last_op_sequence_num;
-        operation->log_entry->ram_entry.sequenced = 1;
-      }
-      operation->log_entry->ram_entry.sync_point = 0;
-      operation->log_entry->ram_entry.discard = 0;
-      operation->bl.substr_of(this->bl, buffer_offset,
-                              operation->log_entry->write_bytes());
-      buffer_offset += operation->log_entry->write_bytes();
-      if (RWL_VERBOSE_LOGGING) {
-        ldout(cct, 20) << "operation=[" << *operation << "]" << dendl;
-      }
-      allocation++;
-    }
-  }
-  /* All extent ops subs created */
-  m_op_set->extent_ops_appending->activate();
-  m_op_set->extent_ops_persist->activate();
-
-  /* Write data */
-  for (auto &operation : m_op_set->operations) {
-    /* operation is a shared_ptr, so write_op is only good as long as operation is in scope */
-    auto write_op = operation->get_write_op();
-    ceph_assert(write_op != nullptr);
-    bufferlist::iterator i(&write_op->bl);
-    rwl.m_perfcounter->inc(l_librbd_rwl_log_op_bytes, write_op->log_entry->write_bytes());
-    if (RWL_VERBOSE_LOGGING) {
-      ldout(cct, 20) << write_op->bl << dendl;
-    }
-    i.copy((unsigned)write_op->log_entry->write_bytes(), (char*)write_op->log_entry->pmem_buffer);
-  }
-
-  // TODO: Add to log map for read
-
-  /*
-   * Entries are added to m_log_entries in alloc_op_log_entries() when their
-   * order is established. They're added to m_dirty_log_entries when the write
-   * completes to all replicas. They must not be flushed before then. We don't
-   * prevent the application from reading these before they persist. If we
-   * supported coherent shared access, that might be a problem (the write could
-   * fail after another initiator had read it). As it is the cost of running
-   * reads through the block guard (and exempting them from the barrier, which
-   * doesn't need to apply to them) to prevent reading before the previous
-   * write of that data persists doesn't seem justified.
-   */
-
-  if (rwl.m_persist_on_flush_early_user_comp &&
-      m_op_set->persist_on_flush) {
-    /*
-     * We're done with the caller's buffer, and not guaranteeing
-     * persistence until the next flush. The block guard for this
-     * write_req will not be released until the write is persisted
-     * everywhere, but the caller's request can complete now.
-     */
-    this->complete_user_request(0);
-  }
+  ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
+  setup_log_operations();
 
   bool append_deferred = false;
-  {
-    std::lock_guard locker(rwl.m_lock);
-    if (!m_op_set->persist_on_flush &&
-        m_op_set->sync_point->earlier_sync_point) {
-      /* In persist-on-write mode, we defer the append of this write until the
-       * previous sync point is appending (meaning all the writes before it are
-       * persisted and that previous sync point can now appear in the
-       * log). Since we insert sync points in persist-on-write mode when writes
-       * have already completed to the current sync point, this limits us to
-       * one inserted sync point in flight at a time, and gives the next
-       * inserted sync point some time to accumulate a few writes if they
-       * arrive soon. Without this we can insert an absurd number of sync
-       * points, each with one or two writes. That uses a lot of log entries,
-       * and limits flushing to very few writes at a time. */
-      m_do_early_flush = false;
-      Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) {
-          write_req_sp->schedule_append();
-        });
-      m_op_set->sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
-      append_deferred = true;
-    } else {
-      /* The prior sync point is done, so we'll schedule append here. If this is
-       * persist-on-write, and probably still the caller's thread, we'll use this
-       * caller's thread to perform the persist & replication of the payload
-       * buffer. */
-      m_do_early_flush =
-        !(this->detained || this->m_queued || this->m_deferred || m_op_set->persist_on_flush);
-    }
+  if (!op_set->persist_on_flush &&
+      append_write_request(op_set->sync_point)) {
+    /* In persist-on-write mode, we defer the append of this write until the
+     * previous sync point is appending (meaning all the writes before it are
+     * persisted and that previous sync point can now appear in the
+     * log). Since we insert sync points in persist-on-write mode when writes
+     * have already completed to the current sync point, this limits us to
+     * one inserted sync point in flight at a time, and gives the next
+     * inserted sync point some time to accumulate a few writes if they
+     * arrive soon. Without this we can insert an absurd number of sync
+     * points, each with one or two writes. That uses a lot of log entries,
+     * and limits flushing to very few writes at a time. */
+    m_do_early_flush = false;
+    append_deferred = true;
+  } else {
+    /* The prior sync point is done, so we'll schedule append here. If this is
+     * persist-on-write, and probably still the caller's thread, we'll use this
+     * caller's thread to perform the persist & replication of the payload
+     * buffer. */
+    m_do_early_flush =
+      !(this->detained || this->m_queued || this->m_deferred || op_set->persist_on_flush);
   }
   if (!append_deferred) {
     this->schedule_append();
@@ -539,18 +336,18 @@ GuardedRequestFunctionContext::GuardedRequestFunctionContext(boost::function<voi
 GuardedRequestFunctionContext::~GuardedRequestFunctionContext(void) { }
 
 void GuardedRequestFunctionContext::finish(int r) {
-  ceph_assert(m_cell);
+  ceph_assert(cell);
   m_callback(*this);
 }
 
 std::ostream &operator<<(std::ostream &os,
                          const GuardedRequest &r) {
-  os << "guard_ctx->m_state=[" << r.guard_ctx->m_state << "], "
+  os << "guard_ctx->state=[" << r.guard_ctx->state << "], "
      << "block_extent.block_start=" << r.block_extent.block_start << ", "
      << "block_extent.block_start=" << r.block_extent.block_end;
   return os;
 }
 
-} // namespace rwl 
-} // namespace cache 
-} // namespace librbd 
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
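
The body of append_write_request() is new in this change and not shown in this
hunk. A minimal sketch of what it plausibly does, reconstructed from the
persist-on-write deferral logic removed above; the real implementation may
differ, e.g. by capturing a shared_ptr to the request (as the removed
write_req_sp capture did) to keep it alive until the callback runs:

  template <typename T>
  bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_point) {
    std::lock_guard locker(m_lock);
    if (!sync_point->earlier_sync_point) {
      /* The prior sync point has appended; no need to defer this write. */
      return false;
    }
    /* Defer this write's append until the earlier sync point is appending. */
    Context *schedule_append_ctx = new LambdaContext([this](int r) {
        this->schedule_append();
      });
    sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
    return true;
  }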
index 7af0c196e889d063e2606c48469db042d4673d0c..595e06c4396d401630631e8ea07ef8c8b9b2a49b 100644 (file)
@@ -17,50 +17,43 @@ namespace rwl {
 
 class GuardedRequestFunctionContext;
 
-/**
- * A request that can be deferred in a BlockGuard to sequence
- * overlapping operations.
- */
-template <typename T>
-class C_GuardedBlockIORequest : public Context {
-public:
-  T &rwl;
-  C_GuardedBlockIORequest(T &rwl);
-  ~C_GuardedBlockIORequest();
-  C_GuardedBlockIORequest(const C_GuardedBlockIORequest&) = delete;
-  C_GuardedBlockIORequest &operator=(const C_GuardedBlockIORequest&) = delete;
-
-  virtual const char *get_name() const = 0;
-  void set_cell(BlockGuardCell *cell);
-  BlockGuardCell *get_cell(void);
-  void release_cell();
-  
-private:
-  std::atomic<bool> m_cell_released = {false};
-  BlockGuardCell* m_cell = nullptr;
+struct WriteRequestResources {
+  bool allocated = false;
+  std::vector<WriteBufferAllocation> buffers;
 };
 
 /**
+ * A request that can be deferred in a BlockGuard to sequence
+ * overlapping operations.
  * This is the custodian of the BlockGuard cell for this IO, and the
  * state information about the progress of this IO. This object lives
  * until the IO is persisted in all (live) log replicas.  User request
  * may be completed from here before the IO persists.
  */
 template <typename T>
-class C_BlockIORequest : public C_GuardedBlockIORequest<T> {
+class C_BlockIORequest : public Context {
 public:
-  using C_GuardedBlockIORequest<T>::rwl;
-
+  T &rwl;
   io::Extents image_extents;
   bufferlist bl;
   int fadvise_flags;
   Context *user_req; /* User write request */
   ExtentsSummary<io::Extents> image_extents_summary;
   bool detained = false;                /* Detained in blockguard (overlapped with a prior IO) */
+  utime_t allocated_time;               /* When allocation began */
+  bool waited_lanes = false;            /* This IO waited for free persist/replicate lanes */
+  bool waited_entries = false;          /* This IO waited for free log entries */
+  bool waited_buffers = false;          /* This IO waited for data buffers (pmemobj_reserve() failed) */
 
   C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
                    bufferlist&& bl, const int fadvise_flags, Context *user_req);
-  virtual ~C_BlockIORequest();
+  ~C_BlockIORequest() override;
+  C_BlockIORequest(const C_BlockIORequest&) = delete;
+  C_BlockIORequest &operator=(const C_BlockIORequest&) = delete;
+
+  void set_cell(BlockGuardCell *cell);
+  BlockGuardCell *get_cell(void);
+  void release_cell();
 
   void complete_user_request(int r);
   void finish(int r);
@@ -74,34 +67,59 @@ public:
 
   virtual void dispatch() = 0;
 
-  virtual const char *get_name() const override {
+  virtual const char *get_name() const {
     return "C_BlockIORequest";
   }
+  uint64_t get_image_extents_size() {
+    return image_extents.size();
+  }
+  void set_io_waited_for_lanes(bool waited) {
+    waited_lanes = waited;
+  }
+  void set_io_waited_for_entries(bool waited) {
+    waited_entries = waited;
+  }
+  void set_io_waited_for_buffers(bool waited) {
+    waited_buffers = waited;
+  }
+  bool has_io_waited_for_buffers() {
+    return waited_buffers;
+  }
+  std::vector<WriteBufferAllocation>& get_resources_buffers() {
+    return m_resources.buffers;
+  }
+
+  void set_allocated(bool allocated) {
+    if (allocated) {
+      m_resources.allocated = true;
+    } else {
+      m_resources.buffers.clear();
+    }
+  }
+
+  virtual void setup_buffer_resources(
+      uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+      uint64_t &number_lanes, uint64_t &number_log_entries,
+      uint64_t &number_unpublished_reserves) {}
 
 protected:
   utime_t m_arrived_time;
-  utime_t m_allocated_time;               /* When allocation began */
   utime_t m_dispatched_time;              /* When dispatch began */
   utime_t m_user_req_completed_time;
-  bool m_waited_lanes = false;            /* This IO waited for free persist/replicate lanes */
-  bool m_waited_entries = false;          /* This IO waited for free log entries */
-  bool m_waited_buffers = false;          /* This IO waited for data buffers (pmemobj_reserve() failed) */
   std::atomic<bool> m_deferred = {false}; /* Deferred because this or a prior IO had to wait for write resources */
+  WriteRequestResources m_resources;
 
 private:
   std::atomic<bool> m_user_req_completed = {false};
   std::atomic<bool> m_finish_called = {false};
+  std::atomic<bool> m_cell_released = {false};
+  BlockGuardCell* m_cell = nullptr;
 
   template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
                                   const C_BlockIORequest<U> &req);
 };
 
-struct WriteRequestResources {
-  bool allocated = false;
-  std::vector<WriteBufferAllocation> buffers;
-};
-
 /**
  * This is the custodian of the BlockGuard cell for this write. Block
  * guard is not released until the write persists everywhere (this is
@@ -112,26 +130,24 @@ template <typename T>
 class C_WriteRequest : public C_BlockIORequest<T> {
 public:
   using C_BlockIORequest<T>::rwl;
-  WriteRequestResources resources;
+  unique_ptr<WriteLogOperationSet> op_set = nullptr;
 
   C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents,
-                 bufferlist&& bl, const int fadvise_flags, Context *user_req);
+                 bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
+                 PerfCounters *perfcounter, Context *user_req);
 
-  ~C_WriteRequest();
+  ~C_WriteRequest() override;
 
   void blockguard_acquired(GuardedRequestFunctionContext &guard_ctx);
 
   /* Common finish to plain write and compare-and-write (if it writes) */
-  virtual void finish_req(int r);
+  void finish_req(int r) override;
 
   /* Compare and write will override this */
   virtual void update_req_stats(utime_t &now) {
     // TODO: Add in later PRs
   }
-  virtual bool alloc_resources() override;
-
-  /* Plain writes will allocate one buffer per request extent */
-  virtual void setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied);
+  bool alloc_resources() override;
 
   void deferred_handler() override { }
 
@@ -139,18 +155,28 @@ public:
 
   virtual void setup_log_operations();
 
+  bool append_write_request(std::shared_ptr<SyncPoint> sync_point);
+
   virtual void schedule_append();
 
   const char *get_name() const override {
     return "C_WriteRequest";
   }
 
+protected:
+  using C_BlockIORequest<T>::m_resources;
+  /* Plain writes will allocate one buffer per request extent */
+  void setup_buffer_resources(
+      uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
+      uint64_t &number_lanes, uint64_t &number_log_entries,
+      uint64_t &number_unpublished_reserves) override;
+
 private:
-  unique_ptr<WriteLogOperationSet<T>> m_op_set = nullptr;
   bool m_do_early_flush = false;
   std::atomic<int> m_appended = {0};
   bool m_queued = false;
-
+  ceph::mutex &m_lock;
+  PerfCounters *m_perfcounter = nullptr;
   template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
                                   const C_WriteRequest<U> &req);
@@ -167,10 +193,10 @@ struct BlockGuardReqState {
 
 class GuardedRequestFunctionContext : public Context {
 public:
-  BlockGuardCell *m_cell = nullptr;
-  BlockGuardReqState m_state;
+  BlockGuardCell *cell = nullptr;
+  BlockGuardReqState state;
   GuardedRequestFunctionContext(boost::function<void(GuardedRequestFunctionContext&)> &&callback);
-  ~GuardedRequestFunctionContext(void);
+  ~GuardedRequestFunctionContext(void) override;
   GuardedRequestFunctionContext(const GuardedRequestFunctionContext&) = delete;
   GuardedRequestFunctionContext &operator=(const GuardedRequestFunctionContext&) = delete;
 
@@ -187,14 +213,14 @@ public:
   GuardedRequest(const BlockExtent block_extent,
                  GuardedRequestFunctionContext *on_guard_acquire, bool barrier = false)
     : block_extent(block_extent), guard_ctx(on_guard_acquire) {
-    guard_ctx->m_state.barrier = barrier;
+    guard_ctx->state.barrier = barrier;
   }
   friend std::ostream &operator<<(std::ostream &os,
                                   const GuardedRequest &r);
 };
 
-} // namespace rwl 
-} // namespace cache 
-} // namespace librbd 
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
 
-#endif // CEPH_LIBRBD_CACHE_RWL_REQUEST_H 
+#endif // CEPH_LIBRBD_CACHE_RWL_REQUEST_H
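
With C_GuardedBlockIORequest folded into C_BlockIORequest, the request itself
is now the custodian of its BlockGuard cell. A hypothetical caller-side sketch
(not from this commit) of the hand-off, using the names declared above; the
detain/dispatch plumbing in ReplicatedWriteLog is elided:

  GuardedRequestFunctionContext *guarded_ctx =
    new GuardedRequestFunctionContext(
      [write_req](GuardedRequestFunctionContext &guard_ctx) {
        write_req->set_cell(guard_ctx.cell);      /* custody passes to the request */
        write_req->blockguard_acquired(guard_ctx);
      });
  /* The BlockGuard admits or detains the request, then completes guarded_ctx.
   * Later, once the write has persisted in all log replicas: */
  write_req->release_cell();                      /* overlapping IO may proceed */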
index c9c4582d8b28fd82d3b26bd4d590f630ae3dc6d1..d9573aa0750ccc1210a99bdda57d59a730bd0736 100644 (file)
@@ -3,7 +3,7 @@
 
 #include "SyncPoint.h"
 
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::cache::rwl::SyncPoint: " << this << " " \
                            <<  __func__ << ": "
@@ -12,39 +12,35 @@ namespace librbd {
 namespace cache {
 namespace rwl {
 
-template <typename T>
-SyncPoint<T>::SyncPoint(T &rwl, const uint64_t sync_gen_num)
-  : rwl(rwl), log_entry(std::make_shared<SyncPointLogEntry>(sync_gen_num)) {
-  prior_log_entries_persisted = new C_Gather(rwl.m_image_ctx.cct, nullptr);
-  sync_point_persist = new C_Gather(rwl.m_image_ctx.cct, nullptr);
+SyncPoint::SyncPoint(uint64_t sync_gen_num, CephContext *cct)
+  : log_entry(std::make_shared<SyncPointLogEntry>(sync_gen_num)), m_cct(cct) {
+  prior_log_entries_persisted = new C_Gather(cct, nullptr);
+  sync_point_persist = new C_Gather(cct, nullptr);
   on_sync_point_appending.reserve(MAX_WRITES_PER_SYNC_POINT + 2);
   on_sync_point_persisted.reserve(MAX_WRITES_PER_SYNC_POINT + 2);
-  if (RWL_VERBOSE_LOGGING) {
-    ldout(rwl.m_image_ctx.cct, 20) << "sync point " << sync_gen_num << dendl;
-  }
+  ldout(m_cct, 20) << "sync point " << sync_gen_num << dendl;
 }
 
-template <typename T>
-SyncPoint<T>::~SyncPoint() {
+SyncPoint::~SyncPoint() {
   ceph_assert(on_sync_point_appending.empty());
   ceph_assert(on_sync_point_persisted.empty());
   ceph_assert(!earlier_sync_point);
 }
 
-template <typename T>
-std::ostream &SyncPoint<T>::format(std::ostream &os) const {
-  os << "log_entry=[" << *log_entry << "], "
-     << "earlier_sync_point=" << earlier_sync_point << ", "
-     << "later_sync_point=" << later_sync_point << ", "
-     << "final_op_sequence_num=" << final_op_sequence_num << ", "
-     << "prior_log_entries_persisted=" << prior_log_entries_persisted << ", "
-     << "prior_log_entries_persisted_complete=" << prior_log_entries_persisted_complete << ", "
-     << "append_scheduled=" << append_scheduled << ", "
-     << "appending=" << appending << ", "
-     << "on_sync_point_appending=" << on_sync_point_appending.size() << ", "
-     << "on_sync_point_persisted=" << on_sync_point_persisted.size() << "";
+std::ostream &operator<<(std::ostream &os,
+                         const SyncPoint &p) {
+  os << "log_entry=[" << *p.log_entry << "], "
+     << "earlier_sync_point=" << p.earlier_sync_point << ", "
+     << "later_sync_point=" << p.later_sync_point << ", "
+     << "final_op_sequence_num=" << p.final_op_sequence_num << ", "
+     << "prior_log_entries_persisted=" << p.prior_log_entries_persisted << ", "
+     << "prior_log_entries_persisted_complete=" << p.prior_log_entries_persisted_complete << ", "
+     << "append_scheduled=" << p.append_scheduled << ", "
+     << "appending=" << p.appending << ", "
+     << "on_sync_point_appending=" << p.on_sync_point_appending.size() << ", "
+     << "on_sync_point_persisted=" << p.on_sync_point_persisted.size() << "";
   return os;
-};
+}
 
 } // namespace rwl
 } // namespace cache
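
SyncPoint now needs only a CephContext, and format() becomes a free
operator<<. The two C_Gather members collect sub-completions; a minimal
illustration of that pattern (not from this commit, assuming the C_Gather API
in include/Context.h with set_finisher()/new_sub()/activate()):

  C_Gather *gather = new C_Gather(cct, nullptr);
  Context *sub = gather->new_sub();        /* one sub per prior log entry */
  gather->set_finisher(new LambdaContext([](int r) {
      /* runs once, after every sub has completed */
    }));
  gather->activate();                      /* arm; fires when all subs finish */
  sub->complete(0);                        /* last completion runs the finisher */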
index 7dad72046238a936ae27ba4ea001f9fb1d945616..da3cd09610ea211648c34ac12ab09ab234b807b5 100644 (file)
 namespace librbd {
 namespace cache {
 namespace rwl {
-/* Limit work between sync points */
-static const uint64_t MAX_WRITES_PER_SYNC_POINT = 256;
 
-template <typename T>
 class SyncPoint {
 public:
-  T &rwl;
   std::shared_ptr<SyncPointLogEntry> log_entry;
-  /* Use m_lock for earlier/later links */
-  std::shared_ptr<SyncPoint<T>> earlier_sync_point; /* NULL if earlier has completed */
-  std::shared_ptr<SyncPoint<T>> later_sync_point;
+  /* The earlier/later links are protected by the owner's lock */
+  std::shared_ptr<SyncPoint> earlier_sync_point; /* NULL if earlier has completed */
+  std::shared_ptr<SyncPoint> later_sync_point;
   uint64_t final_op_sequence_num = 0;
   /* A sync point can't appear in the log until all the writes bearing
    * it and all the prior sync points have been appended and
@@ -46,15 +42,15 @@ public:
    * aio_flush() calls are added to this. */
   std::vector<Context*> on_sync_point_persisted;
 
-  SyncPoint(T &rwl, const uint64_t sync_gen_num);
+  SyncPoint(uint64_t sync_gen_num, CephContext *cct);
   ~SyncPoint();
   SyncPoint(const SyncPoint&) = delete;
   SyncPoint &operator=(const SyncPoint&) = delete;
-  std::ostream &format(std::ostream &os) const;
+
+private:
+  CephContext *m_cct;
   friend std::ostream &operator<<(std::ostream &os,
-                                  const SyncPoint &p) {
-    return p.format(os);
-  }
+                                  const SyncPoint &p);
 };
 
 } // namespace rwl
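
With the template parameter gone, SyncPoint no longer reaches back into the
write log; the owner maintains the earlier/later chain under its own lock. A
hypothetical sketch of that maintenance (m_current_sync_point is an assumed
member name; m_current_sync_gen appears in the removed Request.cc code above):

  auto new_sp = std::make_shared<SyncPoint>(++m_current_sync_gen, m_image_ctx.cct);
  {
    std::lock_guard locker(m_lock);        /* the lock the comment above refers to */
    new_sp->earlier_sync_point = m_current_sync_point;
    if (m_current_sync_point) {
      m_current_sync_point->later_sync_point = new_sp;
    }
    m_current_sync_point = new_sp;
  }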
index 3a6cfa6a5ead255a2da17a3e405a86d63c3dbb75..3ad62176f0d2b84043b1b31d522182d97d2c9463 100644 (file)
@@ -6,7 +6,7 @@
 #include "common/ceph_context.h"
 #include "include/Context.h"
 
-#define dout_subsys ceph_subsys_rbd
+#define dout_subsys ceph_subsys_rbd_rwl
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::cache::rwl::Types: " << this << " " \
                            <<  __func__ << ": "
@@ -35,43 +35,21 @@ void DeferredContexts::add(Context* ctx) {
  * convert between image and block extents here using a "block size"
  * of 1.
  */
-const BlockExtent block_extent(const uint64_t offset_bytes, const uint64_t length_bytes)
+BlockExtent convert_to_block_extent(const uint64_t offset_bytes, const uint64_t length_bytes)
 {
   return BlockExtent(offset_bytes,
                      offset_bytes + length_bytes - 1);
 }
 
-const BlockExtent WriteLogPmemEntry::block_extent() {
-  return BlockExtent(librbd::cache::rwl::block_extent(image_offset_bytes, write_bytes));
+BlockExtent WriteLogPmemEntry::block_extent() {
+  return convert_to_block_extent(image_offset_bytes, write_bytes);
 }
 
-bool WriteLogPmemEntry::is_sync_point() {
-  return sync_point;
-}
-
-bool WriteLogPmemEntry::is_discard() {
-  return discard;
-}
-
-bool WriteLogPmemEntry::is_writesame() {
-  return writesame;
-}
-
-bool WriteLogPmemEntry::is_write() {
-  /* Log entry is a basic write */
-  return !is_sync_point() && !is_discard() && !is_writesame();
-}
-
-bool WriteLogPmemEntry::is_writer() {
-  /* Log entry is any type that writes data */
-  return is_write() || is_discard() || is_writesame();
-}
-
-const uint64_t WriteLogPmemEntry::get_offset_bytes() {
+uint64_t WriteLogPmemEntry::get_offset_bytes() {
   return image_offset_bytes;
 }
 
-const uint64_t WriteLogPmemEntry::get_write_bytes() {
+uint64_t WriteLogPmemEntry::get_write_bytes() {
   return write_bytes;
 }
 
@@ -93,10 +71,9 @@ std::ostream& operator<<(std::ostream& os,
 }
 
 template <typename ExtentsType>
-ExtentsSummary<ExtentsType>::ExtentsSummary(const ExtentsType &extents) {
-  total_bytes = 0;
-  first_image_byte = 0;
-  last_image_byte = 0;
+ExtentsSummary<ExtentsType>::ExtentsSummary(const ExtentsType &extents)
+  : total_bytes(0), first_image_byte(0), last_image_byte(0)
+{
   if (extents.empty()) return;
   /* These extents refer to image offsets between first_image_byte
    * and last_image_byte, inclusive, but we don't guarantee here
@@ -119,7 +96,7 @@ ExtentsSummary<ExtentsType>::ExtentsSummary(const ExtentsType &extents) {
 
 template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                                const ExtentsSummary<T> &s) {
+                         const ExtentsSummary<T> &s) {
   os << "total_bytes=" << s.total_bytes << ", "
      << "first_image_byte=" << s.first_image_byte << ", "
      << "last_image_byte=" << s.last_image_byte << "";
index ac76aaf1f8567668ff11b381380b3dfc673fa710..ac300a8b38a57331762a1132c2a37a8f7571b765 100644 (file)
@@ -137,6 +137,13 @@ enum {
   l_librbd_rwl_last,
 };
 
+namespace librbd {
+namespace cache {
+namespace rwl {
+
+/* Limit work between sync points */
+const uint64_t MAX_WRITES_PER_SYNC_POINT = 256;
+
 const uint32_t MIN_WRITE_ALLOC_SIZE = 512;
 const uint32_t LOG_STATS_INTERVAL_SECONDS = 5;
 
@@ -152,12 +159,6 @@ const uint64_t BLOCK_ALLOC_OVERHEAD_BYTES = 16;
 const uint8_t RWL_POOL_VERSION = 1;
 const uint64_t MAX_LOG_ENTRIES = (1024 * 1024);
 
-namespace librbd {
-namespace cache {
-namespace rwl {
-
-static const bool RWL_VERBOSE_LOGGING = false;
-
 /* Defer a set of Contexts until destruct/exit. Used for deferring
  * work on a given thread until a required lock is dropped. */
 class DeferredContexts {
@@ -197,14 +198,9 @@ struct WriteLogPmemEntry {
     : image_offset_bytes(image_offset_bytes), write_bytes(write_bytes),
       entry_valid(0), sync_point(0), sequenced(0), has_data(0), discard(0), writesame(0) {
   }
-  const BlockExtent block_extent();
-  bool is_sync_point();
-  bool is_discard();
-  bool is_writesame();
-  bool is_write();
-  bool is_writer();
-  const uint64_t get_offset_bytes();
-  const uint64_t get_write_bytes();
+  BlockExtent block_extent();
+  uint64_t get_offset_bytes();
+  uint64_t get_write_bytes();
   friend std::ostream& operator<<(std::ostream& os,
                                   const WriteLogPmemEntry &entry);
 };
@@ -236,30 +232,31 @@ struct WriteBufferAllocation {
   utime_t allocation_lat;
 };
 
+static inline io::Extent image_extent(const BlockExtent& block_extent) {
+  return io::Extent(block_extent.block_start,
+                    block_extent.block_end - block_extent.block_start + 1);
+}
+
 template <typename ExtentsType>
 class ExtentsSummary {
 public:
   uint64_t total_bytes;
   uint64_t first_image_byte;
   uint64_t last_image_byte;
-  ExtentsSummary(const ExtentsType &extents);
+  explicit ExtentsSummary(const ExtentsType &extents);
   template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
                                   const ExtentsSummary<U> &s);
-  const BlockExtent block_extent() {
+  BlockExtent block_extent() {
     return BlockExtent(first_image_byte, last_image_byte);
   }
-  const io::Extent image_extent(const BlockExtent& block_extent) {
-    return io::Extent(block_extent.block_start,
-                      block_extent.block_end - block_extent.block_start + 1);
-  }
-  const io::Extent image_extent() {
+  io::Extent image_extent() {
     return image_extent(block_extent());
   }
 };
 
-} // namespace rwl 
-} // namespace cache 
-} // namespace librbd 
+} // namespace rwl
+} // namespace cache
+} // namespace librbd
 
 #endif // CEPH_LIBRBD_CACHE_RWL_TYPES_H
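
image_extent() is now a free helper, the inverse of convert_to_block_extent()
in Types.cc. A round-trip example under the "block size of 1" convention,
assuming convert_to_block_extent() is visible to the caller:

  BlockExtent be = convert_to_block_extent(4096, 512);
  /* be.block_start == 4096, be.block_end == 4607 (4096 + 512 - 1) */
  io::Extent e = image_extent(be);
  /* e == {4096, 512}: block_end - block_start + 1 restores the length */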
index 8cf86556c0bb7ce85740282f4686292de7a69df3..b3ea123499fb78c36e33471ab7a99e4f16e084b4 100644 (file)
@@ -100,8 +100,8 @@ struct TestMockCacheReplicatedWriteLog : public TestMockFixture {
   void expect_context_complete(MockContextRWL& mock_context, int r) {
     EXPECT_CALL(mock_context, complete(r))
       .WillRepeatedly(Invoke([&mock_context](int r) {
-                       mock_context.do_complete(r);
-                     }));
+                        mock_context.do_complete(r);
+                      }));
   }
 
   void expect_metadata_set(MockImageCtx& mock_image_ctx) {
@@ -196,5 +196,34 @@ TEST_F(TestMockCacheReplicatedWriteLog, init_shutdown) {
   ASSERT_EQ(0, finish_ctx2.wait());
 }
 
+TEST_F(TestMockCacheReplicatedWriteLog, aio_write) {
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockImageCtx mock_image_ctx(*ictx);
+  MockReplicatedWriteLog rwl(mock_image_ctx, get_cache_state(mock_image_ctx));
+
+  MockContextRWL finish_ctx1;
+  expect_op_work_queue(mock_image_ctx);
+  expect_metadata_set(mock_image_ctx);
+  expect_context_complete(finish_ctx1, 0);
+  rwl.init(&finish_ctx1);
+  ASSERT_EQ(0, finish_ctx1.wait());
+
+  MockContextRWL finish_ctx2;
+  expect_context_complete(finish_ctx2, 0);
+  Extents image_extents{{0, 4096}};
+  bufferlist bl;
+  bl.append(std::string(4096, '1'));
+  int fadvise_flags = 0;
+  rwl.aio_write(std::move(image_extents), std::move(bl), fadvise_flags, &finish_ctx2);
+  ASSERT_EQ(0, finish_ctx2.wait());
+
+  MockContextRWL finish_ctx3;
+  expect_context_complete(finish_ctx3, 0);
+  rwl.shut_down(&finish_ctx3);
+  ASSERT_EQ(0, finish_ctx3.wait());
+}
+
 } // namespace cache
 } // namespace librbd
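
A natural follow-on test (hypothetical, not part of this commit) would issue
two overlapping writes to exercise the BlockGuard detain path recorded by
C_BlockIORequest::detained; with init/shutdown as in aio_write above, the body
might look like:

  MockContextRWL finish_w1, finish_w2;
  expect_context_complete(finish_w1, 0);
  expect_context_complete(finish_w2, 0);
  Extents extents1{{0, 4096}};
  Extents extents2{{2048, 4096}};          /* overlaps the tail of the first */
  bufferlist bl1, bl2;
  bl1.append(std::string(4096, '1'));
  bl2.append(std::string(4096, '2'));
  rwl.aio_write(std::move(extents1), std::move(bl1), 0, &finish_w1);
  rwl.aio_write(std::move(extents2), std::move(bl2), 0, &finish_w2);
  ASSERT_EQ(0, finish_w1.wait());
  ASSERT_EQ(0, finish_w2.wait());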