librbd/cache/pwl: fix reordering when flushing cache data to the osd
author    Jianpeng Ma <jianpeng.ma@intel.com>
          Mon, 1 Nov 2021 01:25:52 +0000 (09:25 +0800)
committer Jianpeng Ma <jianpeng.ma@intel.com>
          Mon, 1 Nov 2021 01:25:52 +0000 (09:25 +0800)
Consider the following workload:
  writeA(0, 4096)
  writeB(0, 512)
pwl guarantees that writeA persists to the cache before writeB. But
when flushing to the osd, it uses async reads to fetch the data from
the cache and issues the osd write from each read's callback. So even
though we issue aio-read(4096) and then aio-read(512) in order, we
cannot control the order in which they complete. If aio-read(512)
completes first, the write order seen by the next layer is
  writeB(0, 512)
  writeA(0, 4096)
which is wrong from the user's point of view.

To avoid this, first read all the data from the cache, then send the
writes in order.
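
Below, a minimal self-contained sketch (hypothetical names, not the
pwl code) contrasting the two dispatch patterns: issuing each write
from its own read callback can reorder, gathering first cannot:

  // Simulates async reads whose completions arrive in reverse
  // submission order, the worst case for the buggy pattern.
  #include <functional>
  #include <iostream>
  #include <string>
  #include <vector>

  struct Flush { std::string name; };

  void async_read_all(std::vector<Flush>& flushes,
                      const std::function<void(Flush&)>& on_complete) {
    for (auto it = flushes.rbegin(); it != flushes.rend(); ++it) {
      on_complete(*it);
    }
  }

  int main() {
    std::vector<Flush> flushes = {{"writeA(0, 4096)"}, {"writeB(0, 512)"}};

    // Buggy pattern: write to the next layer from each read callback.
    std::cout << "per-callback writes:\n";
    async_read_all(flushes, [](Flush& f) {
      std::cout << "  " << f.name << "\n";  // writeB lands before writeA
    });

    // Fixed pattern: wait for all reads, then write in submission order.
    std::cout << "gather-then-write:\n";
    int pending = (int)flushes.size();
    async_read_all(flushes, [&](Flush&) {
      if (--pending == 0) {
        for (auto& f : flushes) {
          std::cout << "  " << f.name << "\n";  // original order preserved
        }
      }
    });
    return 0;
  }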

Fixes: https://tracker.ceph.com/issues/52511

Tested-by: Feng Hualong <hualong.feng@intel.com>
Signed-off-by: Jianpeng Ma <jianpeng.ma@intel.com>
src/librbd/cache/pwl/AbstractWriteLog.cc
src/librbd/cache/pwl/AbstractWriteLog.h
src/librbd/cache/pwl/rwl/WriteLog.cc
src/librbd/cache/pwl/rwl/WriteLog.h
src/librbd/cache/pwl/ssd/WriteLog.cc
src/librbd/cache/pwl/ssd/WriteLog.h

index c01d2d411f9da45a8bc90691a2ea219ff75f9db2..421d5dd8bc49ed5a0914ba57042afc31c39ff747 100644 (file)
@@ -1687,13 +1687,16 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
   CephContext *cct = m_image_ctx.cct;
   bool all_clean = false;
   int flushed = 0;
+  bool has_write_entry = false;
 
   ldout(cct, 20) << "Look for dirty entries" << dendl;
   {
     DeferredContexts post_unlock;
+    GenericLogEntries entries_to_flush;
+
     std::shared_lock entry_reader_locker(m_entry_reader_lock);
+    std::lock_guard locker(m_lock);
     while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
-      std::lock_guard locker(m_lock);
       if (m_shutting_down) {
         ldout(cct, 5) << "Flush during shutdown suppressed" << dendl;
         /* Do flush complete only when all flush ops are finished */
@@ -1709,14 +1712,18 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
       auto candidate = m_dirty_log_entries.front();
       bool flushable = can_flush_entry(candidate);
       if (flushable) {
-        post_unlock.add(construct_flush_entry_ctx(candidate));
+        entries_to_flush.push_back(candidate);
         flushed++;
+        if (!has_write_entry)
+          has_write_entry = candidate->is_write_entry();
         m_dirty_log_entries.pop_front();
       } else {
         ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
         break;
       }
     }
+
+    construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
   }
 
   if (all_clean) {
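
Note how the hunk above also hoists m_lock out of the while loop:
candidates are drained under a single lock hold and handed to
construct_flush_entries() as one batch after the loop. A minimal
sketch of that collect-under-lock, dispatch-after pattern, with
hypothetical names standing in for the pwl types:

  #include <deque>
  #include <iostream>
  #include <mutex>
  #include <vector>

  struct Entry { bool flushable = true; };

  std::mutex m_lock;
  std::deque<Entry*> m_dirty_log_entries;

  // Stand-in for construct_flush_entries(): receives the whole batch
  // at once, so it can order the writebacks however it needs to.
  void dispatch_batch(const std::vector<Entry*>& batch) {
    std::cout << "dispatching " << batch.size() << " entries in order\n";
  }

  void process_dirty_entries(int limit) {
    std::vector<Entry*> batch;
    {
      std::lock_guard<std::mutex> locker(m_lock);  // once, not per entry
      while (!m_dirty_log_entries.empty() &&
             (int)batch.size() < limit &&
             m_dirty_log_entries.front()->flushable) {
        batch.push_back(m_dirty_log_entries.front());
        m_dirty_log_entries.pop_front();
      }
    }
    dispatch_batch(batch);  // one ordered hand-off after the loop
  }

  int main() {
    Entry a, b;
    m_dirty_log_entries = {&a, &b};
    process_dirty_entries(4);
    return 0;
  }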
index c982d2631fe3f407e6cd009c10f9f71811e039a0..128c00d99d82a2217b3aff210e63cc4482dcc860 100644 (file)
@@ -388,10 +388,9 @@ protected:
   virtual void persist_last_flushed_sync_gen() {}
   virtual void reserve_cache(C_BlockIORequestT *req, bool &alloc_succeeds,
                              bool &no_space) {}
-  virtual Context *construct_flush_entry_ctx(
-      const std::shared_ptr<pwl::GenericLogEntry> log_entry) {
-    return nullptr;
-  }
+  virtual void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                                       DeferredContexts &post_unlock,
+                                       bool has_write_entry) = 0;
   virtual uint64_t get_max_extent() {
     return 0;
   }
index bf4bd1557b031bf2d8b8a9a9a636e700c5d73483..f635015329a6c073f49a1ac8b0f744d3742bf059 100644 (file)
@@ -576,23 +576,27 @@ bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
 }
 
 template <typename I>
-Context* WriteLog<I>::construct_flush_entry_ctx(
-    std::shared_ptr<GenericLogEntry> log_entry) {
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                                         DeferredContexts &post_unlock,
+                                         bool has_write_entry) {
   bool invalidating = this->m_invalidating; // snapshot so we behave consistently
-  Context *ctx = this->construct_flush_entry(log_entry, invalidating);
 
-  if (invalidating) {
-    return ctx;
-  }
-  return new LambdaContext(
-    [this, log_entry, ctx](int r) {
-      m_image_ctx.op_work_queue->queue(new LambdaContext(
+  for (auto &log_entry : entries_to_flush) {
+    Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+    if (!invalidating) {
+      ctx = new LambdaContext(
         [this, log_entry, ctx](int r) {
-          ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                     << " " << *log_entry << dendl;
-          log_entry->writeback(this->m_image_writeback, ctx);
-        }), 0);
-    });
+         m_image_ctx.op_work_queue->queue(new LambdaContext(
+           [this, log_entry, ctx](int r) {
+             ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                        << " " << *log_entry << dendl;
+             log_entry->writeback(this->m_image_writeback, ctx);
+           }), 0);
+        });
+   }
+   post_unlock.add(ctx);
+  }
 }
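
In the rwl (pmem) backend no read-back is needed, so each entry's
writeback context is simply accumulated on post_unlock in entry order;
DeferredContexts is assumed to fire its contexts in insertion order
once it goes out of scope, i.e. after the locks are released. A sketch
of that assumed RAII behavior:

  #include <functional>
  #include <iostream>
  #include <vector>

  class DeferredContexts {
    std::vector<std::function<void()>> contexts;
   public:
    void add(std::function<void()> ctx) {
      contexts.push_back(std::move(ctx));
    }
    ~DeferredContexts() {
      for (auto& ctx : contexts) ctx();  // fired in the order added
    }
  };

  int main() {
    {
      DeferredContexts post_unlock;
      // ... while holding m_lock, queue one writeback per entry ...
      post_unlock.add([] { std::cout << "writeback writeA(0, 4096)\n"; });
      post_unlock.add([] { std::cout << "writeback writeB(0, 512)\n"; });
    }  // in the real code the lock is released first; then contexts run
    return 0;
  }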
 
 const unsigned long int ops_flushed_together = 4;
index dabee07d742aa1cf5f7afd6d2e8f3483aa7dda24..5083a2568d497b7dc61168bd25a6b7e6effdecf3 100644 (file)
@@ -102,8 +102,9 @@ protected:
   void setup_schedule_append(
       pwl::GenericLogOperationsVector &ops, bool do_early_flush,
       C_BlockIORequestT *req) override;
-  Context *construct_flush_entry_ctx(
-        const std::shared_ptr<pwl::GenericLogEntry> log_entry) override;
+  void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                               DeferredContexts &post_unlock,
+                               bool has_write_entry) override;
   bool initialize_pool(Context *on_finish, pwl::DeferredContexts &later) override;
   void write_data_to_buffer(
       std::shared_ptr<pwl::WriteLogEntry> ws_entry,
index 550123924de0f0698b862c01d1f8e6b31c46ae65..9d887227e96d677d897a6e6aa26ec9c734a68155 100644 (file)
@@ -521,49 +521,87 @@ void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops) {
 }
 
 template <typename I>
-Context* WriteLog<I>::construct_flush_entry_ctx(
-    std::shared_ptr<GenericLogEntry> log_entry) {
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                                         DeferredContexts &post_unlock,
+                                         bool has_write_entry) {
   // snapshot so we behave consistently
   bool invalidating = this->m_invalidating;
 
-  Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
-  if (invalidating) {
-    return ctx;
-  }
-  if (log_entry->is_write_entry()) {
-    bufferlist *read_bl_ptr = new bufferlist;
-    ctx = new LambdaContext(
-      [this, log_entry, read_bl_ptr, ctx](int r) {
-        bufferlist captured_entry_bl;
-        captured_entry_bl.claim_append(*read_bl_ptr);
-        delete read_bl_ptr;
-        m_image_ctx.op_work_queue->queue(new LambdaContext(
-          [this, log_entry, entry_bl=move(captured_entry_bl), ctx](int r) {
-            auto captured_entry_bl = std::move(entry_bl);
-            ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                       << " " << *log_entry << dendl;
-            log_entry->writeback_bl(this->m_image_writeback, ctx,
-                                    std::move(captured_entry_bl));
-          }), 0);
-      });
-    ctx = new LambdaContext(
-      [this, log_entry, read_bl_ptr, ctx](int r) {
-        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
-        write_entry->inc_bl_refs();
-        aio_read_data_block(std::move(write_entry), read_bl_ptr, ctx);
-      });
-    return ctx;
+  if (invalidating || !has_write_entry) {
+    for (auto &log_entry : entries_to_flush) {
+      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+      if (!invalidating) {
+        ctx = new LambdaContext(
+        [this, log_entry, ctx](int r) {
+          m_image_ctx.op_work_queue->queue(new LambdaContext(
+           [this, log_entry, ctx](int r) {
+             ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                         << " " << *log_entry << dendl;
+             log_entry->writeback(this->m_image_writeback, ctx);
+           }), 0);
+       });
+      }
+      post_unlock.add(ctx);
+    }
   } else {
-    return new LambdaContext(
-      [this, log_entry, ctx](int r) {
-        m_image_ctx.op_work_queue->queue(new LambdaContext(
-          [this, log_entry, ctx](int r) {
-            ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                       << " " << *log_entry << dendl;
-            log_entry->writeback(this->m_image_writeback, ctx);
-          }), 0);
+    int count = entries_to_flush.size();
+    std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries;
+    std::vector<bufferlist *> read_bls;
+    std::vector<Context *> contexts;
+
+    log_entries.reserve(count);
+    read_bls.reserve(count);
+    contexts.reserve(count);
+
+    for (auto &log_entry : entries_to_flush) {
+      // log_entry has already been removed from m_dirty_log_entries, and
+      // construct_flush_entry() increments m_flush_ops_in_flight; call it
+      // here so that these flush ops are tracked.
+      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+      if (log_entry->is_write_entry()) {
+       bufferlist *bl = new bufferlist;
+       auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
+       write_entry->inc_bl_refs();
+       log_entries.push_back(write_entry);
+       read_bls.push_back(bl);
+      }
+      contexts.push_back(ctx);
+    }
+
+    Context *ctx = new LambdaContext(
+      [this, entries_to_flush, read_bls, contexts](int r) {
+        int i = 0, j = 0;
+
+       for (auto &log_entry : entries_to_flush) {
+          Context *ctx = contexts[j++];
+
+         if (log_entry->is_write_entry()) {
+           bufferlist captured_entry_bl;
+
+           captured_entry_bl.claim_append(*read_bls[i]);
+           delete read_bls[i++];
+
+           m_image_ctx.op_work_queue->queue(new LambdaContext(
+             [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+               auto captured_entry_bl = std::move(entry_bl);
+               ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                          << " " << *log_entry << dendl;
+               log_entry->writeback_bl(this->m_image_writeback, ctx,
+                                       std::move(captured_entry_bl));
+             }), 0);
+         } else {
+             m_image_ctx.op_work_queue->queue(new LambdaContext(
+               [this, log_entry, ctx](int r) {
+                 ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                             << " " << *log_entry << dendl;
+                 log_entry->writeback(this->m_image_writeback, ctx);
+               }), 0);
+         }
+       }
       });
+
+    aio_read_data_blocks(log_entries, read_bls, ctx);
   }
 }
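
This is the heart of the fix for the ssd backend: instead of writing
back from each entry's own read callback, all cache reads are issued
up front through aio_read_data_blocks(), and one completion context
walks the entries in their original order. A sketch of that barrier
pattern, with stand-in helpers rather than the librbd API:

  #include <atomic>
  #include <functional>
  #include <iostream>
  #include <memory>
  #include <string>
  #include <vector>

  struct ReadResult { std::string data; };

  // Pretend async read; the real reads complete later, possibly out of
  // order, which is exactly what the barrier below absorbs.
  void aio_read(const std::string& src, ReadResult* out,
                const std::function<void()>& on_finish) {
    out->data = src;
    on_finish();
  }

  int main() {
    std::vector<std::string> entries = {"writeA(0, 4096)", "writeB(0, 512)"};
    std::vector<ReadResult> results(entries.size());
    auto pending = std::make_shared<std::atomic<int>>((int)entries.size());

    for (size_t i = 0; i < entries.size(); ++i) {
      aio_read(entries[i], &results[i], [&results, pending]() {
        // Only the last read completion triggers the writes, so every
        // entry's data is in hand before any write is issued.
        if (pending->fetch_sub(1) == 1) {
          for (auto& r : results) {
            std::cout << "writeback " << r.data << "\n";  // original order
          }
        }
      });
    }
    return 0;
  }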
 
index 8df2057b6d4ee753ffaeb223c8b9b1c506886549..69cc366628f24333f7f173eb9a9192f9703ab063 100644 (file)
@@ -122,8 +122,9 @@ private:
   bool has_sync_point_logs(GenericLogOperations &ops);
   void append_op_log_entries(GenericLogOperations &ops);
   void alloc_op_log_entries(GenericLogOperations &ops);
-  Context* construct_flush_entry_ctx(
-      std::shared_ptr<GenericLogEntry> log_entry);
+  void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                               DeferredContexts &post_unlock,
+                               bool has_write_entry) override;
   void append_ops(GenericLogOperations &ops, Context *ctx,
                   uint64_t* new_first_free_entry);
   void write_log_entries(GenericLogEntriesVector log_entries,