]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd/cache/pwl: Use BlockGuard to control the order of overlapping ops when flushing to osd.
author: Jianpeng Ma <jianpeng.ma@intel.com>
Mon, 29 Nov 2021 07:16:21 +0000 (15:16 +0800)
committer: Jianpeng Ma <jianpeng.ma@intel.com>
Mon, 29 Nov 2021 07:16:52 +0000 (15:16 +0800)
During testing we hit some data-inconsistency problems. The test case
mainly issues a write, then a discard, to check data consistency.
W/o pwl, write/discard are synchronous ops: after the write returns, the
data is already located on the osd. But w/ pwl, we use asynchronous APIs
to send ops to the osd.
Although we make sure of the send order, the send order doesn't
guarantee the completion order. This means pwl keeps the order of
write/discard, but it doesn't keep the same semantics as the synchronous
API: w/ pwl, synchronous ops become asynchronous. For normal ops this is
not a problem, but for consecutive commands w/ overlap it can make the
data inconsistent.
So we use BlockGuard to solve this issue.

Fixes: https://tracker.ceph.com/issues/49876
Fixes: https://tracker.ceph.com/issues/53108
Signed-off-by: Jianpeng Ma <jianpeng.ma@intel.com>
src/librbd/cache/pwl/AbstractWriteLog.cc
src/librbd/cache/pwl/AbstractWriteLog.h
src/librbd/cache/pwl/LogEntry.h
src/librbd/cache/pwl/rwl/WriteLog.cc
src/librbd/cache/pwl/ssd/WriteLog.cc

index 7f61ce47ab2ffad082169fa7a5392bb5225b4b8b..981571bbec9cc8d7410175f439f2a776fb91b32e 100644 (file)
@@ -44,6 +44,9 @@ AbstractWriteLog<I>::AbstractWriteLog(
     plugin::Api<I>& plugin_api)
   : m_builder(builder),
     m_write_log_guard(image_ctx.cct),
+    m_flush_guard(image_ctx.cct),
+    m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name(
+      "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))),
     m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
       "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
     m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
@@ -1627,21 +1630,34 @@ bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_e
 }
 
 template <typename I>
-Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
-                                                      bool invalidating) {
-  CephContext *cct = m_image_ctx.cct;
+void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
+                                                    GuardedRequestFunctionContext *guarded_ctx) {
+  ldout(m_image_ctx.cct, 20) << dendl;
 
-  ldout(cct, 20) << "" << dendl;
-  ceph_assert(m_entry_reader_lock.is_locked());
-  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
-  if (!m_flush_ops_in_flight ||
-      (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
-    m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
+  BlockExtent extent;
+  if (log_entry->is_sync_point()) {
+    extent = block_extent(whole_volume_extent());
+  } else {
+    extent = log_entry->ram_entry.block_extent();
   }
-  m_flush_ops_in_flight += 1;
-  m_flush_ops_will_send += 1;
-  /* For write same this is the bytes affected by the flush op, not the bytes transferred */
-  m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
+
+  auto req = GuardedRequest(extent, guarded_ctx, false);
+  BlockGuardCell *cell = nullptr;
+
+  {
+    std::lock_guard locker(m_flush_guard_lock);
+    m_flush_guard.detain(req.block_extent, &req, &cell);
+  }
+  if (cell) {
+    req.guard_ctx->cell = cell;
+    m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+  }
+}
+
+template <typename I>
+Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
+                                                      bool invalidating) {
+  ldout(m_image_ctx.cct, 20) << "" << dendl;
 
   /* Flush write completion action */
   utime_t writeback_start_time = ceph_clock_now();
@@ -1672,7 +1688,24 @@ Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEn
     });
   /* Flush through lower cache before completing */
   ctx = new LambdaContext(
-    [this, ctx](int r) {
+    [this, ctx, log_entry](int r) {
+      {
+
+        WriteLogGuard::BlockOperations block_reqs;
+       BlockGuardCell *detained_cell = nullptr;
+
+       std::lock_guard locker{m_flush_guard_lock};
+       m_flush_guard.release(log_entry->m_cell, &block_reqs);
+
+       for (auto &req : block_reqs) {
+         m_flush_guard.detain(req.block_extent, &req, &detained_cell);
+         if (detained_cell) {
+           req.guard_ctx->cell = detained_cell;
+           m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+         }
+        }
+      }
+
       if (r < 0) {
         lderr(m_image_ctx.cct) << "failed to flush log entry"
                                << cpp_strerror(r) << dendl;
@@ -1724,6 +1757,18 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
         if (!has_write_entry)
           has_write_entry = candidate->is_write_entry();
         m_dirty_log_entries.pop_front();
+
+       // To track candidate, we should add m_flush_ops_in_flight in here
+       {
+         if (!m_flush_ops_in_flight ||
+             (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
+           m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
+         }
+         m_flush_ops_in_flight += 1;
+         m_flush_ops_will_send += 1;
+         /* For write same this is the bytes affected by the flush op, not the bytes transferred */
+         m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
+       }
       } else {
         ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
         break;
index 864f1e8e6313b0e89237cc736bd6094466dd9300..e66562234026ea498ce51a809ed46d4d4c28bddd 100644 (file)
@@ -178,6 +178,9 @@ private:
 
   bool m_persist_on_write_until_flush = true;
 
+  pwl::WriteLogGuard m_flush_guard;
+  mutable ceph::mutex m_flush_guard_lock;
+
  /* Debug counters for the places m_async_op_tracker is used */
   std::atomic<int> m_async_complete_ops = {0};
   std::atomic<int> m_async_null_flush_finish = {0};
@@ -225,7 +228,6 @@ private:
   void detain_guarded_request(C_BlockIORequestT *request,
                               pwl::GuardedRequestFunctionContext *guarded_ctx,
                               bool is_barrier);
-
   void perf_start(const std::string name);
   void perf_stop();
   void log_perf();
@@ -348,6 +350,8 @@ protected:
       std::shared_ptr<pwl::GenericLogEntry> log_entry) = 0;
   Context *construct_flush_entry(
       const std::shared_ptr<pwl::GenericLogEntry> log_entry, bool invalidating);
+  void detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
+                                  GuardedRequestFunctionContext *guarded_ctx);
   void process_writeback_dirty_entries();
   bool can_retire_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
 
index a5889a13bc79d9463d2e5918d1c27707215fcd8b..78eb4a6de6acb14c3ed119f9772104e12571d547 100644 (file)
@@ -27,6 +27,7 @@ public:
   WriteLogCacheEntry *cache_entry = nullptr;
   uint64_t log_entry_index = 0;
   bool completed = false;
+  BlockGuardCell* m_cell = nullptr;
   GenericLogEntry(uint64_t image_offset_bytes = 0, uint64_t write_bytes = 0)
     : ram_entry(image_offset_bytes, write_bytes) {
   };
index 22c8e8ee668e9e5dfef8f5ec88c30b32efaa853a..d5b5f712bb1996deaeb05453d441729c48ae777c 100644 (file)
@@ -582,21 +582,28 @@ void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flus
   bool invalidating = this->m_invalidating; // snapshot so we behave consistently
 
   for (auto &log_entry : entries_to_flush) {
-    Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
-    if (!invalidating) {
-      ctx = new LambdaContext(
-        [this, log_entry, ctx](int r) {
-         m_image_ctx.op_work_queue->queue(new LambdaContext(
-           [this, log_entry, ctx](int r) {
-             ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                        << " " << *log_entry << dendl;
-             log_entry->writeback(this->m_image_writeback, ctx);
-             this->m_flush_ops_will_send -= 1;
-           }), 0);
-        });
-   }
-   post_unlock.add(ctx);
+    GuardedRequestFunctionContext *guarded_ctx =
+      new GuardedRequestFunctionContext([this, log_entry, invalidating]
+        (GuardedRequestFunctionContext &guard_ctx) {
+          log_entry->m_cell = guard_ctx.cell;
+          Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+         if (!invalidating) {
+           ctx = new LambdaContext(
+             [this, log_entry, ctx](int r) {
+             m_image_ctx.op_work_queue->queue(new LambdaContext(
+               [this, log_entry, ctx](int r) {
+                 ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                            << " " << *log_entry << dendl;
+                 log_entry->writeback(this->m_image_writeback, ctx);
+                 this->m_flush_ops_will_send -= 1;
+             }), 0);
+         });
+       }
+
+       ctx->complete(0);
+    });
+   this->detain_flush_guard_request(log_entry, guarded_ctx);
   }
 }
 
index 18ac2adffddb76ac812d9574ce014180d111d502..3046a7edf6137daa96c0ba109d85c9087855874c 100644 (file)
@@ -552,82 +552,90 @@ void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flus
 
   if (invalidating || !has_write_entry) {
     for (auto &log_entry : entries_to_flush) {
-      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
-      if (!invalidating) {
-        ctx = new LambdaContext(
-        [this, log_entry, ctx](int r) {
-          m_image_ctx.op_work_queue->queue(new LambdaContext(
-           [this, log_entry, ctx](int r) {
-             ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                         << " " << *log_entry << dendl;
-             log_entry->writeback(this->m_image_writeback, ctx);
-             this->m_flush_ops_will_send -= 1;
-           }), 0);
-       });
-      }
-      post_unlock.add(ctx);
+      GuardedRequestFunctionContext *guarded_ctx =
+        new GuardedRequestFunctionContext([this, log_entry, invalidating]
+          (GuardedRequestFunctionContext &guard_ctx) {
+            log_entry->m_cell = guard_ctx.cell;
+            Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+            if (!invalidating) {
+              ctx = new LambdaContext([this, log_entry, ctx](int r) {
+                m_image_ctx.op_work_queue->queue(new LambdaContext(
+                 [this, log_entry, ctx](int r) {
+                   ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                               << " " << *log_entry << dendl;
+                   log_entry->writeback(this->m_image_writeback, ctx);
+                   this->m_flush_ops_will_send -= 1;
+                 }), 0);
+             });
+            }
+            ctx->complete(0);
+        });
+      this->detain_flush_guard_request(log_entry, guarded_ctx);
     }
   } else {
     int count = entries_to_flush.size();
-    std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries;
+    std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
     std::vector<bufferlist *> read_bls;
-    std::vector<Context *> contexts;
 
-    log_entries.reserve(count);
+    write_entries.reserve(count);
     read_bls.reserve(count);
-    contexts.reserve(count);
 
     for (auto &log_entry : entries_to_flush) {
-      // log_entry already removed from m_dirty_log_entries and
-      // in construct_flush_entry() it will inc(m_flush_ops_in_flight).
-      // We call this func here to make ops can track.
-      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
       if (log_entry->is_write_entry()) {
        bufferlist *bl = new bufferlist;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
        write_entry->inc_bl_refs();
-       log_entries.push_back(write_entry);
+       write_entries.push_back(write_entry);
        read_bls.push_back(bl);
       }
-      contexts.push_back(ctx);
     }
 
     Context *ctx = new LambdaContext(
-      [this, entries_to_flush, read_bls, contexts](int r) {
-        int i = 0, j = 0;
+      [this, entries_to_flush, read_bls](int r) {
+        int i = 0;
+       GuardedRequestFunctionContext *guarded_ctx = nullptr;
 
        for (auto &log_entry : entries_to_flush) {
-          Context *ctx = contexts[j++];
-
          if (log_entry->is_write_entry()) {
            bufferlist captured_entry_bl;
-
            captured_entry_bl.claim_append(*read_bls[i]);
            delete read_bls[i++];
 
-           m_image_ctx.op_work_queue->queue(new LambdaContext(
-             [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
-               auto captured_entry_bl = std::move(entry_bl);
-               ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                          << " " << *log_entry << dendl;
-               log_entry->writeback_bl(this->m_image_writeback, ctx,
-                                       std::move(captured_entry_bl));
-               this->m_flush_ops_will_send -= 1;
-             }), 0);
+           guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl]
+              (GuardedRequestFunctionContext &guard_ctx) {
+                log_entry->m_cell = guard_ctx.cell;
+                Context *ctx = this->construct_flush_entry(log_entry, false);
+
+               m_image_ctx.op_work_queue->queue(new LambdaContext(
+                 [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+                   auto captured_entry_bl = std::move(entry_bl);
+                   ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                              << " " << *log_entry << dendl;
+                   log_entry->writeback_bl(this->m_image_writeback, ctx,
+                                            std::move(captured_entry_bl));
+                   this->m_flush_ops_will_send -= 1;
+                 }), 0);
+             });
          } else {
-             m_image_ctx.op_work_queue->queue(new LambdaContext(
-               [this, log_entry, ctx](int r) {
-                 ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                             << " " << *log_entry << dendl;
-                 log_entry->writeback(this->m_image_writeback, ctx);
-                 this->m_flush_ops_will_send -= 1;
-               }), 0);
+           guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
+              (GuardedRequestFunctionContext &guard_ctx) {
+                log_entry->m_cell = guard_ctx.cell;
+                Context *ctx = this->construct_flush_entry(log_entry, false);
+               m_image_ctx.op_work_queue->queue(new LambdaContext(
+                 [this, log_entry, ctx](int r) {
+                   ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                               << " " << *log_entry << dendl;
+                   log_entry->writeback(this->m_image_writeback, ctx);
+                   this->m_flush_ops_will_send -= 1;
+                 }), 0);
+            });
          }
+          this->detain_flush_guard_request(log_entry, guarded_ctx);
        }
       });
 
-    aio_read_data_blocks(log_entries, read_bls, ctx);
+    aio_read_data_blocks(write_entries, read_bls, ctx);
   }
 }