librbd: add WriteRequest to handle writes
author Yuan Lu <yuan.y.lu@intel.com>
Wed, 20 Nov 2019 14:21:55 +0000 (22:21 +0800)
committer Yuan Lu <yuan.y.lu@intel.com>
Thu, 20 Feb 2020 13:18:10 +0000 (21:18 +0800)
The new C_WriteRequest class handles write operations for the replicated write log image cache.
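
As a rough illustration (an assumed caller, not part of this change), ReplicatedWriteLog<I>::aio_write() might hand an incoming write to the new request class along these lines; block-guard admission and the deferred-IO queue are elided:

    // Hypothetical caller sketch: only C_WriteRequest and its interface come
    // from this commit; the surrounding flow is an assumption for illustration.
    template <typename I>
    void ReplicatedWriteLog<I>::aio_write(Extents &&image_extents, bufferlist&& bl,
                                          int fadvise_flags, Context *on_finish) {
      utime_t now = ceph_clock_now();
      auto *write_req = new rwl::C_WriteRequest<ReplicatedWriteLog<I>>(
        *this, now, std::move(image_extents), std::move(bl), fadvise_flags, on_finish);
      // Overlapping IO would normally be sequenced through the block guard first.
      if (write_req->alloc_resources()) {
        write_req->dispatch();   // set up log operations, copy payload into reserved pmem buffers
      } else {
        write_req->deferred();   // mark deferred; the cache is expected to retry it later
      }
    }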

Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
12 files changed:
src/librbd/CMakeLists.txt
src/librbd/cache/ImageCache.h
src/librbd/cache/ReplicatedWriteLog.cc
src/librbd/cache/ReplicatedWriteLog.h
src/librbd/cache/rwl/LogEntry.cc
src/librbd/cache/rwl/LogOperation.cc
src/librbd/cache/rwl/Request.cc [new file with mode: 0644]
src/librbd/cache/rwl/Request.h [new file with mode: 0644]
src/librbd/cache/rwl/SyncPoint.cc
src/librbd/cache/rwl/Types.cc
src/librbd/cache/rwl/Types.h
src/librbd/io/Types.h

index 9dfb760d6d8d316805fa63409d60489df638a0ca..0645d781ee9a3084d30e5cbb8e890d6e0f29b3e6 100644 (file)
@@ -166,6 +166,7 @@ if(WITH_RBD_RWL)
     cache/rwl/ImageCacheState.cc
     cache/rwl/LogEntry.cc
     cache/rwl/LogOperation.cc
+    cache/rwl/Request.cc
     cache/rwl/SyncPoint.cc
     cache/rwl/Types.cc
     cache/ReplicatedWriteLog.cc)
index bd3adcf427243a8f6e833867d6125f499cf02584..01ca6d188bf57a09e82d785b6a922fc9c3b2ae8b 100644 (file)
@@ -23,6 +23,7 @@ struct ImageCache {
 protected:
   ImageCache() {}
 public:
+  typedef io::Extent Extent;
   typedef io::Extents Extents;
 
   virtual ~ImageCache() {}
index 9f1e13040b07a0593025ac36aada86670327ad29..5415fa569d049158d3f1019e31582ff6466be6c9 100644 (file)
 #define dout_prefix *_dout << "librbd::cache::ReplicatedWriteLog: " << this << " " \
                            <<  __func__ << ": "
 
-const uint32_t MIN_WRITE_ALLOC_SIZE = 512;
-const uint32_t LOG_STATS_INTERVAL_SECONDS = 5;
-
-/**** Write log entries ****/
-const uint64_t DEFAULT_POOL_SIZE = 1u<<30;
-const uint64_t MIN_POOL_SIZE = DEFAULT_POOL_SIZE;
-constexpr double USABLE_SIZE = (7.0 / 10);
-const uint64_t BLOCK_ALLOC_OVERHEAD_BYTES = 16;
-const uint8_t RWL_POOL_VERSION = 1;
-const uint64_t MAX_LOG_ENTRIES = (1024 * 1024);
-
 namespace librbd {
 namespace cache {
 
 using namespace librbd::cache::rwl;
 
+typedef ReplicatedWriteLog<ImageCtx>::Extent Extent;
 typedef ReplicatedWriteLog<ImageCtx>::Extents Extents;
 
 template <typename I>
index b81b90bd6eb64a100bb36582afda2b7a83a7e276..c5f37b76f37eca1adcaebea9dcadb5f82264e92f 100644 (file)
@@ -46,6 +46,7 @@ template <typename> class ImageCacheState;
 template <typename ImageCtxT>
 class ReplicatedWriteLog : public ImageCache<ImageCtxT> {
 public:
+  using typename ImageCache<ImageCtxT>::Extent;
   using typename ImageCache<ImageCtxT>::Extents;
 
   ReplicatedWriteLog(ImageCtxT &image_ctx, librbd::cache::rwl::ImageCacheState<ImageCtxT>* cache_state);
index 3accf27c7664ffcf0b3a1cfcf2c769de77747ef9..5fb67ad7847263bd5ebd8e5a9cb57f4b5c264640 100644 (file)
@@ -117,7 +117,7 @@ buffer::list& WriteLogEntry::get_pmem_bl(ceph::mutex &entry_bl_lock) {
     if (0 == bl_refs) {
       init_pmem_bl();
     }
-    assert(0 != bl_refs);
+    ceph_assert(0 != bl_refs);
   }
   return pmem_bl;
 };
index c31cd909e2b6177ceeebb392d8d2dc2694b8c0b9..11ed3e4b2a06cffeb2aed10d772180059137c02f 100644 (file)
@@ -66,7 +66,7 @@ template <typename T>
 void SyncPointLogOperation<T>::appending() {
   std::vector<Context*> appending_contexts;
 
-  assert(sync_point);
+  ceph_assert(sync_point);
   {
     std::lock_guard locker(rwl.m_lock);
     if (!sync_point->m_appending) {
@@ -85,7 +85,7 @@ template <typename T>
 void SyncPointLogOperation<T>::complete(int result) {
   std::vector<Context*> persisted_contexts;
 
-  assert(sync_point);
+  ceph_assert(sync_point);
   if (RWL_VERBOSE_LOGGING) {
     ldout(rwl.m_image_ctx.cct, 20) << "Sync point op =[" << *this
                                    << "] completed" << dendl;
@@ -93,8 +93,8 @@ void SyncPointLogOperation<T>::complete(int result) {
   {
     std::lock_guard locker(rwl.m_lock);
     /* Remove link from next sync point */
-    assert(sync_point->later_sync_point);
-    assert(sync_point->later_sync_point->earlier_sync_point ==
+    ceph_assert(sync_point->later_sync_point);
+    ceph_assert(sync_point->later_sync_point->earlier_sync_point ==
            sync_point);
     sync_point->later_sync_point->earlier_sync_point = nullptr;
   }
diff --git a/src/librbd/cache/rwl/Request.cc b/src/librbd/cache/rwl/Request.cc
new file mode 100644 (file)
index 0000000..b02f6d9
--- /dev/null
@@ -0,0 +1,508 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Request.h"
+#include "librbd/BlockGuard.h"
+#include "librbd/cache/rwl/LogEntry.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::cache::rwl::Request: " << this << " " \
+                           <<  __func__ << ": "
+
+namespace librbd {
+namespace cache {
+namespace rwl {
+
+typedef std::list<std::shared_ptr<GeneralWriteLogEntry>> GeneralWriteLogEntries;
+
+template <typename T>
+C_GuardedBlockIORequest<T>::C_GuardedBlockIORequest(T &rwl)
+  : rwl(rwl) {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
+  }
+}
+
+template <typename T>
+C_GuardedBlockIORequest<T>::~C_GuardedBlockIORequest() {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
+  }
+  ceph_assert(m_cell_released || !m_cell);
+}
+
+template <typename T>
+void C_GuardedBlockIORequest<T>::set_cell(BlockGuardCell *cell) {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << cell << dendl;
+  }
+  ceph_assert(cell);
+  ceph_assert(!m_cell);
+  m_cell = cell;
+}
+
+template <typename T>
+BlockGuardCell *C_GuardedBlockIORequest<T>::get_cell(void) {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl;
+  }
+  return m_cell;
+}
+
+template <typename T>
+void C_GuardedBlockIORequest<T>::release_cell() {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 20) << this << " cell=" << m_cell << dendl;
+  }
+  ceph_assert(m_cell);
+  bool initial = false;
+  if (m_cell_released.compare_exchange_strong(initial, true)) {
+    rwl.release_guarded_request(m_cell);
+  } else {
+    ldout(rwl.m_image_ctx.cct, 5) << "cell " << m_cell << " already released for " << this << dendl;
+  }
+}
+
+template <typename T>
+C_BlockIORequest<T>::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
+                 bufferlist&& bl, const int fadvise_flags, Context *user_req)
+  : C_GuardedBlockIORequest<T>(rwl), image_extents(std::move(extents)),
+    bl(std::move(bl)), fadvise_flags(fadvise_flags),
+    user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
+  }
+  /* Remove zero length image extents from input */
+  for (auto it = image_extents.begin(); it != image_extents.end(); ) {
+    if (0 == it->second) {
+      it = image_extents.erase(it);
+      continue;
+    }
+    ++it;
+  }
+}
+
+template <typename T>
+C_BlockIORequest<T>::~C_BlockIORequest() {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
+  }
+}
+
+template <typename T>
+std::ostream &operator<<(std::ostream &os,
+                                 const C_BlockIORequest<T> &req) {
+   os << "image_extents=[" << req.image_extents << "], "
+      << "image_extents_summary=[" << req.image_extents_summary << "], "
+      << "bl=" << req.bl << ", "
+      << "user_req=" << req.user_req << ", "
+      << "m_user_req_completed=" << req.m_user_req_completed << ", "
+      << "m_deferred=" << req.m_deferred << ", "
+      << "detained=" << req.detained << ", "
+      << "m_waited_lanes=" << req.m_waited_lanes << ", "
+      << "m_waited_entries=" << req.m_waited_entries << ", "
+      << "m_waited_buffers=" << req.m_waited_buffers << "";
+   return os;
+ }
+
+template <typename T>
+void C_BlockIORequest<T>::complete_user_request(int r) {
+  bool initial = false;
+  if (m_user_req_completed.compare_exchange_strong(initial, true)) {
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(rwl.m_image_ctx.cct, 15) << this << " completing user req" << dendl;
+    }
+    m_user_req_completed_time = ceph_clock_now();
+    user_req->complete(r);
+    // Set user_req to nullptr since complete() has deleted it
+    user_req = nullptr;
+  } else {
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(rwl.m_image_ctx.cct, 20) << this << " user req already completed" << dendl;
+    }
+  }
+}
+
+template <typename T>
+void C_BlockIORequest<T>::finish(int r) {
+  ldout(rwl.m_image_ctx.cct, 20) << this << dendl;
+
+  complete_user_request(r);
+  bool initial = false;
+  if (m_finish_called.compare_exchange_strong(initial, true)) {
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(rwl.m_image_ctx.cct, 15) << this << " finishing" << dendl;
+    }
+    finish_req(0);
+  } else {
+    ldout(rwl.m_image_ctx.cct, 20) << this << " already finished" << dendl;
+    ceph_assert(0);
+  }
+}
+
+template <typename T>
+void C_BlockIORequest<T>::deferred() {
+  bool initial = false;
+  if (m_deferred.compare_exchange_strong(initial, true)) {
+    deferred_handler();
+  }
+}
+
+template <typename T>
+C_WriteRequest<T>::C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents,
+               bufferlist&& bl, const int fadvise_flags, Context *user_req)
+  : C_BlockIORequest<T>(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req) {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
+  }
+}
+
+template <typename T>
+C_WriteRequest<T>::~C_WriteRequest() {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 99) << this << dendl;
+  }
+}
+
+template <typename T>
+std::ostream &operator<<(std::ostream &os,
+                                const C_WriteRequest<T> &req) {
+  os << (C_BlockIORequest<T>&)req
+     << " resources.allocated=" << req.resources.allocated;
+  if (req.m_op_set) {
+     os << ", m_op_set=" << *req.m_op_set;
+  }
+  return os;
+};
+
+template <typename T>
+void C_WriteRequest<T>::finish_req(int r) {
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(rwl.m_image_ctx.cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
+  }
+
+  /* Completed to caller by here (in finish(), which calls this) */
+  utime_t now = ceph_clock_now();
+  rwl.release_write_lanes(this);
+  this->release_cell(); /* TODO: Consider doing this in appending state */
+  update_req_stats(now);
+}
+
+template <typename T>
+void C_WriteRequest<T>::setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied) {
+  for (auto &extent : this->image_extents) {
+    resources.buffers.emplace_back();
+    struct WriteBufferAllocation &buffer = resources.buffers.back();
+    buffer.allocation_size = MIN_WRITE_ALLOC_SIZE;
+    buffer.allocated = false;
+    bytes_cached += extent.second;
+    if (extent.second > buffer.allocation_size) {
+      buffer.allocation_size = extent.second;
+    }
+  }
+  bytes_dirtied = bytes_cached;
+}
+
+template <typename T>
+void C_WriteRequest<T>::setup_log_operations() {
+  for (auto &extent : this->image_extents) {
+    /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
+    auto operation =
+      std::make_shared<WriteLogOperation<T>>(*m_op_set, extent.first, extent.second);
+    m_op_set->operations.emplace_back(operation);
+  }
+}
+
+template <typename T>
+void C_WriteRequest<T>::schedule_append() {
+  // TODO: call rwl to complete it
+}
+
+/**
+ * Attempts to allocate log resources for a write. Returns true if successful.
+ *
+ * Resources include 1 lane per extent, 1 log entry per extent, and the payload
+ * data space for each extent.
+ *
+ * Lanes are released after the write persists via release_write_lanes()
+ */
+template <typename T>
+bool C_WriteRequest<T>::alloc_resources()
+{
+  bool alloc_succeeds = true;
+  bool no_space = false;
+  utime_t alloc_start = ceph_clock_now();
+  uint64_t bytes_allocated = 0;
+  uint64_t bytes_cached = 0;
+  uint64_t bytes_dirtied = 0;
+
+  ceph_assert(!rwl.m_lock.is_locked_by_me());
+  ceph_assert(!resources.allocated);
+  resources.buffers.reserve(this->image_extents.size());
+  {
+    std::lock_guard locker(rwl.m_lock);
+    if (rwl.m_free_lanes < this->image_extents.size()) {
+      this->m_waited_lanes = true;
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(rwl.m_image_ctx.cct, 20) << "not enough free lanes (need "
+                                       <<  this->image_extents.size()
+                                       << ", have " << rwl.m_free_lanes << ") "
+                                       << *this << dendl;
+      }
+      alloc_succeeds = false;
+      /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
+    }
+    if (rwl.m_free_log_entries < this->image_extents.size()) {
+      this->m_waited_entries = true;
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(rwl.m_image_ctx.cct, 20) << "not enough free entries (need "
+                                       <<  this->image_extents.size()
+                                       << ", have " << rwl.m_free_log_entries << ") "
+                                       << *this << dendl;
+      }
+      alloc_succeeds = false;
+      no_space = true; /* Entries must be retired */
+    }
+    /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
+    if (rwl.m_bytes_allocated > rwl.m_bytes_allocated_cap) {
+      if (!this->m_waited_buffers) {
+        this->m_waited_buffers = true;
+        if (RWL_VERBOSE_LOGGING) {
+          ldout(rwl.m_image_ctx.cct, 1) << "Waiting for allocation cap (cap=" << rwl.m_bytes_allocated_cap
+                                        << ", allocated=" << rwl.m_bytes_allocated
+                                        << ") in write [" << *this << "]" << dendl;
+        }
+      }
+      alloc_succeeds = false;
+      no_space = true; /* Entries must be retired */
+    }
+  }
+  if (alloc_succeeds) {
+    setup_buffer_resources(bytes_cached, bytes_dirtied);
+  }
+
+  if (alloc_succeeds) {
+    for (auto &buffer : resources.buffers) {
+      bytes_allocated += buffer.allocation_size;
+      utime_t before_reserve = ceph_clock_now();
+      buffer.buffer_oid = pmemobj_reserve(rwl.m_log_pool,
+                                          &buffer.buffer_alloc_action,
+                                          buffer.allocation_size,
+                                          0 /* Object type */);
+      buffer.allocation_lat = ceph_clock_now() - before_reserve;
+      if (TOID_IS_NULL(buffer.buffer_oid)) {
+        if (!this->m_waited_buffers) {
+          this->m_waited_buffers = true;
+        }
+        if (RWL_VERBOSE_LOGGING) {
+          ldout(rwl.m_image_ctx.cct, 5) << "can't allocate all data buffers: "
+                                        << pmemobj_errormsg() << ". "
+                                        << *this << dendl;
+        }
+        alloc_succeeds = false;
+        no_space = true; /* Entries need to be retired */
+        break;
+      } else {
+        buffer.allocated = true;
+      }
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(rwl.m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
+                                       << "." << buffer.buffer_oid.oid.off
+                                       << ", size=" << buffer.allocation_size << dendl;
+      }
+    }
+  }
+
+  if (alloc_succeeds) {
+    unsigned int num_extents = this->image_extents.size();
+    std::lock_guard locker(rwl.m_lock);
+    /* We need one free log entry per extent (each is a separate entry), and
+     * one free "lane" per extent for remote replication. */
+    if ((rwl.m_free_lanes >= num_extents) &&
+        (rwl.m_free_log_entries >= num_extents)) {
+      rwl.m_free_lanes -= num_extents;
+      rwl.m_free_log_entries -= num_extents;
+      rwl.m_unpublished_reserves += num_extents;
+      rwl.m_bytes_allocated += bytes_allocated;
+      rwl.m_bytes_cached += bytes_cached;
+      rwl.m_bytes_dirty += bytes_dirtied;
+      resources.allocated = true;
+    } else {
+      alloc_succeeds = false;
+    }
+  }
+
+  if (!alloc_succeeds) {
+    /* On alloc failure, free any buffers we did allocate */
+    for (auto &buffer : resources.buffers) {
+      if (buffer.allocated) {
+        pmemobj_cancel(rwl.m_log_pool, &buffer.buffer_alloc_action, 1);
+      }
+    }
+    resources.buffers.clear();
+    if (no_space) {
+      /* Expedite flushing and/or retiring */
+      std::lock_guard locker(rwl.m_lock);
+      rwl.m_alloc_failed_since_retire = true;
+      rwl.m_last_alloc_fail = ceph_clock_now();
+    }
+  }
+
+  this->m_allocated_time = alloc_start;
+  return alloc_succeeds;
+}
+
+/**
+ * Takes custody of write_req. Resources must already be allocated.
+ *
+ * Locking:
+ * Acquires m_lock
+ */
+template <typename T>
+void C_WriteRequest<T>::dispatch()
+{
+  CephContext *cct = rwl.m_image_ctx.cct;
+  GeneralWriteLogEntries log_entries;
+  DeferredContexts on_exit;
+  utime_t now = ceph_clock_now();
+  auto write_req_sp = this;
+  this->m_dispatched_time = now;
+
+  if (RWL_VERBOSE_LOGGING) {
+    ldout(cct, 15) << "name: " << rwl.m_image_ctx.name << " id: " << rwl.m_image_ctx.id
+                   << " write_req=" << this << " cell=" << this->get_cell() << dendl;
+  }
+
+  {
+    uint64_t buffer_offset = 0;
+    std::lock_guard locker(rwl.m_lock);
+    Context *set_complete = this;
+    // TODO: Add sync point if necessary
+    //
+    m_op_set =
+      make_unique<WriteLogOperationSet<T>>(rwl, now, rwl.m_current_sync_point, rwl.m_persist_on_flush,
+                                           set_complete);
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get() << dendl;
+    }
+    ceph_assert(resources.allocated);
+    /* m_op_set->operations initialized differently for plain write or write same */
+    this->setup_log_operations();
+    auto allocation = resources.buffers.begin();
+    for (auto &gen_op : m_op_set->operations) {
+      /* A WS is also a write */
+      auto operation = gen_op->get_write_op();
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(cct, 20) << "write_req=" << this << " m_op_set=" << m_op_set.get()
+                       << " operation=" << operation << dendl;
+      }
+      log_entries.emplace_back(operation->log_entry);
+      rwl.m_perfcounter->inc(l_librbd_rwl_log_ops, 1);
+
+      operation->log_entry->ram_entry.has_data = 1;
+      operation->log_entry->ram_entry.write_data = allocation->buffer_oid;
+      // TODO: make std::shared_ptr
+      operation->buffer_alloc = &(*allocation);
+      ceph_assert(!TOID_IS_NULL(operation->log_entry->ram_entry.write_data));
+      operation->log_entry->pmem_buffer = D_RW(operation->log_entry->ram_entry.write_data);
+      operation->log_entry->ram_entry.sync_gen_number = rwl.m_current_sync_gen;
+      if (m_op_set->persist_on_flush) {
+        /* Persist on flush. Sequence #0 is never used. */
+        operation->log_entry->ram_entry.write_sequence_number = 0;
+      } else {
+        /* Persist on write */
+        operation->log_entry->ram_entry.write_sequence_number = ++rwl.m_last_op_sequence_num;
+        operation->log_entry->ram_entry.sequenced = 1;
+      }
+      operation->log_entry->ram_entry.sync_point = 0;
+      operation->log_entry->ram_entry.discard = 0;
+      operation->bl.substr_of(this->bl, buffer_offset,
+                              operation->log_entry->write_bytes());
+      buffer_offset += operation->log_entry->write_bytes();
+      if (RWL_VERBOSE_LOGGING) {
+        ldout(cct, 20) << "operation=[" << *operation << "]" << dendl;
+      }
+      allocation++;
+    }
+  }
+  /* All extent ops subs created */
+  m_op_set->extent_ops_appending->activate();
+  m_op_set->extent_ops_persist->activate();
+
+  /* Write data */
+  for (auto &operation : m_op_set->operations) {
+    /* operation is a shared_ptr, so write_op is only good as long as operation is in scope */
+    auto write_op = operation->get_write_op();
+    ceph_assert(write_op != nullptr);
+    bufferlist::iterator i(&write_op->bl);
+    rwl.m_perfcounter->inc(l_librbd_rwl_log_op_bytes, write_op->log_entry->write_bytes());
+    if (RWL_VERBOSE_LOGGING) {
+      ldout(cct, 20) << write_op->bl << dendl;
+    }
+    i.copy((unsigned)write_op->log_entry->write_bytes(), (char*)write_op->log_entry->pmem_buffer);
+  }
+
+  // TODO: Add to log map for read
+
+  /*
+   * Entries are added to m_log_entries in alloc_op_log_entries() when their
+   * order is established. They're added to m_dirty_log_entries when the write
+   * completes to all replicas. They must not be flushed before then. We don't
+   * prevent the application from reading these before they persist. If we
+   * supported coherent shared access, that might be a problem (the write could
+   * fail after another initiator had read it). As it is the cost of running
+   * reads through the block guard (and exempting them from the barrier, which
+   * doesn't need to apply to them) to prevent reading before the previous
+   * write of that data persists doesn't seem justified.
+   */
+
+  if (rwl.m_persist_on_flush_early_user_comp &&
+      m_op_set->persist_on_flush) {
+    /*
+     * We're done with the caller's buffer, and not guaranteeing
+     * persistence until the next flush. The block guard for this
+     * write_req will not be released until the write is persisted
+     * everywhere, but the caller's request can complete now.
+     */
+    this->complete_user_request(0);
+  }
+
+  bool append_deferred = false;
+  {
+    std::lock_guard locker(rwl.m_lock);
+    if (!m_op_set->persist_on_flush &&
+        m_op_set->sync_point->earlier_sync_point) {
+      /* In persist-on-write mode, we defer the append of this write until the
+       * previous sync point is appending (meaning all the writes before it are
+       * persisted and that previous sync point can now appear in the
+       * log). Since we insert sync points in persist-on-write mode when writes
+       * have already completed to the current sync point, this limits us to
+       * one inserted sync point in flight at a time, and gives the next
+       * inserted sync point some time to accumulate a few writes if they
+       * arrive soon. Without this we can insert an absurd number of sync
+       * points, each with one or two writes. That uses a lot of log entries,
+       * and limits flushing to very few writes at a time. */
+      m_do_early_flush = false;
+      Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) {
+          write_req_sp->schedule_append();
+        });
+      m_op_set->sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
+      append_deferred = true;
+    } else {
+      /* The prior sync point is done, so we'll schedule append here. If this is
+       * persist-on-write, and probably still the caller's thread, we'll use this
+       * caller's thread to perform the persist & replication of the payload
+       * buffer. */
+      m_do_early_flush =
+        !(this->detained || this->m_queued || this->m_deferred || m_op_set->persist_on_flush);
+    }
+  }
+  if (!append_deferred) {
+    this->schedule_append();
+  }
+}
+
+} // namespace rwl 
+} // namespace cache 
+} // namespace librbd 
diff --git a/src/librbd/cache/rwl/Request.h b/src/librbd/cache/rwl/Request.h
new file mode 100644 (file)
index 0000000..9556c4f
--- /dev/null
@@ -0,0 +1,159 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_CACHE_RWL_REQUEST_H 
+#define CEPH_LIBRBD_CACHE_RWL_REQUEST_H 
+
+#include "include/Context.h"
+#include "librbd/cache/ImageCache.h"
+#include "librbd/cache/rwl/Types.h"
+#include "librbd/cache/rwl/LogOperation.h"
+
+namespace librbd {
+class BlockGuardCell;
+
+namespace cache {
+namespace rwl {
+
+/**
+ * A request that can be deferred in a BlockGuard to sequence
+ * overlapping operations.
+ */
+template <typename T>
+class C_GuardedBlockIORequest : public Context {
+public:
+  T &rwl;
+  C_GuardedBlockIORequest(T &rwl);
+  ~C_GuardedBlockIORequest();
+  C_GuardedBlockIORequest(const C_GuardedBlockIORequest&) = delete;
+  C_GuardedBlockIORequest &operator=(const C_GuardedBlockIORequest&) = delete;
+
+  virtual const char *get_name() const = 0;
+  void set_cell(BlockGuardCell *cell);
+  BlockGuardCell *get_cell(void);
+  void release_cell();
+  
+private:
+  std::atomic<bool> m_cell_released = {false};
+  BlockGuardCell* m_cell = nullptr;
+};
+
+/**
+ * This is the custodian of the BlockGuard cell for this IO, and the
+ * state information about the progress of this IO. This object lives
+ * until the IO is persisted in all (live) log replicas. The user request
+ * may be completed from here before the IO persists.
+ */
+template <typename T>
+class C_BlockIORequest : public C_GuardedBlockIORequest<T> {
+public:
+  using C_GuardedBlockIORequest<T>::rwl;
+
+  io::Extents image_extents;
+  bufferlist bl;
+  int fadvise_flags;
+  Context *user_req; /* User write request */
+  ExtentsSummary<io::Extents> image_extents_summary;
+  bool detained = false;                /* Detained in blockguard (overlapped with a prior IO) */
+
+  C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
+                   bufferlist&& bl, const int fadvise_flags, Context *user_req);
+  virtual ~C_BlockIORequest();
+
+  void complete_user_request(int r);
+  void finish(int r);
+  virtual void finish_req(int r) = 0;
+
+  virtual bool alloc_resources() = 0;
+
+  void deferred();
+
+  virtual void deferred_handler() = 0;
+
+  virtual void dispatch()  = 0;
+
+  virtual const char *get_name() const override {
+    return "C_BlockIORequest";
+  }
+
+protected:
+  utime_t m_arrived_time;
+  utime_t m_allocated_time;               /* When allocation began */
+  utime_t m_dispatched_time;              /* When dispatch began */
+  utime_t m_user_req_completed_time;
+  bool m_waited_lanes = false;            /* This IO waited for free persist/replicate lanes */
+  bool m_waited_entries = false;          /* This IO waited for free log entries */
+  bool m_waited_buffers = false;          /* This IO waited for data buffers (pmemobj_reserve() failed) */
+  std::atomic<bool> m_deferred = {false}; /* Deferred because this or a prior IO had to wait for write resources */
+
+private:
+  std::atomic<bool> m_user_req_completed = {false};
+  std::atomic<bool> m_finish_called = {false};
+
+  template <typename U>
+  friend std::ostream &operator<<(std::ostream &os,
+                                  const C_BlockIORequest<U> &req);
+};
+
+struct WriteRequestResources {
+  bool allocated = false;
+  std::vector<WriteBufferAllocation> buffers;
+};
+
+/**
+ * This is the custodian of the BlockGuard cell for this write. The block
+ * guard is not released until the write persists everywhere (this is
+ * how we guarantee to each log replica that they will never see
+ * overlapping writes).
+ */
+template <typename T>
+class C_WriteRequest : public C_BlockIORequest<T> {
+public:
+  using C_BlockIORequest<T>::rwl;
+  WriteRequestResources resources;
+
+  C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents,
+                 bufferlist&& bl, const int fadvise_flags, Context *user_req);
+
+  ~C_WriteRequest();
+
+  /* Common finish to plain write and compare-and-write (if it writes) */
+  virtual void finish_req(int r);
+
+  /* Compare and write will override this */
+  virtual void update_req_stats(utime_t &now) {
+    // TODO: Add in later PRs
+  }
+  virtual bool alloc_resources() override;
+
+  /* Plain writes will allocate one buffer per request extent */
+  virtual void setup_buffer_resources(uint64_t &bytes_cached, uint64_t &bytes_dirtied);
+
+  void deferred_handler() override { }
+
+  void dispatch() override;
+
+  virtual void setup_log_operations();
+
+  virtual void schedule_append();
+
+  const char *get_name() const override {
+    return "C_WriteRequest";
+  }
+
+private:
+  unique_ptr<WriteLogOperationSet<T>> m_op_set = nullptr;
+  bool m_do_early_flush = false;
+  std::atomic<int> m_appended = {0};
+  bool m_queued = false;
+
+  template <typename U>
+  friend std::ostream &operator<<(std::ostream &os,
+                                  const C_WriteRequest<U> &req);
+};
+
+} // namespace rwl 
+} // namespace cache 
+} // namespace librbd 
+
+#endif // CEPH_LIBRBD_CACHE_RWL_REQUEST_H 
index a135c6c3d5b8d0143c236f3482786adb2ec11191..f486b934d0d5158da0f986d43b1696b4cf21c0d5 100644 (file)
@@ -26,9 +26,9 @@ SyncPoint<T>::SyncPoint(T &rwl, const uint64_t sync_gen_num)
 
 template <typename T>
 SyncPoint<T>::~SyncPoint() {
-  assert(on_sync_point_appending.empty());
-  assert(on_sync_point_persisted.empty());
-  assert(!earlier_sync_point);
+  ceph_assert(on_sync_point_appending.empty());
+  ceph_assert(on_sync_point_persisted.empty());
+  ceph_assert(!earlier_sync_point);
 }
 
 template <typename T>
index 8da93875e71efc2bb1c55867ddedee0bbc87ea5f..768b85b3fa30b8149929cd0c21e10eca96834865 100644 (file)
@@ -92,6 +92,40 @@ std::ostream& operator<<(std::ostream& os,
   return os;
 };
 
+template <typename ExtentsType>
+ExtentsSummary<ExtentsType>::ExtentsSummary(const ExtentsType &extents) {
+  total_bytes = 0;
+  first_image_byte = 0;
+  last_image_byte = 0;
+  if (extents.empty()) return;
+  /* These extents refer to image offsets between first_image_byte
+   * and last_image_byte, inclusive, but we don't guarantee here
+   * that they address all of those bytes. There may be gaps. */
+  first_image_byte = extents.front().first;
+  last_image_byte = first_image_byte + extents.front().second;
+  for (auto &extent : extents) {
+    /* Ignore zero length extents */
+    if (extent.second) {
+      total_bytes += extent.second;
+      if (extent.first < first_image_byte) {
+        first_image_byte = extent.first;
+      }
+      if ((extent.first + extent.second) > last_image_byte) {
+        last_image_byte = extent.first + extent.second;
+      }
+    }
+  }
+}
+
+template <typename T>
+std::ostream &operator<<(std::ostream &os,
+                                const ExtentsSummary<T> &s) {
+  os << "total_bytes=" << s.total_bytes << ", "
+     << "first_image_byte=" << s.first_image_byte << ", "
+     << "last_image_byte=" << s.last_image_byte << "";
+  return os;
+};
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd
index 972fd2ea724ccb5eaca99d76b4ec301f2420b0b4..ecf655453566c02fe85cbc24fce289c1562af778 100644 (file)
@@ -7,6 +7,7 @@
 #include <vector>
 #include <libpmemobj.h>
 #include "librbd/BlockGuard.h"
+#include "librbd/io/Types.h"
 
 class Context;
 
@@ -136,6 +137,18 @@ enum {
   l_librbd_rwl_last,
 };
 
+const uint32_t MIN_WRITE_ALLOC_SIZE = 512;
+const uint32_t LOG_STATS_INTERVAL_SECONDS = 5;
+
+/**** Write log entries ****/
+
+const uint64_t DEFAULT_POOL_SIZE = 1u<<30;
+const uint64_t MIN_POOL_SIZE = DEFAULT_POOL_SIZE;
+constexpr double USABLE_SIZE = (7.0 / 10);
+const uint64_t BLOCK_ALLOC_OVERHEAD_BYTES = 16;
+const uint8_t RWL_POOL_VERSION = 1;
+const uint64_t MAX_LOG_ENTRIES = (1024 * 1024);
+
 namespace librbd {
 namespace cache {
 namespace rwl {
@@ -220,6 +233,29 @@ struct WriteBufferAllocation {
   utime_t allocation_lat;
 };
 
+template <typename ExtentsType>
+class ExtentsSummary {
+public:
+  uint64_t total_bytes;
+  uint64_t first_image_byte;
+  uint64_t last_image_byte;
+  ExtentsSummary(const ExtentsType &extents);
+  template <typename U>
+  friend std::ostream &operator<<(std::ostream &os,
+                                  const ExtentsSummary<U> &s);
+  const BlockExtent block_extent() {
+    return BlockExtent(first_image_byte, last_image_byte);
+  }
+  const io::Extent image_extent(const BlockExtent& block_extent)
+  {
+    return io::Extent(block_extent.block_start,
+                  block_extent.block_end - block_extent.block_start + 1);
+  }
+  const io::Extent image_extent() {
+    return image_extent(block_extent());
+  }
+};
+
 } // namespace rwl 
 } // namespace cache 
 } // namespace librbd 
index 53c8f4d74f76120d6e19ff64d5e094fc1085d229..1742d70fb42f163f76685d8117f3fdd1d67df2f5 100644 (file)
@@ -81,7 +81,9 @@ using striper::LightweightBufferExtents;
 using striper::LightweightObjectExtent;
 using striper::LightweightObjectExtents;
 
-typedef std::vector<std::pair<uint64_t, uint64_t> > Extents;
+typedef std::pair<uint64_t,uint64_t> Extent;
+typedef std::vector<Extent> Extents;
+
 typedef std::map<uint64_t, uint64_t> ExtentMap;
 
 } // namespace io