]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librbd: aio_write to flush data into cache device
authorYuan Lu <yuan.y.lu@intel.com>
Thu, 21 Nov 2019 08:11:01 +0000 (16:11 +0800)
committerYuan Lu <yuan.y.lu@intel.com>
Thu, 20 Feb 2020 13:18:22 +0000 (21:18 +0800)
This part calls WriteQuest to flush data into cache device.

Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
12 files changed:
src/librbd/cache/ReplicatedWriteLog.cc
src/librbd/cache/ReplicatedWriteLog.h
src/librbd/cache/rwl/LogEntry.cc
src/librbd/cache/rwl/LogEntry.h
src/librbd/cache/rwl/LogOperation.cc
src/librbd/cache/rwl/LogOperation.h
src/librbd/cache/rwl/Request.cc
src/librbd/cache/rwl/Request.h
src/librbd/cache/rwl/SyncPoint.cc
src/librbd/cache/rwl/Types.cc
src/librbd/cache/rwl/Types.h
src/test/librbd/cache/test_mock_ReplicatedWriteLog.cc

index 5415fa569d049158d3f1019e31582ff6466be6c9..343790a41c44b313e51dd80db524e4a07a4a9776 100644 (file)
@@ -33,15 +33,25 @@ using namespace librbd::cache::rwl;
 typedef ReplicatedWriteLog<ImageCtx>::Extent Extent;
 typedef ReplicatedWriteLog<ImageCtx>::Extents Extents;
 
+const unsigned long int ops_appended_together = MAX_ALLOC_PER_TRANSACTION;
+
 template <typename I>
 ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::ImageCacheState<I>* cache_state)
   : m_cache_state(cache_state),
     m_rwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_rwl)),
     m_image_ctx(image_ctx),
     m_log_pool_config_size(DEFAULT_POOL_SIZE),
-    m_image_writeback(image_ctx),
+    m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+    m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::ReplicatedWriteLog::m_deferred_dispatch_lock", this))),
+    m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::ReplicatedWriteLog::m_log_append_lock", this))),
     m_lock(ceph::make_mutex(util::unique_lock_name(
       "librbd::cache::ReplicatedWriteLog::m_lock", this))),
+    m_blockguard_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::ReplicatedWriteLog::m_blockguard_lock", this))),
+    m_entry_bl_lock(ceph::make_mutex(util::unique_lock_name(
+      "librbd::cache::ReplicatedWriteLog::m_entry_bl_lock", this))),
     m_thread_pool(image_ctx.cct, "librbd::cache::ReplicatedWriteLog::thread_pool", "tp_rwl",
                   4,
                   ""),
@@ -337,7 +347,7 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
                         m_log_pool_config_size,
                         (S_IWUSR | S_IRUSR))) == NULL) {
       lderr(cct) << "failed to create pool (" << m_log_pool_name << ")"
-                   << pmemobj_errormsg() << dendl;
+                 << pmemobj_errormsg() << dendl;
       m_cache_state->present = false;
       m_cache_state->clean = true;
       m_cache_state->empty = true;
@@ -407,6 +417,9 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
 
   // TODO: Will init sync point, this will be covered in later PR.
   //  init_flush_new_sync_point(later);
+  ++m_current_sync_gen;
+  auto new_sync_point = std::make_shared<SyncPointT>(*this, m_current_sync_gen);
+  m_current_sync_point = new_sync_point;
 
   m_initialized = true;
   // Start the thread
@@ -469,6 +482,41 @@ void ReplicatedWriteLog<I>::aio_write(Extents &&image_extents,
                                       bufferlist&& bl,
                                       int fadvise_flags,
                                       Context *on_finish) {
+  CephContext *cct = m_image_ctx.cct;
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(cct, 20) << "aio_write" << dendl;
+  }
+  utime_t now = ceph_clock_now();
+  m_perfcounter->inc(l_librbd_rwl_wr_req, 1);
+
+  ceph_assert(m_initialized);
+  {
+    std::shared_lock image_locker(m_image_ctx.image_lock);
+    if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
+      on_finish->complete(-EROFS);
+      return;
+    }
+  }
+
+  if (ExtentsSummary<Extents>(image_extents).total_bytes == 0) {
+    on_finish->complete(0);
+    return;
+  }
+
+  auto *write_req =
+    new C_WriteRequestT(*this, now, std::move(image_extents), std::move(bl), fadvise_flags, on_finish);
+  m_perfcounter->inc(l_librbd_rwl_wr_bytes, write_req->image_extents_summary.total_bytes);
+
+  /* The lambda below will be called when the block guard for all
+   * blocks affected by this write is obtained */
+  GuardedRequestFunctionContext *guarded_ctx =
+    new GuardedRequestFunctionContext([this, write_req](GuardedRequestFunctionContext &guard_ctx) {
+      write_req->blockguard_acquired(guard_ctx);
+      alloc_and_dispatch_io_req(write_req);
+    });
+
+  detain_guarded_request(GuardedRequest(write_req->image_extents_summary.block_extent(),
+                                        guarded_ctx));
 }
 
 template <typename I>
@@ -495,6 +543,11 @@ void ReplicatedWriteLog<I>::aio_compare_and_write(Extents &&image_extents,
                                                   Context *on_finish) {
 }
 
+template <typename I>
+void ReplicatedWriteLog<I>::wake_up() {
+  //TODO: handle the task to flush data from cache device to OSD
+}
+
 template <typename I>
 void ReplicatedWriteLog<I>::flush(Context *on_finish) {
 }
@@ -503,9 +556,730 @@ template <typename I>
 void ReplicatedWriteLog<I>::invalidate(Context *on_finish) {
 }
 
+template <typename I>
+BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
+{
+  CephContext *cct = m_image_ctx.cct;
+  BlockGuardCell *cell;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(cct, 20) << dendl;
+  }
+
+  int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
+  ceph_assert(r>=0);
+  if (r > 0) {
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
+                     << "req=" << req << dendl;
+    }
+    return nullptr;
+  }
+
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
+  }
+  return cell;
+}
+
+template <typename I>
+BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_barrier_helper(GuardedRequest &req)
+{
+  BlockGuardCell *cell = nullptr;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(m_image_ctx.cct, 20) << dendl;
+  }
+
+  if (m_barrier_in_progress) {
+    req.guard_ctx->m_state.queued = true;
+    m_awaiting_barrier.push_back(req);
+  } else {
+    bool barrier = req.guard_ctx->m_state.barrier;
+    if (barrier) {
+      m_barrier_in_progress = true;
+      req.guard_ctx->m_state.current_barrier = true;
+    }
+    cell = detain_guarded_request_helper(req);
+    if (barrier) {
+      /* Only non-null if the barrier acquires the guard now */
+      m_barrier_cell = cell;
+    }
+  }
+
+  return cell;
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::detain_guarded_request(GuardedRequest &&req)
+{
+  BlockGuardCell *cell = nullptr;
+
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(m_image_ctx.cct, 20) << dendl;
+  }
+  {
+    std::lock_guard locker(m_blockguard_lock);
+    cell = detain_guarded_request_barrier_helper(req);
+  }
+  if (cell) {
+    req.guard_ctx->m_cell = cell;
+    req.guard_ctx->complete(0);
+  }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell)
+{
+  CephContext *cct = m_image_ctx.cct;
+  WriteLogGuard::BlockOperations block_reqs;
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(cct, 20) << "released_cell=" << released_cell << dendl;
+  }
+
+  {
+    std::lock_guard locker(m_blockguard_lock);
+    m_write_log_guard.release(released_cell, &block_reqs);
+
+    for (auto &req : block_reqs) {
+      req.guard_ctx->m_state.detained = true;
+      BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
+      if (detained_cell) {
+        if (req.guard_ctx->m_state.current_barrier) {
+          /* The current barrier is acquiring the block guard, so now we know its cell */
+          m_barrier_cell = detained_cell;
+          /* detained_cell could be == released_cell here */
+          if (RWL_VERBOSE_LOGGING) {
+            ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
+          }
+        }
+        req.guard_ctx->m_cell = detained_cell;
+        m_work_queue.queue(req.guard_ctx);
+      }
+    }
+
+    if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
+      }
+      /* The released cell is the current barrier request */
+      m_barrier_in_progress = false;
+      m_barrier_cell = nullptr;
+      /* Move waiting requests into the blockguard. Stop if there's another barrier */
+      while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
+        auto &req = m_awaiting_barrier.front();
+        if (RWL_VERBOSE_LOGGING) {
+          ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
+        }
+        BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
+        if (detained_cell) {
+          req.guard_ctx->m_cell = detained_cell;
+          m_work_queue.queue(req.guard_ctx);
+        }
+        m_awaiting_barrier.pop_front();
+      }
+    }
+  }
+
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(cct, 20) << "exit" << dendl;
+  }
+}
+
+/*
+ * Performs the log event append operation for all of the scheduled
+ * events.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::append_scheduled_ops(void)
+{
+  GenericLogOperationsT ops;
+  int append_result = 0;
+  bool ops_remain = false;
+  bool appending = false; /* true if we set m_appending */
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(m_image_ctx.cct, 20) << dendl;
+  }
+  do {
+    ops.clear();
+
+    {
+      std::lock_guard locker(m_lock);
+      if (!appending && m_appending) {
+        /* Another thread is appending */
+        if (RWL_VERBOSE_LOGGING) {
+          ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
+        }
+        return;
+      }
+      if (m_ops_to_append.size()) {
+        appending = true;
+        m_appending = true;
+        auto last_in_batch = m_ops_to_append.begin();
+        unsigned int ops_to_append = m_ops_to_append.size();
+        if (ops_to_append > ops_appended_together) {
+          ops_to_append = ops_appended_together;
+        }
+        std::advance(last_in_batch, ops_to_append);
+        ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
+        ops_remain = true; /* Always check again before leaving */
+        if (RWL_VERBOSE_LOGGING) {
+          ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " << m_ops_to_append.size() << " remain" << dendl;
+        }
+      } else {
+        ops_remain = false;
+        if (appending) {
+          appending = false;
+          m_appending = false;
+        }
+      }
+    }
+
+    if (ops.size()) {
+      std::lock_guard locker(m_log_append_lock);
+      alloc_op_log_entries(ops);
+      append_result = append_op_log_entries(ops);
+    }
+
+    int num_ops = ops.size();
+    if (num_ops) {
+      /* New entries may be flushable. Completion will wake up flusher. */
+      complete_op_log_entries(std::move(ops), append_result);
+    }
+  } while (ops_remain);
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::enlist_op_appender()
+{
+  m_async_append_ops++;
+  m_async_op_tracker.start_op();
+  Context *append_ctx = new LambdaContext([this](int r) {
+      append_scheduled_ops();
+      m_async_append_ops--;
+      m_async_op_tracker.finish_op();
+    });
+  m_work_queue.queue(append_ctx);
+}
+
+/*
+ * Takes custody of ops. They'll all get their log entries appended,
+ * and have their on_write_persist contexts completed once they and
+ * all prior log entries are persisted everywhere.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsT &ops)
+{
+  bool need_finisher;
+  GenericLogOperationsVectorT appending;
+
+  std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
+  {
+    std::lock_guard locker(m_lock);
+
+    need_finisher = m_ops_to_append.empty() && !m_appending;
+    m_ops_to_append.splice(m_ops_to_append.end(), ops);
+  }
+
+  if (need_finisher) {
+    enlist_op_appender();
+  }
+
+  for (auto &op : appending) {
+    op->appending();
+  }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_append(GenericLogOperationsVectorT &ops)
+{
+  GenericLogOperationsT to_append(ops.begin(), ops.end());
+
+  schedule_append(to_append);
+}
+
+const unsigned long int ops_flushed_together = 4;
+/*
+ * Performs the pmem buffer flush on all scheduled ops, then schedules
+ * the log event append operation for all of them.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
+{
+  GenericLogOperationsT ops;
+  bool ops_remain = false;
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(m_image_ctx.cct, 20) << dendl;
+  }
+  do {
+    {
+      ops.clear();
+      std::lock_guard locker(m_lock);
+      if (m_ops_to_flush.size()) {
+        auto last_in_batch = m_ops_to_flush.begin();
+        unsigned int ops_to_flush = m_ops_to_flush.size();
+        if (ops_to_flush > ops_flushed_together) {
+          ops_to_flush = ops_flushed_together;
+        }
+        if (RWL_VERBOSE_LOGGING) {
+          ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
+        }
+        std::advance(last_in_batch, ops_to_flush);
+        ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
+        ops_remain = !m_ops_to_flush.empty();
+        if (RWL_VERBOSE_LOGGING) {
+          ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", " << m_ops_to_flush.size() << " remain" << dendl;
+        }
+      } else {
+        ops_remain = false;
+      }
+    }
+    if (ops_remain) {
+      enlist_op_flusher();
+    }
+
+    /* Ops subsequently scheduled for flush may finish before these,
+     * which is fine. We're unconcerned with completion order until we
+     * get to the log message append step. */
+    if (ops.size()) {
+      flush_pmem_buffer(ops);
+      schedule_append(ops);
+    }
+  } while (ops_remain);
+  append_scheduled_ops();
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::enlist_op_flusher()
+{
+  m_async_flush_ops++;
+  m_async_op_tracker.start_op();
+  Context *flush_ctx = new LambdaContext([this](int r) {
+      flush_then_append_scheduled_ops();
+      m_async_flush_ops--;
+      m_async_op_tracker.finish_op();
+    });
+  m_work_queue.queue(flush_ctx);
+}
+
+/*
+ * Takes custody of ops. They'll all get their pmem blocks flushed,
+ * then get their log entries appended.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVectorT &ops)
+{
+  GenericLogOperationsT to_flush(ops.begin(), ops.end());
+  bool need_finisher;
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(m_image_ctx.cct, 20) << dendl;
+  }
+  {
+    std::lock_guard locker(m_lock);
+
+    need_finisher = m_ops_to_flush.empty();
+    m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
+  }
+
+  if (need_finisher) {
+    enlist_op_flusher();
+  }
+}
+
+/*
+ * Flush the pmem regions for the data blocks of a set of operations
+ *
+ * V is expected to be GenericLogOperations<I>, or GenericLogOperationsVector<I>
+ */
+template <typename I>
+template <typename V>
+void ReplicatedWriteLog<I>::flush_pmem_buffer(V& ops)
+{
+  for (auto &operation : ops) {
+    if (operation->is_write() || operation->is_writesame()) {
+      operation->buf_persist_time = ceph_clock_now();
+      auto write_entry = operation->get_write_log_entry();
+
+      pmemobj_flush(m_log_pool, write_entry->pmem_buffer, write_entry->write_bytes());
+    }
+  }
+
+  /* Drain once for all */
+  pmemobj_drain(m_log_pool);
+
+  utime_t now = ceph_clock_now();
+  for (auto &operation : ops) {
+    if (operation->is_write() || operation->is_writesame()) {
+      operation->buf_persist_comp_time = now;
+    } else {
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+      }
+    }
+  }
+}
+
+/*
+ * Allocate the (already reserved) write log entries for a set of operations.
+ *
+ * Locking:
+ * Acquires m_lock
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperationsT &ops)
+{
+  TOID(struct WriteLogPoolRoot) pool_root;
+  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+  struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock));
+
+  /* Allocate the (already reserved) log entries */
+  std::lock_guard locker(m_lock);
+
+  for (auto &operation : ops) {
+    uint32_t entry_index = m_first_free_entry;
+    m_first_free_entry = (m_first_free_entry + 1) % m_total_log_entries;
+    auto &log_entry = operation->get_log_entry();
+    log_entry->log_entry_index = entry_index;
+    log_entry->ram_entry.entry_index = entry_index;
+    log_entry->pmem_entry = &pmem_log_entries[entry_index];
+    log_entry->ram_entry.entry_valid = 1;
+    m_log_entries.push_back(log_entry);
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
+    }
+  }
+}
+
+/*
+ * Flush the persistent write log entries set of ops. The entries must
+ * be contiguous in persistent memory.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVectorT &ops)
+{
+  if (ops.empty()) {
+    return;
+  }
+
+  if (ops.size() > 1) {
+    ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry);
+  }
+
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
+                               << "start address=" << ops.front()->get_log_entry()->pmem_entry << " "
+                               << "bytes=" << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
+                               << dendl;
+  }
+  pmemobj_flush(m_log_pool,
+                ops.front()->get_log_entry()->pmem_entry,
+                ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)));
+}
+
+/*
+ * Write and persist the (already allocated) write log entries and
+ * data buffer allocations for a set of ops. The data buffer for each
+ * of these must already have been persisted to its reserved area.
+ */
+template <typename I>
+int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperationsT &ops)
+{
+  CephContext *cct = m_image_ctx.cct;
+  GenericLogOperationsVectorT entries_to_flush;
+  TOID(struct WriteLogPoolRoot) pool_root;
+  pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
+  int ret = 0;
+
+  ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock));
+
+  if (ops.empty()) {
+    return 0;
+  }
+  entries_to_flush.reserve(ops_appended_together);
+
+  /* Write log entries to ring and persist */
+  utime_t now = ceph_clock_now();
+  for (auto &operation : ops) {
+    if (!entries_to_flush.empty()) {
+      /* Flush these and reset the list if the current entry wraps to the
+       * tail of the ring */
+      if (entries_to_flush.back()->get_log_entry()->log_entry_index >
+          operation->get_log_entry()->log_entry_index) {
+        if (RWL_VERBOSE_LOGGING) {
+          ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
+                                     << "operation=[" << *operation << "]" << dendl;
+        }
+        flush_op_log_entries(entries_to_flush);
+        entries_to_flush.clear();
+        now = ceph_clock_now();
+      }
+    }
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
+                                 << operation->get_log_entry()->log_entry_index << " "
+                                 << "from " << &operation->get_log_entry()->ram_entry << " "
+                                 << "to " << operation->get_log_entry()->pmem_entry << " "
+                                 << "operation=[" << *operation << "]" << dendl;
+    }
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(m_image_ctx.cct, 05) << "APPENDING: index="
+                                 << operation->get_log_entry()->log_entry_index << " "
+                                 << "operation=[" << *operation << "]" << dendl;
+    }
+    operation->log_append_time = now;
+    *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry;
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(m_image_ctx.cct, 20) << "APPENDING: index="
+                                 << operation->get_log_entry()->log_entry_index << " "
+                                 << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry << "]" << dendl;
+    }
+    entries_to_flush.push_back(operation);
+  }
+  flush_op_log_entries(entries_to_flush);
+
+  /* Drain once for all */
+  pmemobj_drain(m_log_pool);
+
+  /*
+   * Atomically advance the log head pointer and publish the
+   * allocations for all the data buffers they refer to.
+   */
+  utime_t tx_start = ceph_clock_now();
+  TX_BEGIN(m_log_pool) {
+    D_RW(pool_root)->first_free_entry = m_first_free_entry;
+    for (auto &operation : ops) {
+      if (operation->is_write() || operation->is_writesame()) {
+        auto write_op = (std::shared_ptr<WriteLogOperationT>&) operation;
+        pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
+      } else {
+        if (RWL_VERBOSE_LOGGING) {
+          ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
+        }
+      }
+    }
+  } TX_ONCOMMIT {
+  } TX_ONABORT {
+    lderr(cct) << "failed to commit " << ops.size() << " log entries (" << m_log_pool_name << ")" << dendl;
+    ceph_assert(false);
+    ret = -EIO;
+  } TX_FINALLY {
+  } TX_END;
+
+  utime_t tx_end = ceph_clock_now();
+  m_perfcounter->tinc(l_librbd_rwl_append_tx_t, tx_end - tx_start);
+  m_perfcounter->hinc(l_librbd_rwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
+  for (auto &operation : ops) {
+    operation->log_append_comp_time = tx_end;
+  }
+
+  return ret;
+}
+
+/*
+ * Complete a set of write ops with the result of append_op_entries.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::complete_op_log_entries(GenericLogOperationsT &&ops, const int result)
+{
+  GenericLogEntries dirty_entries;
+  int published_reserves = 0;
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
+  }
+  for (auto &op : ops) {
+    utime_t now = ceph_clock_now();
+    auto log_entry = op->get_log_entry();
+    log_entry->completed = true;
+    if (op->is_writing_op()) {
+      op->get_gen_write_op()->sync_point->log_entry->writes_completed++;
+      dirty_entries.push_back(log_entry);
+    }
+    if (op->is_write() || op->is_writesame()) {
+      published_reserves++;
+    }
+    if (op->is_discard()) {
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(m_image_ctx.cct, 20) << __func__ << ": completing discard" << dendl;
+      }
+    }
+    op->complete(result);
+    if (op->is_write()) {
+      m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_buf_t, op->buf_persist_time - op->dispatch_time);
+    }
+    m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t, op->log_append_time - op->dispatch_time);
+    m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
+    m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist, utime_t(now - op->dispatch_time).to_nsec(),
+                        log_entry->ram_entry.write_bytes);
+    if (op->is_write()) {
+      utime_t buf_lat = op->buf_persist_comp_time - op->buf_persist_time;
+      m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_bufc_t, buf_lat);
+      m_perfcounter->hinc(l_librbd_rwl_log_op_buf_to_bufc_t_hist, buf_lat.to_nsec(),
+                          log_entry->ram_entry.write_bytes);
+      m_perfcounter->tinc(l_librbd_rwl_log_op_buf_to_app_t, op->log_append_time - op->buf_persist_time);
+    }
+    utime_t app_lat = op->log_append_comp_time - op->log_append_time;
+    m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_appc_t, app_lat);
+    m_perfcounter->hinc(l_librbd_rwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
+                        log_entry->ram_entry.write_bytes);
+    m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_cmp_t, now - op->log_append_time);
+  }
+
+  {
+    std::lock_guard locker(m_lock);
+    m_unpublished_reserves -= published_reserves;
+    m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries);
+
+    /* New entries may be flushable */
+    wake_up();
+  }
+}
+
+/**
+ * Dispatch as many deferred writes as possible
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::dispatch_deferred_writes(void)
+{
+  C_BlockIORequestT *front_req = nullptr;     /* req still on front of deferred list */
+  C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */
+  bool allocated = false; /* front_req allocate succeeded */
+  bool cleared_dispatching_flag = false;
+
+  /* If we can't become the dispatcher, we'll exit */
+  {
+    std::lock_guard locker(m_lock);
+    if (m_dispatching_deferred_ops ||
+        !m_deferred_ios.size()) {
+      return;
+    }
+    m_dispatching_deferred_ops = true;
+  }
+
+  /* There are ops to dispatch, and this should be the only thread dispatching them */
+  {
+    std::lock_guard deferred_dispatch(m_deferred_dispatch_lock);
+    do {
+      {
+        std::lock_guard locker(m_lock);
+        ceph_assert(m_dispatching_deferred_ops);
+        if (allocated) {
+          /* On the 2..n-1 th time we get m_lock, front_req->alloc_resources() will
+           * have succeeded, and we'll need to pop it off the deferred ops list
+           * here. */
+          ceph_assert(front_req);
+          ceph_assert(!allocated_req);
+          m_deferred_ios.pop_front();
+          allocated_req = front_req;
+          front_req = nullptr;
+          allocated = false;
+        }
+        ceph_assert(!allocated);
+        if (!allocated && front_req) {
+          /* front_req->alloc_resources() failed on the last iteration. We'll stop dispatching. */
+          front_req = nullptr;
+          ceph_assert(!cleared_dispatching_flag);
+          m_dispatching_deferred_ops = false;
+          cleared_dispatching_flag = true;
+        } else {
+          ceph_assert(!front_req);
+          if (m_deferred_ios.size()) {
+            /* New allocation candidate */
+            front_req = m_deferred_ios.front();
+          } else {
+            ceph_assert(!cleared_dispatching_flag);
+            m_dispatching_deferred_ops = false;
+            cleared_dispatching_flag = true;
+          }
+        }
+      }
+      /* Try allocating for front_req before we decide what to do with allocated_req
+       * (if any) */
+      if (front_req) {
+        ceph_assert(!cleared_dispatching_flag);
+        allocated = front_req->alloc_resources();
+      }
+      if (allocated_req && front_req && allocated) {
+        /* Push dispatch of the first allocated req to a wq */
+        m_work_queue.queue(new LambdaContext(
+          [this, allocated_req](int r) {
+            allocated_req->dispatch();
+          }), 0);
+        allocated_req = nullptr;
+      }
+      ceph_assert(!(allocated_req && front_req && allocated));
+
+      /* Continue while we're still considering the front of the deferred ops list */
+    } while (front_req);
+    ceph_assert(!allocated);
+  }
+  ceph_assert(cleared_dispatching_flag);
+
+  /* If any deferred requests were allocated, the last one will still be in allocated_req */
+  if (allocated_req) {
+    allocated_req->dispatch();
+  }
+}
+
+/**
+ * Returns the lanes used by this write, and attempts to dispatch the next
+ * deferred write
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::release_write_lanes(C_WriteRequestT *write_req)
+{
+  {
+    std::lock_guard locker(m_lock);
+    ceph_assert(write_req->resources.allocated);
+    m_free_lanes += write_req->image_extents.size();
+    write_req->resources.allocated = false;
+  }
+  dispatch_deferred_writes();
+}
+
+/**
+ * Attempts to allocate log resources for a write. Write is dispatched if
+ * resources are available, or queued if they aren't.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
+{
+  bool dispatch_here = false;
+
+  {
+    /* If there are already deferred writes, queue behind them for resources */
+    {
+      std::lock_guard locker(m_lock);
+      dispatch_here = m_deferred_ios.empty();
+    }
+    if (dispatch_here) {
+      dispatch_here = req->alloc_resources();
+    }
+    if (dispatch_here) {
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
+      }
+      req->dispatch();
+    } else {
+      req->deferred();
+      {
+        std::lock_guard locker(m_lock);
+        m_deferred_ios.push_back(req);
+      }
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
+      }
+      dispatch_deferred_writes();
+    }
+  }
+}
 } // namespace cache
 } // namespace librbd
 
+#ifndef TEST_F
 template class librbd::cache::ReplicatedWriteLog<librbd::ImageCtx>;
 template class librbd::cache::ImageCache<librbd::ImageCtx>;
-
+#endif
index c5f37b76f37eca1adcaebea9dcadb5f82264e92f..adbdddca9bdfd160af4a951f7d1b31d0f1f39fa2 100644 (file)
@@ -12,6 +12,8 @@
 #include "librbd/Utils.h"
 #include "librbd/BlockGuard.h"
 #include "librbd/cache/Types.h"
+#include "librbd/cache/rwl/LogOperation.h"
+#include "librbd/cache/rwl/Request.h"
 #include <functional>
 #include <list>
 
@@ -37,9 +39,23 @@ typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
 
 /**** Write log entries end ****/
 
+typedef librbd::BlockGuard<GuardedRequest> WriteLogGuard;
+
+template <typename T>
+struct C_GuardedBlockIORequest;
 
 class DeferredContexts;
 template <typename> class ImageCacheState;
+
+template <typename T>
+struct C_BlockIORequest;
+
+template <typename T>
+struct C_WriteRequest;
+
+template <typename T>
+using GenericLogOperations = std::list<GenericLogOperationSharedPtr<T>>;
+
 } // namespace rwl
 
 
@@ -78,6 +94,35 @@ public:
   void flush(Context *on_finish) override;
 
 private:
+  using This = ReplicatedWriteLog<ImageCtxT>;
+  using SyncPointT = rwl::SyncPoint<This>;
+  using GenericLogOperationT = rwl::GenericLogOperation<This>;
+  using GenericLogOperationSharedPtrT = rwl::GenericLogOperationSharedPtr<This>;
+  using WriteLogOperationT = rwl::WriteLogOperation<This>;
+  using WriteLogOperationSetT = rwl::WriteLogOperationSet<This>;
+  using SyncPointLogOperationT = rwl::SyncPointLogOperation<This>;
+  using GenericLogOperationsT = rwl::GenericLogOperations<This>;
+  using GenericLogOperationsVectorT = rwl::GenericLogOperationsVector<This>;
+  using C_BlockIORequestT = rwl::C_BlockIORequest<This>;
+  using C_WriteRequestT = rwl::C_WriteRequest<This>;
+
+  friend class rwl::SyncPoint<This>;
+  friend class rwl::GenericLogOperation<This>;
+  friend class rwl::GeneralWriteLogOperation<This>;
+  friend class rwl::WriteLogOperation<This>;
+  friend class rwl::WriteLogOperationSet<This>;
+  friend class rwl::SyncPointLogOperation<This>;
+  friend struct rwl::C_GuardedBlockIORequest<This>;
+  friend struct rwl::C_BlockIORequest<This>;
+  friend struct rwl::C_WriteRequest<This>;
+  typedef std::list<rwl::C_WriteRequest<This> *> C_WriteRequests;
+  typedef std::list<rwl::C_BlockIORequest<This> *> C_BlockIORequests;
+
+  BlockGuardCell* detain_guarded_request_helper(rwl::GuardedRequest &req);
+  BlockGuardCell* detain_guarded_request_barrier_helper(rwl::GuardedRequest &req);
+  void detain_guarded_request(rwl::GuardedRequest &&req);
+  void release_guarded_request(BlockGuardCell *cell);
+
   librbd::cache::rwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
 
   std::atomic<bool> m_initialized = {false};
@@ -99,8 +144,11 @@ private:
   uint64_t m_bytes_dirty = 0;     /* Total bytes yet to flush to RBD */
   uint64_t m_bytes_allocated_cap = 0;
 
-  ImageWriteback<ImageCtxT> m_image_writeback;
+  utime_t m_last_alloc_fail;      /* Entry or buffer allocation fail seen */
+  std::atomic<bool> m_alloc_failed_since_retire = {false};
 
+  ImageWriteback<ImageCtxT> m_image_writeback;
+  rwl::WriteLogGuard m_write_log_guard;
   /*
    * When m_first_free_entry == m_first_valid_entry, the log is
    * empty. There is always at least one free entry, which can't be
@@ -111,23 +159,62 @@ private:
 
   /* Starts at 0 for a new write log. Incremented on every flush. */
   uint64_t m_current_sync_gen = 0;
+  std::shared_ptr<SyncPointT> m_current_sync_point = nullptr;
   /* Starts at 0 on each sync gen increase. Incremented before applied
      to an operation */
   uint64_t m_last_op_sequence_num = 0;
   /* All writes bearing this and all prior sync gen numbers are flushed */
   uint64_t m_flushed_sync_gen = 0;
 
+  bool m_persist_on_write_until_flush = true;
+  /* True if it's safe to complete a user request in persist-on-flush
+   * mode before the write is persisted. This is only true if there is
+   * a local copy of the write data, or if local write failure always
+   * causes local node failure. */
+  bool m_persist_on_flush_early_user_comp = false; /* Assume local write failure does not cause node failure */
+  bool m_persist_on_flush = false; /* If false, persist each write before completion */
+  bool m_flush_seen = false;
+
+  AsyncOpTracker m_async_op_tracker;
+  /* Debug counters for the places m_async_op_tracker is used */
+  std::atomic<int> m_async_flush_ops = {0};
+  std::atomic<int> m_async_append_ops = {0};
+  std::atomic<int> m_async_complete_ops = {0};
+
   /* Acquire locks in order declared here */
 
+  /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
+  mutable ceph::mutex m_deferred_dispatch_lock;
+  /* Hold m_log_append_lock while appending or retiring log entries. */
+  mutable ceph::mutex m_log_append_lock;
+
   /* Used for most synchronization */
   mutable ceph::mutex m_lock;
+  /* Used in release/detain to make BlockGuard preserve submission order */
+  mutable ceph::mutex m_blockguard_lock;
+  /* Used in WriteLogEntry::get_pmem_bl() to syncronize between threads making entries readable */
+  mutable ceph::mutex m_entry_bl_lock;
+
+  /* Use m_blockguard_lock for the following 3 things */
+  rwl::WriteLogGuard::BlockOperations m_awaiting_barrier;
+  bool m_barrier_in_progress = false;
+  BlockGuardCell *m_barrier_cell = nullptr;
+
+  bool m_appending = false;
+  bool m_dispatching_deferred_ops = false;
 
-  librbd::cache::Contexts m_flush_complete_contexts;
+  GenericLogOperationsT m_ops_to_flush; /* Write ops needing flush in local log */
+  GenericLogOperationsT m_ops_to_append; /* Write ops needing event append in local log */
 
   /* New entries are at the back. Oldest at the front */
   rwl::GenericLogEntries m_log_entries;
   rwl::GenericLogEntries m_dirty_log_entries;
 
+  /* Writes that have left the block guard, but are waiting for resources */
+  C_BlockIORequests m_deferred_ios;
+  /* Throttle writes concurrently allocating & replicating */
+  unsigned int m_free_lanes = MAX_CONCURRENT_WRITES;
+  unsigned int m_unpublished_reserves = 0;
   PerfCounters *m_perfcounter = nullptr;
 
   /* Initialized from config, then set false during shutdown */
@@ -147,6 +234,26 @@ private:
 
   void rwl_init(Context *on_finish, rwl::DeferredContexts &later);
   void update_image_cache_state(Context *on_finish);
+  void start_workers();
+  void wake_up();
+
+  void dispatch_deferred_writes(void);
+  void release_write_lanes(C_WriteRequestT *write_req);
+  void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
+  void append_scheduled_ops(void);
+  void enlist_op_appender();
+  void schedule_append(GenericLogOperationsVectorT &ops);
+  void schedule_append(GenericLogOperationsT &ops);
+  void flush_then_append_scheduled_ops(void);
+  void enlist_op_flusher();
+  void schedule_flush_and_append(GenericLogOperationsVectorT &ops);
+  template <typename V>
+  void flush_pmem_buffer(V& ops);
+  void alloc_op_log_entries(GenericLogOperationsT &ops);
+  void flush_op_log_entries(GenericLogOperationsVectorT &ops);
+  int append_op_log_entries(GenericLogOperationsT &ops);
+  void complete_op_log_entries(GenericLogOperationsT &&ops, const int r);
+  void schedule_complete_op_log_entries(GenericLogOperationsT &&ops, const int r);
 };
 
 } // namespace cache
index 5fb67ad7847263bd5ebd8e5a9cb57f4b5c264640..50bf5a9a1c025f6703d01479f34a01c031e9debb 100644 (file)
@@ -62,7 +62,7 @@ std::ostream& SyncPointLogEntry::format(std::ostream &os) const {
 };
 
 std::ostream &operator<<(std::ostream &os,
-                                const SyncPointLogEntry &entry) {
+                         const SyncPointLogEntry &entry) {
   return entry.format(os);
 }
 
@@ -83,7 +83,7 @@ std::ostream& GeneralWriteLogEntry::format(std::ostream &os) const {
 };
 
 std::ostream &operator<<(std::ostream &os,
-                                const GeneralWriteLogEntry &entry) {
+                         const GeneralWriteLogEntry &entry) {
   return entry.format(os);
 }
 
@@ -143,7 +143,7 @@ std::ostream& WriteLogEntry::format(std::ostream &os) const {
 };
 
 std::ostream &operator<<(std::ostream &os,
-                                const WriteLogEntry &entry) {
+                         const WriteLogEntry &entry) {
   return entry.format(os);
 }
 
index e1a8744b908d776b396a1851fe415440d5744b31..b6d73b3f535ca0fe6ec4ec255b123c7915f5fcfb 100644 (file)
@@ -4,6 +4,7 @@
 #ifndef CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H
 #define CEPH_LIBRBD_CACHE_RWL_LOG_ENTRY_H
 
+#include "librbd/Utils.h"
 #include "librbd/cache/rwl/Types.h"
 #include <atomic>
 #include <memory>
@@ -35,9 +36,15 @@ public:
   bool is_write();
   bool is_writer();
   virtual const GenericLogEntry* get_log_entry() = 0;
-  virtual const SyncPointLogEntry* get_sync_point_log_entry() { return nullptr;}
-  virtual const GeneralWriteLogEntry* get_gen_write_log_entry() { return nullptr; }
-  virtual const WriteLogEntry* get_write_log_entry() { return nullptr; }
+  virtual const SyncPointLogEntry* get_sync_point_log_entry() {
+    return nullptr;
+  }
+  virtual const GeneralWriteLogEntry* get_gen_write_log_entry() {
+    return nullptr;
+  }
+  virtual const WriteLogEntry* get_write_log_entry() {
+    return nullptr;
+  }
   virtual std::ostream& format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const GenericLogEntry &entry);
@@ -62,9 +69,15 @@ public:
   };
   SyncPointLogEntry(const SyncPointLogEntry&) = delete;
   SyncPointLogEntry &operator=(const SyncPointLogEntry&) = delete;
-  virtual inline unsigned int write_bytes() { return 0; }
-  const GenericLogEntry* get_log_entry() override { return get_sync_point_log_entry(); }
-  const SyncPointLogEntry* get_sync_point_log_entry() override { return this; }
+  virtual inline unsigned int write_bytes() {
+    return 0;
+  }
+  const GenericLogEntry* get_log_entry() override {
+    return get_sync_point_log_entry();
+  }
+  const SyncPointLogEntry* get_sync_point_log_entry() override {
+    return this;
+  }
   std::ostream& format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const SyncPointLogEntry &entry);
@@ -91,10 +104,18 @@ public:
     /* The bytes in the image this op makes dirty. Discard and WS override. */
     return write_bytes();
   };
-  const BlockExtent block_extent() { return ram_entry.block_extent(); }
-  const GenericLogEntry* get_log_entry() override { return get_gen_write_log_entry(); }
-  const GeneralWriteLogEntry* get_gen_write_log_entry() override { return this; }
-  uint32_t get_map_ref() { return(referring_map_entries); }
+  const BlockExtent block_extent() {
+    return ram_entry.block_extent();
+  }
+  const GenericLogEntry* get_log_entry() override {
+    return get_gen_write_log_entry();
+  }
+  const GeneralWriteLogEntry* get_gen_write_log_entry() override {
+    return this;
+  }
+  uint32_t get_map_ref() {
+    return(referring_map_entries);
+  }
   void inc_map_ref() { referring_map_entries++; }
   void dec_map_ref() { referring_map_entries--; }
   std::ostream &format(std::ostream &os) const;
@@ -132,8 +153,12 @@ public:
   buffer::list &get_pmem_bl(ceph::mutex &entry_bl_lock);
   /* Constructs a new bl containing copies of pmem_bp */
   void copy_pmem_bl(ceph::mutex &entry_bl_lock, bufferlist *out_bl);
-  virtual const GenericLogEntry* get_log_entry() override { return get_write_log_entry(); }
-  const WriteLogEntry* get_write_log_entry() override { return this; }
+  virtual const GenericLogEntry* get_log_entry() override {
+    return get_write_log_entry();
+  }
+  const WriteLogEntry* get_write_log_entry() override {
+    return this;
+  }
   std::ostream &format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const WriteLogEntry &entry);
index 11ed3e4b2a06cffeb2aed10d772180059137c02f..a7cb581b536f43b5aa3ad77badebe87ffa0e86db 100644 (file)
@@ -8,7 +8,7 @@
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::cache::rwl::LogOperation: " << this << " " \
-                          <<  __func__ << ": "
+                           <<  __func__ << ": "
 
 namespace librbd {
 
@@ -33,7 +33,7 @@ std::ostream& GenericLogOperation<T>::format(std::ostream &os) const {
 
 template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                                const GenericLogOperation<T> &op) {
+                         const GenericLogOperation<T> &op) {
   return op.format(os);
 }
 
@@ -58,7 +58,7 @@ std::ostream &SyncPointLogOperation<T>::format(std::ostream &os) const {
 
 template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                                const SyncPointLogOperation<T> &op) {
+                         const SyncPointLogOperation<T> &op) {
   return op.format(os);
 }
 
@@ -122,7 +122,8 @@ GeneralWriteLogOperation<T>::GeneralWriteLogOperation(T &rwl,
                                                       std::shared_ptr<SyncPoint<T>> sync_point,
                                                       const utime_t dispatch_time)
   : GenericLogOperation<T>(rwl, dispatch_time),
-  m_lock("librbd::cache::rwl::GeneralWriteLogOperation::m_lock"), sync_point(sync_point) {
+  m_lock(ceph::make_mutex(util::unique_lock_name(
+    "librbd::cache::rwl::GenericWriteLogOperation::m_lock", this))), sync_point(sync_point) {
 }
 
 template <typename T>
@@ -136,7 +137,7 @@ std::ostream &GeneralWriteLogOperation<T>::format(std::ostream &os) const {
 
 template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                                const GeneralWriteLogOperation<T> &op) {
+                         const GeneralWriteLogOperation<T> &op) {
   return op.format(os);
 }
 
@@ -212,7 +213,7 @@ std::ostream &WriteLogOperation<T>::format(std::ostream &os) const {
 
 template <typename T>
 std::ostream &operator<<(std::ostream &os,
-                                const WriteLogOperation<T> &op) {
+                         const WriteLogOperation<T> &op) {
   return op.format(os);
 }
 
@@ -220,9 +221,9 @@ std::ostream &operator<<(std::ostream &os,
 template <typename T>
 WriteLogOperationSet<T>::WriteLogOperationSet(T &rwl, utime_t dispatched, std::shared_ptr<SyncPoint<T>> sync_point,
                                               bool persist_on_flush, Context *on_finish)
-  : rwl(rwl), m_on_finish(on_finish),
+  : m_on_finish(on_finish), rwl(rwl),
     persist_on_flush(persist_on_flush), dispatch_time(dispatched), sync_point(sync_point) {
-  on_ops_appending = sync_point->m_prior_log_entries_persisted->new_sub();
+  on_ops_appending = sync_point->prior_log_entries_persisted->new_sub();
   on_ops_persist = nullptr;
   extent_ops_persist =
     new C_Gather(rwl.m_image_ctx.cct,
index 96aaa692872449a60fab266b1ad5e6afbec947df..dc0dd547356fb1599c1bfbdb1d862b18b576e671 100644 (file)
@@ -52,18 +52,38 @@ public:
   friend std::ostream &operator<<(std::ostream &os,
                                   const GenericLogOperation<U> &op);
   virtual const std::shared_ptr<GenericLogEntry> get_log_entry() = 0;
-  virtual const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() { return nullptr; }
-  virtual const std::shared_ptr<GeneralWriteLogEntry> get_gen_write_log_entry() { return nullptr; }
-  virtual const std::shared_ptr<WriteLogEntry> get_write_log_entry() { return nullptr; }
+  virtual const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() {
+    return nullptr;
+  }
+  virtual const std::shared_ptr<GeneralWriteLogEntry> get_gen_write_log_entry() {
+    return nullptr;
+  }
+  virtual const std::shared_ptr<WriteLogEntry> get_write_log_entry() {
+    return nullptr;
+  }
   virtual void appending() = 0;
   virtual void complete(int r) = 0;
-  virtual bool is_write() { return false; }
-  virtual bool is_sync_point() { return false; }
-  virtual bool is_discard() { return false; }
-  virtual bool is_writesame() { return false; }
-  virtual bool is_writing_op() { return false; }
-  virtual GeneralWriteLogOperation<T> *get_gen_write_op() { return nullptr; };
-  virtual WriteLogOperation<T> *get_write_op() { return nullptr; };
+  virtual bool is_write() {
+    return false;
+  }
+  virtual bool is_sync_point() {
+    return false;
+  }
+  virtual bool is_discard() {
+    return false;
+  }
+  virtual bool is_writesame() {
+    return false;
+  }
+  virtual bool is_writing_op() {
+    return false;
+  }
+  virtual GeneralWriteLogOperation<T> *get_gen_write_op() {
+    return nullptr;
+  };
+  virtual WriteLogOperation<T> *get_write_op() {
+    return nullptr;
+  };
 };
 
 template <typename T>
@@ -81,9 +101,15 @@ public:
   template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
                                   const SyncPointLogOperation<U> &op);
-  const std::shared_ptr<GenericLogEntry> get_log_entry() { return get_sync_point_log_entry(); }
-  const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() { return sync_point->log_entry; }
-  bool is_sync_point() { return true; }
+  const std::shared_ptr<GenericLogEntry> get_log_entry() {
+    return get_sync_point_log_entry();
+  }
+  const std::shared_ptr<SyncPointLogEntry> get_sync_point_log_entry() {
+    return sync_point->log_entry;
+  }
+  bool is_sync_point() {
+    return true;
+  }
   void appending();
   void complete(int r);
 };
@@ -110,8 +136,12 @@ public:
   template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
                                   const GeneralWriteLogOperation<U> &op);
-  GeneralWriteLogOperation<T> *get_gen_write_op() { return this; };
-  bool is_writing_op() { return true; }
+  GeneralWriteLogOperation<T> *get_gen_write_op() {
+    return this;
+  }
+  bool is_writing_op() {
+    return true;
+  }
   void appending();
   void complete(int r);
 };
@@ -135,10 +165,18 @@ public:
   template <typename U>
   friend std::ostream &operator<<(std::ostream &os,
                                   const WriteLogOperation<T> &op);
-  const std::shared_ptr<GenericLogEntry> get_log_entry() { return get_write_log_entry(); }
-  const std::shared_ptr<WriteLogEntry> get_write_log_entry() { return log_entry; }
-  WriteLogOperation<T> *get_write_op() override { return this; }
-  bool is_write() { return true; }
+  const std::shared_ptr<GenericLogEntry> get_log_entry() {
+    return get_write_log_entry();
+  }
+  const std::shared_ptr<WriteLogEntry> get_write_log_entry() {
+    return log_entry;
+  }
+  WriteLogOperation<T> *get_write_op() override {
+    return this;
+  }
+  bool is_write() {
+    return true;
+  }
 };
 
 
index b02f6d9f321ab46232b102b1d25ba6e9f679c06f..c16f3363113a5cf57c1d8386faf97cde770e2838 100644 (file)
@@ -176,6 +176,18 @@ std::ostream &operator<<(std::ostream &os,
   return os;
 };
 
+template <typename T>
+void C_WriteRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.m_cell << dendl;
+  }
+
+  ceph_assert(guard_ctx.m_cell);
+  this->detained = guard_ctx.m_state.detained; /* overlapped */
+  this->m_queued = guard_ctx.m_state.queued; /* queued behind at least one barrier */
+  this->set_cell(guard_ctx.m_cell);
+}
+
 template <typename T>
 void C_WriteRequest<T>::finish_req(int r) {
   if (RWL_VERBOSE_LOGGING) {
@@ -216,7 +228,17 @@ void C_WriteRequest<T>::setup_log_operations() {
 
 template <typename T>
 void C_WriteRequest<T>::schedule_append() {
-  // TODO: call rwl to complete it
+  ceph_assert(++m_appended == 1);
+  if (m_do_early_flush) {
+    /* This caller is waiting for persist, so we'll use their thread to
+     * expedite it */
+    rwl.flush_pmem_buffer(this->m_op_set->operations);
+    rwl.schedule_append(this->m_op_set->operations);
+  } else {
+    /* This is probably not still the caller's thread, so do the payload
+     * flushing/replicating later. */
+    rwl.schedule_flush_and_append(this->m_op_set->operations);
+  }
 }
 
 /**
@@ -237,7 +259,6 @@ bool C_WriteRequest<T>::alloc_resources()
   uint64_t bytes_cached = 0;
   uint64_t bytes_dirtied = 0;
 
-  ceph_assert(!rwl.m_lock.is_locked_by_me());
   ceph_assert(!resources.allocated);
   resources.buffers.reserve(this->image_extents.size());
   {
@@ -503,6 +524,33 @@ void C_WriteRequest<T>::dispatch()
   }
 }
 
+std::ostream &operator<<(std::ostream &os,
+                         const BlockGuardReqState &r) {
+  os << "barrier=" << r.barrier << ", "
+     << "current_barrier=" << r.current_barrier << ", "
+     << "detained=" << r.detained << ", "
+     << "queued=" << r.queued;
+  return os;
+};
+
+GuardedRequestFunctionContext::GuardedRequestFunctionContext(boost::function<void(GuardedRequestFunctionContext&)> &&callback)
+  : m_callback(std::move(callback)){ }
+
+GuardedRequestFunctionContext::~GuardedRequestFunctionContext(void) { }
+
+void GuardedRequestFunctionContext::finish(int r) {
+  ceph_assert(m_cell);
+  m_callback(*this);
+}
+
+std::ostream &operator<<(std::ostream &os,
+                         const GuardedRequest &r) {
+  os << "guard_ctx->m_state=[" << r.guard_ctx->m_state << "], "
+     << "block_extent.block_start=" << r.block_extent.block_start << ", "
+     << "block_extent.block_start=" << r.block_extent.block_end;
+  return os;
+};
+
 } // namespace rwl 
 } // namespace cache 
 } // namespace librbd 
index 9556c4f4f57ec79a2e1796f616a671c4bcedf9ba..7af0c196e889d063e2606c48469db042d4673d0c 100644 (file)
@@ -15,6 +15,8 @@ class BlockGuardCell;
 namespace cache {
 namespace rwl {
 
+class GuardedRequestFunctionContext;
+
 /**
  * A request that can be deferred in a BlockGuard to sequence
  * overlapping operations.
@@ -117,6 +119,8 @@ public:
 
   ~C_WriteRequest();
 
+  void blockguard_acquired(GuardedRequestFunctionContext &guard_ctx);
+
   /* Common finish to plain write and compare-and-write (if it writes) */
   virtual void finish_req(int r);
 
@@ -152,6 +156,43 @@ private:
                                   const C_WriteRequest<U> &req);
 };
 
+struct BlockGuardReqState {
+  bool barrier = false; /* This is a barrier request */
+  bool current_barrier = false; /* This is the currently active barrier */
+  bool detained = false;
+  bool queued = false; /* Queued for barrier */
+  friend std::ostream &operator<<(std::ostream &os,
+                                  const BlockGuardReqState &r);
+};
+
+class GuardedRequestFunctionContext : public Context {
+public:
+  BlockGuardCell *m_cell = nullptr;
+  BlockGuardReqState m_state;
+  GuardedRequestFunctionContext(boost::function<void(GuardedRequestFunctionContext&)> &&callback);
+  ~GuardedRequestFunctionContext(void);
+  GuardedRequestFunctionContext(const GuardedRequestFunctionContext&) = delete;
+  GuardedRequestFunctionContext &operator=(const GuardedRequestFunctionContext&) = delete;
+
+private:
+  boost::function<void(GuardedRequestFunctionContext&)> m_callback;
+  void finish(int r) override;
+};
+
+class GuardedRequest {
+public:
+  const BlockExtent block_extent;
+  GuardedRequestFunctionContext *guard_ctx; /* Work to do when guard on range obtained */
+
+  GuardedRequest(const BlockExtent block_extent,
+                 GuardedRequestFunctionContext *on_guard_acquire, bool barrier = false)
+    : block_extent(block_extent), guard_ctx(on_guard_acquire) {
+    guard_ctx->m_state.barrier = barrier;
+  }
+  friend std::ostream &operator<<(std::ostream &os,
+                                  const GuardedRequest &r);
+};
+
 } // namespace rwl 
 } // namespace cache 
 } // namespace librbd 
index f486b934d0d5158da0f986d43b1696b4cf21c0d5..c9c4582d8b28fd82d3b26bd4d590f630ae3dc6d1 100644 (file)
@@ -6,7 +6,7 @@
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
 #define dout_prefix *_dout << "librbd::cache::rwl::SyncPoint: " << this << " " \
-                          <<  __func__ << ": "
+                           <<  __func__ << ": "
 
 namespace librbd {
 namespace cache {
index 768b85b3fa30b8149929cd0c21e10eca96834865..3a6cfa6a5ead255a2da17a3e405a86d63c3dbb75 100644 (file)
@@ -22,7 +22,7 @@ DeferredContexts::~DeferredContexts() {
 }
 
 void DeferredContexts::add(Context* ctx) {
-    contexts.push_back(ctx);
+  contexts.push_back(ctx);
 }
 
 /*
index ecf655453566c02fe85cbc24fce289c1562af778..ac76aaf1f8567668ff11b381380b3dfc673fa710 100644 (file)
@@ -141,6 +141,9 @@ const uint32_t MIN_WRITE_ALLOC_SIZE = 512;
 const uint32_t LOG_STATS_INTERVAL_SECONDS = 5;
 
 /**** Write log entries ****/
+const unsigned long int MAX_ALLOC_PER_TRANSACTION = 8;
+const unsigned long int MAX_FREE_PER_TRANSACTION = 1;
+const unsigned int MAX_CONCURRENT_WRITES = 256;
 
 const uint64_t DEFAULT_POOL_SIZE = 1u<<30;
 const uint64_t MIN_POOL_SIZE = DEFAULT_POOL_SIZE;
@@ -246,10 +249,9 @@ public:
   const BlockExtent block_extent() {
     return BlockExtent(first_image_byte, last_image_byte);
   }
-  const io::Extent image_extent(const BlockExtent& block_extent)
-  {
+  const io::Extent image_extent(const BlockExtent& block_extent) {
     return io::Extent(block_extent.block_start,
-                  block_extent.block_end - block_extent.block_start + 1);
+                      block_extent.block_end - block_extent.block_start + 1);
   }
   const io::Extent image_extent() {
     return image_extent(block_extent());
index 43d4a820c63b0f4505ad5d8a195b1729ee0ea580..8cf86556c0bb7ce85740282f4686292de7a69df3 100644 (file)
@@ -41,6 +41,10 @@ inline ImageCtx *get_image_ctx(MockImageCtx *image_ctx) {
 // template definitions
 #include "librbd/cache/ImageWriteback.cc"
 #include "librbd/cache/rwl/ImageCacheState.cc"
+#include "librbd/cache/rwl/SyncPoint.cc"
+#include "librbd/cache/rwl/Request.cc"
+#include "librbd/cache/rwl/Types.cc"
+#include "librbd/cache/rwl/LogOperation.cc"
 
 template class librbd::cache::ImageWriteback<librbd::MockImageCtx>;
 template class librbd::cache::rwl::ImageCacheState<librbd::MockImageCtx>;
@@ -65,8 +69,8 @@ struct TestMockCacheReplicatedWriteLog : public TestMockFixture {
   void validate_cache_state(librbd::ImageCtx *image_ctx,
                             MockImageCacheStateRWL &state,
                             bool present, bool empty, bool clean,
-                           string host="", string path="",
-                           uint64_t size=0) {
+                            string host="", string path="",
+                            uint64_t size=0) {
     ConfigProxy &config = image_ctx->config;
     ASSERT_EQ(present, state.present);
     ASSERT_EQ(empty, state.empty);
@@ -96,8 +100,8 @@ struct TestMockCacheReplicatedWriteLog : public TestMockFixture {
   void expect_context_complete(MockContextRWL& mock_context, int r) {
     EXPECT_CALL(mock_context, complete(r))
       .WillRepeatedly(Invoke([&mock_context](int r) {
-                  mock_context.do_complete(r);
-                }));
+                       mock_context.do_complete(r);
+                     }));
   }
 
   void expect_metadata_set(MockImageCtx& mock_image_ctx) {