librbd/cache/pwl: fix reordering when flushing cache data to the osd
author    Jianpeng Ma <jianpeng.ma@intel.com>
          Mon, 1 Nov 2021 01:25:52 +0000 (09:25 +0800)
committer Deepika Upadhyay <dupadhya@redhat.com>
          Fri, 5 Nov 2021 09:22:02 +0000 (14:52 +0530)
Consider the following workload:

  writeA(0, 4096)
  writeB(0, 512)

pwl makes sure writeA persists to the cache before writeB. But when
flushing to the osd, it uses async reads to fetch the data from the
cache, and the writes to the osd are issued from the read callbacks.
So although aio-read(4096) and aio-read(512) are issued in order, their
completion order is not guaranteed. If aio-read(512) returns first, the
write order seen by the next layer becomes

  writeB(0, 512)
  writeA(0, 4096)

which is wrong from the user's point of view.
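
For illustration only, here is a minimal standalone sketch (not pwl
code; write_to_osd() and aio_read_then_write() are hypothetical helpers)
of how callback-issued writes can reorder when the larger read takes
longer to complete:

    // Two async reads are issued in order, but each one issues its write
    // from its own completion callback, so the write order follows the
    // read completion order rather than the issue order.
    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <thread>
    #include <vector>

    // Hypothetical stand-in for "write to the next layer (the osd)".
    void write_to_osd(const char *name, uint64_t off, uint64_t len) {
      std::cout << "write" << name << "(" << off << ", " << len << ")\n";
    }

    // Simulate an async read whose completion callback issues the write;
    // the sleep models the read latency.
    std::thread aio_read_then_write(const char *name, uint64_t off,
                                    uint64_t len,
                                    std::chrono::milliseconds latency) {
      return std::thread([=] {
        std::this_thread::sleep_for(latency);  // the read completes here...
        write_to_osd(name, off, len);          // ...and its callback writes
      });
    }

    int main() {
      std::vector<std::thread> reads;
      // Issued in order: A first, B second. A's 4096-byte read is slower,
      // so writeB(0, 512) reaches the osd before writeA(0, 4096).
      reads.push_back(aio_read_then_write("A", 0, 4096,
                                          std::chrono::milliseconds(20)));
      reads.push_back(aio_read_then_write("B", 0, 512,
                                          std::chrono::milliseconds(1)));
      for (auto &t : reads) {
        t.join();
      }
    }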

To avoid this, first read all the data from the cache, and only then
send the writes, in order, as sketched below.
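
A sketch of the same simulation restructured the way the fix works
(illustrative only; in the actual patch the ssd WriteLog's
construct_flush_entries() batches the reads via aio_read_data_blocks()
and issues every write from a single completion context):

    // Phase 1 completes every read before phase 2 issues any write, so
    // the writes always go out in their original issue order.
    #include <chrono>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <thread>
    #include <vector>

    struct PendingWrite {
      std::string name;
      uint64_t off;
      uint64_t len;
    };

    int main() {
      std::vector<PendingWrite> issue_order = {{"A", 0, 4096},
                                               {"B", 0, 512}};
      std::vector<std::thread> reads;

      // Phase 1: issue all the reads; no callback writes anything yet.
      // The larger read still takes longer, as in the sketch above.
      for (auto &w : issue_order) {
        auto latency = std::chrono::milliseconds(w.len == 4096 ? 20 : 1);
        reads.emplace_back([latency] {
          std::this_thread::sleep_for(latency);  // the read completes here
        });
      }
      for (auto &t : reads) {
        t.join();  // wait until every read has returned
      }

      // Phase 2: only now send the writes, strictly in issue order.
      for (auto &w : issue_order) {
        std::cout << "write" << w.name << "(" << w.off << ", " << w.len
                  << ")\n";
      }
    }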

Fixes: https://tracker.ceph.com/issues/52511

Tested-by: Feng Hualong <hualong.feng@intel.com>
Signed-off-by: Jianpeng Ma <jianpeng.ma@intel.com>
(cherry picked from commit 1fc3be248097eb6087560c22374c1f924bfe0735)

src/librbd/cache/pwl/AbstractWriteLog.cc
src/librbd/cache/pwl/AbstractWriteLog.h
src/librbd/cache/pwl/rwl/WriteLog.cc
src/librbd/cache/pwl/rwl/WriteLog.h
src/librbd/cache/pwl/ssd/WriteLog.cc
src/librbd/cache/pwl/ssd/WriteLog.h

index 7b8aa43025047bed2b9b08f3e3903f10913e40f3..54d4f4d1a8e58d228bd61e161a913972d11efe80 100644 (file)
@@ -1686,13 +1686,16 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
   CephContext *cct = m_image_ctx.cct;
   bool all_clean = false;
   int flushed = 0;
+  bool has_write_entry = false;
 
   ldout(cct, 20) << "Look for dirty entries" << dendl;
   {
     DeferredContexts post_unlock;
+    GenericLogEntries entries_to_flush;
+
     std::shared_lock entry_reader_locker(m_entry_reader_lock);
+    std::lock_guard locker(m_lock);
     while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
-      std::lock_guard locker(m_lock);
       if (m_shutting_down) {
         ldout(cct, 5) << "Flush during shutdown suppressed" << dendl;
         /* Do flush complete only when all flush ops are finished */
@@ -1708,14 +1711,18 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
       auto candidate = m_dirty_log_entries.front();
       bool flushable = can_flush_entry(candidate);
       if (flushable) {
-        post_unlock.add(construct_flush_entry_ctx(candidate));
+        entries_to_flush.push_back(candidate);
         flushed++;
+        if (!has_write_entry)
+          has_write_entry = candidate->is_write_entry();
         m_dirty_log_entries.pop_front();
       } else {
         ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
         break;
       }
     }
+
+    construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
   }
 
   if (all_clean) {
index c982d2631fe3f407e6cd009c10f9f71811e039a0..128c00d99d82a2217b3aff210e63cc4482dcc860 100644 (file)
@@ -388,10 +388,9 @@ protected:
   virtual void persist_last_flushed_sync_gen() {}
   virtual void reserve_cache(C_BlockIORequestT *req, bool &alloc_succeeds,
                              bool &no_space) {}
-  virtual Context *construct_flush_entry_ctx(
-      const std::shared_ptr<pwl::GenericLogEntry> log_entry) {
-    return nullptr;
-  }
+  virtual void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                                       DeferredContexts &post_unlock,
+                                       bool has_write_entry) = 0;
   virtual uint64_t get_max_extent() {
     return 0;
   }
index 80a83510c3163dd077705f08cf32b189946343a9..56bf3851c7b98e7ca510f3f0bde181bd2bc26139 100644 (file)
@@ -575,23 +575,27 @@ bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
 }
 
 template <typename I>
-Context* WriteLog<I>::construct_flush_entry_ctx(
-    std::shared_ptr<GenericLogEntry> log_entry) {
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                                         DeferredContexts &post_unlock,
+                                         bool has_write_entry) {
   bool invalidating = this->m_invalidating; // snapshot so we behave consistently
-  Context *ctx = this->construct_flush_entry(log_entry, invalidating);
 
-  if (invalidating) {
-    return ctx;
-  }
-  return new LambdaContext(
-    [this, log_entry, ctx](int r) {
-      m_image_ctx.op_work_queue->queue(new LambdaContext(
+  for (auto &log_entry : entries_to_flush) {
+    Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+    if (!invalidating) {
+      ctx = new LambdaContext(
         [this, log_entry, ctx](int r) {
-          ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                     << " " << *log_entry << dendl;
-          log_entry->writeback(this->m_image_writeback, ctx);
-        }), 0);
-    });
+          m_image_ctx.op_work_queue->queue(new LambdaContext(
+            [this, log_entry, ctx](int r) {
+              ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                         << " " << *log_entry << dendl;
+              log_entry->writeback(this->m_image_writeback, ctx);
+            }), 0);
+        });
+    }
+    post_unlock.add(ctx);
+  }
 }
 
 const unsigned long int ops_flushed_together = 4;
index dabee07d742aa1cf5f7afd6d2e8f3483aa7dda24..5083a2568d497b7dc61168bd25a6b7e6effdecf3 100644 (file)
@@ -102,8 +102,9 @@ protected:
   void setup_schedule_append(
       pwl::GenericLogOperationsVector &ops, bool do_early_flush,
       C_BlockIORequestT *req) override;
-  Context *construct_flush_entry_ctx(
-        const std::shared_ptr<pwl::GenericLogEntry> log_entry) override;
+  void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                               DeferredContexts &post_unlock,
+                               bool has_write_entry) override;
   bool initialize_pool(Context *on_finish, pwl::DeferredContexts &later) override;
   void write_data_to_buffer(
       std::shared_ptr<pwl::WriteLogEntry> ws_entry,
index e669dd6ce0020531a7f1eb2f7f0addd14f80d6e7..fda9d8187e176767f3893a55b01cdb178f91e343 100644 (file)
@@ -520,49 +520,87 @@ void WriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops) {
 }
 
 template <typename I>
-Context* WriteLog<I>::construct_flush_entry_ctx(
-    std::shared_ptr<GenericLogEntry> log_entry) {
+void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                                         DeferredContexts &post_unlock,
+                                         bool has_write_entry) {
   // snapshot so we behave consistently
   bool invalidating = this->m_invalidating;
 
-  Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
-  if (invalidating) {
-    return ctx;
-  }
-  if (log_entry->is_write_entry()) {
-    bufferlist *read_bl_ptr = new bufferlist;
-    ctx = new LambdaContext(
-      [this, log_entry, read_bl_ptr, ctx](int r) {
-        bufferlist captured_entry_bl;
-        captured_entry_bl.claim_append(*read_bl_ptr);
-        delete read_bl_ptr;
-        m_image_ctx.op_work_queue->queue(new LambdaContext(
-          [this, log_entry, entry_bl=move(captured_entry_bl), ctx](int r) {
-            auto captured_entry_bl = std::move(entry_bl);
-            ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                       << " " << *log_entry << dendl;
-            log_entry->writeback_bl(this->m_image_writeback, ctx,
-                                    std::move(captured_entry_bl));
-          }), 0);
-      });
-    ctx = new LambdaContext(
-      [this, log_entry, read_bl_ptr, ctx](int r) {
-        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
-        write_entry->inc_bl_refs();
-        aio_read_data_block(std::move(write_entry), read_bl_ptr, ctx);
-      });
-    return ctx;
+  if (invalidating || !has_write_entry) {
+    for (auto &log_entry : entries_to_flush) {
+      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+      if (!invalidating) {
+        ctx = new LambdaContext(
+          [this, log_entry, ctx](int r) {
+            m_image_ctx.op_work_queue->queue(new LambdaContext(
+              [this, log_entry, ctx](int r) {
+                ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                           << " " << *log_entry << dendl;
+                log_entry->writeback(this->m_image_writeback, ctx);
+              }), 0);
+          });
+      }
+      post_unlock.add(ctx);
+    }
   } else {
-    return new LambdaContext(
-      [this, log_entry, ctx](int r) {
-        m_image_ctx.op_work_queue->queue(new LambdaContext(
-          [this, log_entry, ctx](int r) {
-            ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                       << " " << *log_entry << dendl;
-            log_entry->writeback(this->m_image_writeback, ctx);
-          }), 0);
+    int count = entries_to_flush.size();
+    std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries;
+    std::vector<bufferlist *> read_bls;
+    std::vector<Context *> contexts;
+
+    log_entries.reserve(count);
+    read_bls.reserve(count);
+    contexts.reserve(count);
+
+    for (auto &log_entry : entries_to_flush) {
+      // log_entry has already been removed from m_dirty_log_entries, and
+      // construct_flush_entry() increments m_flush_ops_in_flight, so
+      // calling it here keeps the in-flight flush ops tracked.
+      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+      if (log_entry->is_write_entry()) {
+        bufferlist *bl = new bufferlist;
+        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
+        write_entry->inc_bl_refs();
+        log_entries.push_back(write_entry);
+        read_bls.push_back(bl);
+      }
+      contexts.push_back(ctx);
+    }
+
+    Context *ctx = new LambdaContext(
+      [this, entries_to_flush, read_bls, contexts](int r) {
+        int i = 0, j = 0;
+
+        for (auto &log_entry : entries_to_flush) {
+          Context *ctx = contexts[j++];
+
+          if (log_entry->is_write_entry()) {
+            bufferlist captured_entry_bl;
+
+            captured_entry_bl.claim_append(*read_bls[i]);
+            delete read_bls[i++];
+
+            m_image_ctx.op_work_queue->queue(new LambdaContext(
+              [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+                auto captured_entry_bl = std::move(entry_bl);
+                ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                           << " " << *log_entry << dendl;
+                log_entry->writeback_bl(this->m_image_writeback, ctx,
+                                        std::move(captured_entry_bl));
+              }), 0);
+          } else {
+            m_image_ctx.op_work_queue->queue(new LambdaContext(
+              [this, log_entry, ctx](int r) {
+                ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                           << " " << *log_entry << dendl;
+                log_entry->writeback(this->m_image_writeback, ctx);
+              }), 0);
+          }
+        }
       });
+
+    aio_read_data_blocks(log_entries, read_bls, ctx);
   }
 }
 
index d96436c988510bac9f0dc3bcdcb0dad043afed39..5a0b2305d98bce42e5136aceae678aa57eb0131a 100644 (file)
@@ -122,8 +122,9 @@ private:
   bool has_sync_point_logs(GenericLogOperations &ops);
   void append_op_log_entries(GenericLogOperations &ops);
   void alloc_op_log_entries(GenericLogOperations &ops);
-  Context* construct_flush_entry_ctx(
-      std::shared_ptr<GenericLogEntry> log_entry);
+  void construct_flush_entries(pwl::GenericLogEntries entries_to_flush,
+                               DeferredContexts &post_unlock,
+                               bool has_write_entry) override;
   void append_ops(GenericLogOperations &ops, Context *ctx,
                   uint64_t* new_first_free_entry);
   void write_log_entries(GenericLogEntriesVector log_entries,