librbd: flush dirty entries to osd
author    Yuan Lu <yuan.y.lu@intel.com>
          Mon, 17 Feb 2020 07:43:27 +0000 (15:43 +0800)
committer Yuan Lu <yuan.y.lu@intel.com>
          Tue, 31 Mar 2020 03:06:42 +0000 (11:06 +0800)
Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
src/librbd/cache/ImageWriteback.h
src/librbd/cache/ReplicatedWriteLog.cc
src/librbd/cache/ReplicatedWriteLog.h
src/librbd/cache/rwl/LogEntry.cc
src/librbd/cache/rwl/LogEntry.h
src/librbd/cache/rwl/Types.h

index 382c57c1d390d39c3259a51594978ab6bdbd6768..bbcc85c8ae5439cb9046d48dd1eee623a311b604 100644 (file)
@@ -16,13 +16,35 @@ struct ImageCtx;
 
 namespace cache {
 
+class ImageWritebackInterface {
+public:
+  typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
+  virtual ~ImageWritebackInterface() {
+  }
+  virtual void aio_read(Extents &&image_extents, ceph::bufferlist *bl,
+                        int fadvise_flags, Context *on_finish) = 0;
+  virtual void aio_write(Extents &&image_extents, ceph::bufferlist&& bl,
+                         int fadvise_flags, Context *on_finish) = 0;
+  virtual void aio_discard(uint64_t offset, uint64_t length,
+                           uint32_t discard_granularity_bytes, Context *on_finish) = 0;
+  virtual void aio_flush(Context *on_finish) = 0;
+  virtual void aio_writesame(uint64_t offset, uint64_t length,
+                             ceph::bufferlist&& bl,
+                             int fadvise_flags, Context *on_finish) = 0;
+  virtual void aio_compare_and_write(Extents &&image_extents,
+                                     ceph::bufferlist&& cmp_bl,
+                                     ceph::bufferlist&& bl,
+                                     uint64_t *mismatch_offset,
+                                     int fadvise_flags, Context *on_finish) = 0;
+};
+
 /**
  * client-side, image extent cache writeback handler
  */
 template <typename ImageCtxT = librbd::ImageCtx>
-class ImageWriteback {
+class ImageWriteback : public ImageWritebackInterface {
 public:
-  typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
+  using ImageWritebackInterface::Extents;
 
   explicit ImageWriteback(ImageCtxT &image_ctx);
 
index b43c073e88a0a8bb3815aed6fed858afe3817382..ca0fd2751f3de2f38e0298b653857d80502def5a 100644 (file)
@@ -42,6 +42,7 @@ ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::rwl::Imag
     m_image_ctx(image_ctx),
     m_log_pool_config_size(DEFAULT_POOL_SIZE),
     m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct),
+    m_entry_reader_lock("librbd::cache::ReplicatedWriteLog::m_entry_reader_lock"),
     m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name(
       "librbd::cache::ReplicatedWriteLog::m_deferred_dispatch_lock", this))),
     m_log_append_lock(ceph::make_mutex(util::unique_lock_name(
@@ -546,11 +547,6 @@ void ReplicatedWriteLog<I>::aio_compare_and_write(Extents &&image_extents,
                                                   Context *on_finish) {
 }
 
-template <typename I>
-void ReplicatedWriteLog<I>::wake_up() {
-  //TODO: handle the task to flush data from cache device to OSD
-}
-
 template <typename I>
 void ReplicatedWriteLog<I>::flush(Context *on_finish) {
 }
@@ -1352,6 +1348,215 @@ C_FlushRequest<ReplicatedWriteLog<I>>* ReplicatedWriteLog<I>::make_flush_req(Con
   return flush_req;
 }
 
+template <typename I>
+void ReplicatedWriteLog<I>::wake_up() {
+  CephContext *cct = m_image_ctx.cct;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  if (!m_wake_up_enabled) {
+    // wake_up is disabled during shutdown after flushing completes
+    ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl;
+    return;
+  }
+
+  if (m_wake_up_requested && m_wake_up_scheduled) {
+    return;
+  }
+
+  ldout(cct, 20) << dendl;
+
+  /* Wake-up can be requested while it's already scheduled */
+  m_wake_up_requested = true;
+
+  /* Wake-up cannot be scheduled if it's already scheduled */
+  if (m_wake_up_scheduled) {
+    return;
+  }
+  m_wake_up_scheduled = true;
+  m_async_process_work++;
+  m_async_op_tracker.start_op();
+  m_work_queue.queue(new LambdaContext(
+    [this](int r) {
+      process_work();
+      m_async_op_tracker.finish_op();
+      m_async_process_work--;
+    }), 0);
+}
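
Stripped of the RWL bookkeeping, the m_wake_up_requested/m_wake_up_scheduled pair implements a familiar coalescing dispatcher: any number of wake_up() calls that arrive while a worker is queued collapse into a single pass, and the worker (process_work() below) re-runs while requests keep arriving. A self-contained sketch of the idiom (all names hypothetical; unlike the RWL code, this sketch takes its own lock rather than asserting the caller holds it):

    #include <functional>
    #include <mutex>

    // Coalescing dispatcher: many wake_up() calls, at most one queued worker.
    class CoalescingScheduler {
    public:
      CoalescingScheduler(std::function<void(std::function<void()>)> queue_fn,
                          std::function<void()> do_work)
        : m_queue_fn(std::move(queue_fn)), m_do_work(std::move(do_work)) {}

      void wake_up() {
        std::lock_guard<std::mutex> locker(m_lock);
        m_requested = true;             // remember that work is wanted
        if (m_scheduled) {
          return;                       // a worker is already queued: coalesce
        }
        m_scheduled = true;
        m_queue_fn([this] { run(); });  // e.g. hand off to a work queue thread
      }

    private:
      void run() {
        int max_iterations = 4;         // bound time spent in one queue item
        bool again;
        do {
          {
            std::lock_guard<std::mutex> locker(m_lock);
            m_requested = false;
          }
          m_do_work();
          {
            std::lock_guard<std::mutex> locker(m_lock);
            again = m_requested;
          }
        } while (again && --max_iterations > 0);
        bool reschedule;
        {
          std::lock_guard<std::mutex> locker(m_lock);
          m_scheduled = false;
          reschedule = m_requested;     // a request raced in after the last pass
        }
        if (reschedule) {
          wake_up();                    // queue a fresh worker for it
        }
      }

      std::mutex m_lock;
      bool m_requested = false;
      bool m_scheduled = false;
      std::function<void(std::function<void()>)> m_queue_fn;
      std::function<void()> m_do_work;
    };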
+
+template <typename I>
+void ReplicatedWriteLog<I>::process_work() {
+  // TODO: handle retiring entries in later PRs
+  CephContext *cct = m_image_ctx.cct;
+  int max_iterations = 4;
+  bool wake_up_requested = false;
+
+  ldout(cct, 20) << dendl;
+
+  do {
+    {
+      std::lock_guard locker(m_lock);
+      m_wake_up_requested = false;
+    }
+    // TODO: retire entries when conditions are fulfilled
+    dispatch_deferred_writes();
+    process_writeback_dirty_entries();
+
+    {
+      std::lock_guard locker(m_lock);
+      wake_up_requested = m_wake_up_requested;
+    }
+  } while (wake_up_requested && --max_iterations > 0);
+
+  {
+    std::lock_guard locker(m_lock);
+    m_wake_up_scheduled = false;
+    /* Reschedule if it's still requested */
+    if (m_wake_up_requested) {
+      wake_up();
+    }
+  }
+}
+
+template <typename I>
+bool ReplicatedWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
+  CephContext *cct = m_image_ctx.cct;
+
+  ldout(cct, 20) << "" << dendl;
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  //TODO handle invalidate
+
+  /* For OWB we can flush entries with the same sync gen number (write between
+   * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
+   * its sync gen number is <= the lowest sync gen number carried by all the
+   * entries currently flushing.
+   *
+   * If the entry considered here bears a sync gen number lower than a
+   * previously flushed entry, the application had to have submitted the write
+   * bearing the higher gen number before the write with the lower gen number
+   * completed. So, flushing these concurrently is OK.
+   *
+   * If the entry considered here bears a sync gen number higher than a
+   * currently flushing entry, the write with the lower gen number may have
+   * completed to the application before the write with the higher sync gen
+   * number was submitted, and the application may rely on that completion
+   * order for volume consistency. In this case the entry will not be
+   * considered flushable until all the entries bearing lower sync gen numbers
+   * finish flushing.
+   */
+
+  if (m_flush_ops_in_flight &&
+      (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) {
+    return false;
+  }
+
+  return (log_entry->can_writeback() &&
+         (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) &&
+         (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT));
+}
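
Concretely: with dirty entries carrying sync gen numbers 4, 4, 5, the first gen-4 entry starts flushing and records m_lowest_flushing_sync_gen = 4; the second gen-4 entry also passes the check (4 is not greater than 4) and flushes concurrently; the gen-5 entry is held back until both gen-4 flushes finish. A toy model of just the sync-gen gate (hypothetical helper, not the real signature; resource limits omitted):

    #include <cstdint>

    // Toy model of the sync-gen gate in can_flush_entry().
    bool gen_allows_flush(uint64_t entry_sync_gen,
                          int flush_ops_in_flight,
                          uint64_t lowest_flushing_sync_gen) {
      // Entries newer than the oldest sync gen still flushing must wait.
      return !(flush_ops_in_flight &&
               entry_sync_gen > lowest_flushing_sync_gen);
    }

    // With dirty entries of sync gen 4, 4, 5:
    //   gen_allows_flush(4, 0, 0) -> true   first flush starts; lowest becomes 4
    //   gen_allows_flush(4, 1, 4) -> true   same gen, flushes concurrently
    //   gen_allows_flush(5, 2, 4) -> false  waits for both gen-4 flushes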
+
+template <typename I>
+Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<GenericLogEntry> log_entry) {
+  //TODO handle writesame, invalidate and discard in later PRs
+  CephContext *cct = m_image_ctx.cct;
+
+  ldout(cct, 20) << "" << dendl;
+  ceph_assert(m_entry_reader_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  if (!m_flush_ops_in_flight ||
+      (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
+    m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
+  }
+  m_flush_ops_in_flight += 1;
+  /* For write same this is the bytes affected by the flush op, not the bytes transferred */
+  m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
+
+  /* Flush write completion action */
+  Context *ctx = new LambdaContext(
+    [this, log_entry](int r) {
+      {
+        std::lock_guard locker(m_lock);
+        if (r < 0) {
+          lderr(m_image_ctx.cct) << "failed to flush log entry"
+                                 << cpp_strerror(r) << dendl;
+          m_dirty_log_entries.push_front(log_entry);
+        } else {
+          ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
+          m_bytes_dirty -= log_entry->bytes_dirty();
+          sync_point_writer_flushed(log_entry->get_sync_point_entry());
+          ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
+                                     << dendl;
+        }
+        m_flush_ops_in_flight -= 1;
+        m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
+        wake_up();
+      }
+    });
+  /* Flush through lower cache before completing */
+  ctx = new LambdaContext(
+    [this, ctx](int r) {
+      if (r < 0) {
+        lderr(m_image_ctx.cct) << "failed to flush log entry"
+                               << cpp_strerror(r) << dendl;
+        ctx->complete(r);
+      } else {
+        m_image_writeback.aio_flush(ctx);
+      }
+    });
+
+  return new LambdaContext(
+    [this, log_entry, ctx](int r) {
+      m_image_ctx.op_work_queue->queue(new LambdaContext(
+        [this, log_entry, ctx](int r) {
+          ldout(m_image_ctx.cct, 15) << "flushing: " << log_entry
+                                     << " " << *log_entry << dendl;
+          log_entry->writeback(m_image_writeback, ctx);
+        }), 0);
+    });
+}
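
Note the construction order above: the bookkeeping context is built first, then wrapped by the aio_flush step, and the context actually returned queues the writeback, so at run time the chain fires writeback, then flush, then bookkeeping. A standalone sketch of the idiom with a stand-in Context type:

    #include <functional>
    #include <iostream>
    #include <utility>

    // Stand-in for ceph's one-shot Context/LambdaContext.
    struct Context {
      std::function<void(int)> fn;
      explicit Context(std::function<void(int)> f) : fn(std::move(f)) {}
      void complete(int r) { fn(r); delete this; }
    };

    int main() {
      // Built innermost-first, as in construct_flush_entry_ctx():
      Context *done = new Context([](int r) {
        std::cout << "3. bookkeeping: drop in-flight counts, wake_up()\n";
      });
      Context *flush = new Context([done](int r) {
        std::cout << "2. flush through the lower cache (aio_flush)\n";
        done->complete(r);
      });
      Context *start = new Context([flush](int r) {
        std::cout << "1. write the entry back (aio_write)\n";
        flush->complete(r);
      });
      start->complete(0);  // prints 1, 2, 3
      return 0;
    }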
+
+template <typename I>
+void ReplicatedWriteLog<I>::process_writeback_dirty_entries() {
+  CephContext *cct = m_image_ctx.cct;
+  bool all_clean = false;
+  int flushed = 0;
+
+  ldout(cct, 20) << "Look for dirty entries" << dendl;
+  {
+    DeferredContexts post_unlock;
+    std::shared_lock entry_reader_locker(m_entry_reader_lock);
+    while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
+      std::lock_guard locker(m_lock);
+      if (m_shutting_down) {
+        ldout(cct, 5) << "Flush during shutdown suppressed" << dendl;
+        /* Do flush complete only when all flush ops are finished */
+        all_clean = !m_flush_ops_in_flight;
+        break;
+      }
+      if (m_dirty_log_entries.empty()) {
+        ldout(cct, 20) << "Nothing new to flush" << dendl;
+        /* Do flush complete only when all flush ops are finished */
+        all_clean = !m_flush_ops_in_flight;
+        break;
+      }
+      auto candidate = m_dirty_log_entries.front();
+      bool flushable = can_flush_entry(candidate);
+      if (flushable) {
+        post_unlock.add(construct_flush_entry_ctx(candidate));
+        flushed++;
+        m_dirty_log_entries.pop_front();
+      } else {
+        ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
+        break;
+      }
+    }
+  }
+
+  if (all_clean) {
+    // TODO: all flushing complete
+  }
+}
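
Two locking details here are worth calling out: post_unlock is declared before the shared lock on m_entry_reader_lock, so its destructor (which runs the collected flush contexts) fires only after both locks have unwound; and m_lock is taken afresh on each iteration, so other m_lock users are not starved while the dirty list is drained. A minimal sketch of the DeferredContexts pattern, under the assumption that it simply runs its contexts on destruction (the real class takes Context pointers; std::function is a stand-in):

    #include <functional>
    #include <utility>
    #include <vector>

    // Sketch of the DeferredContexts idiom: collect callbacks while locks are
    // held, run them only when the enclosing scope (and its locks) unwinds.
    class DeferredContexts {
    public:
      void add(std::function<void()> ctx) {
        m_contexts.push_back(std::move(ctx));
      }
      ~DeferredContexts() {
        // Destructors run in reverse declaration order, so any lock declared
        // after this object has already been released by the time we get here.
        for (auto &ctx : m_contexts) {
          ctx();
        }
      }
    private:
      std::vector<std::function<void()>> m_contexts;
    };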
+
 /* Update/persist the last flushed sync point in the log */
 template <typename I>
 void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
@@ -1417,6 +1622,21 @@ bool ReplicatedWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointL
   return false;
 }
 
+template <typename I>
+void ReplicatedWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry)
+{
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(log_entry);
+  log_entry->writes_flushed++;
+
+  /* If this entry might be completely flushed, look closer */
+  if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) {
+    ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point="
+                               << *log_entry << dendl;
+    handle_flushed_sync_point(log_entry);
+  }
+}
+
 /* Make a new sync point and flush the previous during initialization, when there may or may
 * not be a previous sync point */
 template <typename I>
index 30e53bc6afd081c0fb98789f1dc1b7d6f9d98244..482d2f9af518375dbeb2df46e07ac08e2113cf5f 100644 (file)
@@ -35,6 +35,7 @@ class GenericLogEntry;
 
 typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
 typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
+typedef std::list<std::shared_ptr<GenericWriteLogEntry>> GenericWriteLogEntries;
 
 /**** Write log entries end ****/
 
@@ -135,6 +136,7 @@ private:
   librbd::cache::rwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
 
   std::atomic<bool> m_initialized = {false};
+  std::atomic<bool> m_shutting_down = {false};
   PMEMobjpool *m_log_pool = nullptr;
   const char* m_rwl_pool_layout_name;
 
@@ -183,9 +185,14 @@ private:
   std::atomic<int> m_async_append_ops = {0};
   std::atomic<int> m_async_complete_ops = {0};
   std::atomic<int> m_async_null_flush_finish = {0};
+  std::atomic<int> m_async_process_work = {0};
 
   /* Acquire locks in order declared here */
 
+  /* Hold a read lock on m_entry_reader_lock to add readers to log entry
+   * bufs. Hold a write lock to prevent readers from being added (e.g. when
+   * removing log entries from the map). No lock required to remove readers. */
+  mutable RWLock m_entry_reader_lock;
   /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
   mutable ceph::mutex m_deferred_dispatch_lock;
   /* Hold m_log_append_lock while appending or retiring log entries. */
@@ -201,6 +208,9 @@ private:
   bool m_barrier_in_progress = false;
   BlockGuardCell *m_barrier_cell = nullptr;
 
+  bool m_wake_up_requested = false;
+  bool m_wake_up_scheduled = false;
+  bool m_wake_up_enabled = true;
   bool m_appending = false;
   bool m_dispatching_deferred_ops = false;
 
@@ -216,6 +226,10 @@ private:
   std::shared_ptr<rwl::SyncPoint> m_current_sync_point = nullptr;
   bool m_persist_on_flush = false; /* If false, persist each write before completion */
 
+  int m_flush_ops_in_flight = 0;
+  int m_flush_bytes_in_flight = 0;
+  uint64_t m_lowest_flushing_sync_gen = 0;
+
   /* Writes that have left the block guard, but are waiting for resources */
   C_BlockIORequests m_deferred_ios;
   /* Throttle writes concurrently allocating & replicating */
@@ -240,9 +254,14 @@ private:
   void rwl_init(Context *on_finish, rwl::DeferredContexts &later);
   void update_image_cache_state(Context *on_finish);
   void wake_up();
+  void process_work();
 
+  bool can_flush_entry(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
+  Context *construct_flush_entry_ctx(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
   void persist_last_flushed_sync_gen();
   bool handle_flushed_sync_point(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
+  void sync_point_writer_flushed(std::shared_ptr<rwl::SyncPointLogEntry> log_entry);
+  void process_writeback_dirty_entries();
 
   void init_flush_new_sync_point(rwl::DeferredContexts &later);
   void new_sync_point(rwl::DeferredContexts &later);
index 16858b5053764b5f9ec0b374e14ed80c22925e61..9f72df7d77e6277e99ff54560277ca5b0404f0df 100644 (file)
@@ -3,6 +3,7 @@
 
 #include <iostream>
 #include "LogEntry.h"
+#include "librbd/cache/ImageWriteback.h"
 
 #define dout_subsys ceph_subsys_rbd_rwl
 #undef dout_prefix
@@ -46,6 +47,13 @@ std::ostream &operator<<(std::ostream &os,
   return entry.format(os);
 }
 
+bool GenericWriteLogEntry::can_writeback() {
+  return (this->completed &&
+          (ram_entry.sequenced ||
+           (sync_point_entry &&
+            sync_point_entry->completed)));
+}
+
 std::ostream& GenericWriteLogEntry::format(std::ostream &os) const {
   GenericLogEntry::format(os);
   os << ", "
@@ -56,9 +64,7 @@ std::ostream& GenericWriteLogEntry::format(std::ostream &os) const {
     os << "nullptr";
   }
   os << "], "
-     << "referring_map_entries=" << referring_map_entries << ", "
-     << "flushing=" << flushing << ", "
-     << "flushed=" << flushed;
+     << "referring_map_entries=" << referring_map_entries;
   return os;
 };
 
@@ -130,6 +136,17 @@ void WriteLogEntry::copy_pmem_bl(bufferlist *out_bl) {
   this->init_bl(cloned_bp, *out_bl);
 }
 
+void WriteLogEntry::writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+                              Context *ctx) {
+  /* Pass a copy of the pmem buffer to ImageWriteback (which may hang on to the bl even after flush()). */
+  bufferlist entry_bl;
+  buffer::list entry_bl_copy;
+  copy_pmem_bl(&entry_bl_copy);
+  entry_bl_copy.begin(0).copy(write_bytes(), entry_bl);
+  image_writeback.aio_write({{ram_entry.image_offset_bytes, ram_entry.write_bytes}},
+                            std::move(entry_bl), 0, ctx);
+}
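
The copy matters because the source bufferlist references buffers in the pmem log pool; once the entry is eventually retired and its pool space reused (retirement lands in a later PR), a shallow reference would go stale while ImageWriteback may still be holding the bl. A toy illustration of that lifetime hazard, using ordinary containers as stand-ins for the pmem pool:

    #include <cstring>
    #include <string>
    #include <vector>

    int main() {
      std::vector<char> log_pool(16, 'A');                 // stand-in for the pmem pool
      const char *shallow = log_pool.data();               // reference into the pool
      std::string deep(log_pool.data(), log_pool.size());  // heap copy

      std::memset(log_pool.data(), 'B', log_pool.size());  // entry retired, space reused

      // shallow now reads 'B': the data the writeback wanted is gone.
      // deep still reads 'A': safe to hand to a long-lived consumer.
      return (shallow[0] == 'B' && deep[0] == 'A') ? 0 : 1;
    }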
+
 std::ostream& WriteLogEntry::format(std::ostream &os) const {
   os << "(Write) ";
   GenericWriteLogEntry::format(os);
index b26fddb366f412c794c9022e03bd06d7766b093e..471102f6c5e837e987aea63df0cf31c28fec490c 100644 (file)
@@ -12,6 +12,7 @@
 
 namespace librbd {
 namespace cache {
+class ImageWritebackInterface;
 namespace rwl {
 
 class SyncPointLogEntry;
@@ -30,6 +31,19 @@ public:
   virtual ~GenericLogEntry() { };
   GenericLogEntry(const GenericLogEntry&) = delete;
   GenericLogEntry &operator=(const GenericLogEntry&) = delete;
+  virtual bool can_writeback() {
+    return false;
+  }
+  virtual inline unsigned int bytes_dirty() {
+    return 0;
+  };
+  virtual std::shared_ptr<SyncPointLogEntry> get_sync_point_entry() {
+    return nullptr;
+  }
+  virtual void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+                         Context *ctx) {
+    ceph_assert(false);
+  };
   virtual std::ostream& format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const GenericLogEntry &entry);
@@ -63,8 +77,6 @@ public:
 class GenericWriteLogEntry : public GenericLogEntry {
 public:
   uint32_t referring_map_entries = 0;
-  bool flushing = false;
-  bool flushed = false; /* or invalidated */
   std::shared_ptr<SyncPointLogEntry> sync_point_entry;
   GenericWriteLogEntry(std::shared_ptr<SyncPointLogEntry> sync_point_entry,
                        const uint64_t image_offset_bytes, const uint64_t write_bytes)
@@ -78,7 +90,7 @@ public:
    /* The valid bytes in this op's data buffer. Discard and WS override. */
     return ram_entry.write_bytes;
   };
-  virtual inline unsigned int bytes_dirty() {
+  inline unsigned int bytes_dirty() override {
     /* The bytes in the image this op makes dirty. Discard and WS override. */
     return write_bytes();
   };
@@ -90,6 +102,10 @@ public:
   }
   void inc_map_ref() { referring_map_entries++; }
   void dec_map_ref() { referring_map_entries--; }
+  bool can_writeback() override;
+  std::shared_ptr<SyncPointLogEntry> get_sync_point_entry() override {
+    return sync_point_entry;
+  }
   std::ostream &format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const GenericWriteLogEntry &entry);
@@ -136,6 +152,8 @@ public:
   buffer::list &get_pmem_bl();
   /* Constructs a new bl containing copies of pmem_bp */
   void copy_pmem_bl(bufferlist *out_bl);
+  void writeback(librbd::cache::ImageWritebackInterface &image_writeback,
+                 Context *ctx) override;
   std::ostream &format(std::ostream &os) const;
   friend std::ostream &operator<<(std::ostream &os,
                                   const WriteLogEntry &entry);
index d5de52a4c9ef6f7a964445b0a7a3b79ea3247d70..bd2ece22e21870bba37b5667b6a981b36c8a2e43 100644 (file)
@@ -141,6 +141,9 @@ namespace librbd {
 namespace cache {
 namespace rwl {
 
+const int IN_FLIGHT_FLUSH_WRITE_LIMIT = 64;
+const int IN_FLIGHT_FLUSH_BYTES_LIMIT = (1 * 1024 * 1024);
+
 /* Limit work between sync points */
 const uint64_t MAX_WRITES_PER_SYNC_POINT = 256;
 const uint64_t MAX_BYTES_PER_SYNC_POINT = (1024 * 1024 * 8);