git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore/bluefs: Refactor flush_and_sync_log
author Adam Kupczyk <akupczyk@redhat.com>
Sat, 5 Jun 2021 06:55:14 +0000 (08:55 +0200)
committer Igor Fedotov <igor.fedotov@croit.io>
Tue, 27 Jun 2023 10:51:24 +0000 (13:51 +0300)
This refactor prepares flush_and_sync_log and compact_log_async for fine-grained locks in BlueFS.
No new logic is introduced, but the refactor is accompanied by some new comments.

Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
(cherry picked from commit 05703cccee205ef4541448917a9926d55bb57274)

src/os/bluestore/BlueFS.cc
src/os/bluestore/BlueFS.h
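
To make the hunks below easier to follow, here is a condensed, self-contained sketch of how the refactored _flush_and_sync_log composes the new helpers introduced by this commit. All names here are hypothetical stand-ins with trivial stubs, not BlueFS source; the real code guards this state with a ceph::mutex and waits on log_cond.

    #include <cerrno>
    #include <cstdint>

    static bool compaction_running = false;  // plays the role of new_log_writer
    static int64_t runway_left = 1 << 20;    // allocated minus effective write pos

    static int64_t maybe_extend_log() {      // cf. _maybe_extend_log()
      const int64_t min_runway = 64 << 10;   // plays the role of bluefs_min_log_runway
      int64_t runway = runway_left;          // measured *before* any growth
      if (runway < min_runway) {
        if (compaction_running)
          return -EWOULDBLOCK;               // extending now would corrupt compaction's anchor
        runway_left += 4 << 20;              // plays the role of _allocate() + op_file_update_inc
      }
      return runway;
    }

    static uint64_t consume_dirty() {        // cf. _consume_dirty(): bump seq, log dirty files
      static uint64_t log_seq = 0;
      return ++log_seq;
    }
    static void flush_and_sync_log_core(int64_t /*runway*/) {} // cf. _flush_and_sync_log_core()
    static void flush_bdev_safely() {}                         // cf. _flush_bdev_safely()
    static void clear_dirty_set_stable(uint64_t /*seq*/) {}    // cf. _clear_dirty_set_stable()
    static void release_pending_allocations() {}               // cf. _release_pending_allocations()
    static void wait_for_compaction() { compaction_running = false; } // real code: log_cond.wait(l)

    int flush_and_sync_log() {               // cf. the refactored _flush_and_sync_log()
      int64_t runway;
      do {
        runway = maybe_extend_log();
        if (runway == -EWOULDBLOCK)
          wait_for_compaction();             // stall until async compaction switches logs
      } while (runway == -EWOULDBLOCK);
      uint64_t seq = consume_dirty();
      flush_and_sync_log_core(runway);       // encoded transaction must fit in `runway`
      flush_bdev_safely();
      clear_dirty_set_stable(seq);           // retire dirty_files entries up to `seq`
      release_pending_allocations();         // free extents released by the flushed ops
      return 0;
    }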

index f462fa886481fb91340bc5ac16958330b9702e92..bf52f5cefc3badd5b09feb5896661a71e9969e4d 100644 (file)
@@ -2378,6 +2378,7 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
 
   // 1. allocate new log space and jump to it.
   old_log_jump_to = log_file->fnode.get_allocated();
+  uint64_t runway = log_file->fnode.get_allocated() - log_writer->get_effective_write_pos();
   dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
            << " need 0x" << (old_log_jump_to + cct->_conf->bluefs_max_log_runway) << std::dec << dendl;
   int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
@@ -2393,9 +2394,12 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
   log_t.op_file_update(log_file->fnode);
   log_t.op_jump(log_seq, old_log_jump_to);
 
-  flush_bdev();  // FIXME?
+  // we need to flush all bdevs because we will be streaming all dirty files to the log
+  // TODO: if _flush_and_sync_log_jump will neither add dirty files nor release pending allocations,
+  // then flush_bdev() will not be necessary
+  flush_bdev();
 
-  _flush_and_sync_log(l, 0, old_log_jump_to);
+  _flush_and_sync_log_jump(old_log_jump_to, runway);
 
   // 2. prepare compacted log
   bluefs_transaction_t t;
@@ -2527,38 +2531,17 @@ void BlueFS::_pad_bl(bufferlist& bl)
   }
 }
 
-
-int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
-                               uint64_t want_seq,
-                               uint64_t jump_to)
+// Adds to log_t the file modifications recorded in `dirty_files`.
+// Note: some bluefs ops may have already been stored in the log_t transaction.
+uint64_t BlueFS::_consume_dirty()
 {
-  while (log_flushing) {
-    dout(10) << __func__ << " want_seq " << want_seq
-            << " log is currently flushing, waiting" << dendl;
-    ceph_assert(!jump_to);
-    log_cond.wait(l);
-  }
-  if (want_seq && want_seq <= log_seq_stable) {
-    dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
-            << log_seq_stable << ", done" << dendl;
-    ceph_assert(!jump_to);
-    return 0;
-  }
-  if (log_t.empty() && dirty_files.empty()) {
-    dout(10) << __func__ << " want_seq " << want_seq
-            << " " << log_t << " not dirty, dirty_files empty, no-op" << dendl;
-    ceph_assert(!jump_to);
-    return 0;
-  }
-
-  vector<interval_set<uint64_t>> to_release(pending_release.size());
-  to_release.swap(pending_release);
-
+  // acquire new seq
+  // this will become log_seq_stable once we write
   uint64_t seq = log_t.seq = ++log_seq;
-  ceph_assert(want_seq == 0 || want_seq <= seq);
   log_t.uuid = super.uuid;
 
   // log dirty files
+  // we just incremented log_seq. It is now illegal to add to dirty_files[log_seq]
   auto lsi = dirty_files.find(seq);
   if (lsi != dirty_files.end()) {
     dout(20) << __func__ << " " << lsi->second.size() << " dirty_files" << dendl;
@@ -2567,21 +2550,37 @@ int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
       log_t.op_file_update_inc(f.fnode);
     }
   }
+  return seq;
+}
 
-  dout(10) << __func__ << " " << log_t << dendl;
-  ceph_assert(!log_t.empty());
-
+// Extends the log if its free space is smaller than bluefs_min_log_runway.
+// Returns the space available *BEFORE* adding new space; signed so that negative error codes can be returned.
+int64_t BlueFS::_maybe_extend_log()
+{
   // allocate some more space (before we run out)?
   // BTW: this triggers `flush()` in the `page_aligned_appender` of `log_writer`.
   int64_t runway = log_writer->file->fnode.get_allocated() -
     log_writer->get_effective_write_pos();
-  bool just_expanded_log = false;
   if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) {
     dout(10) << __func__ << " allocating more log runway (0x"
             << std::hex << runway << std::dec  << " remaining)" << dendl;
-    while (new_log_writer) {
-      dout(10) << __func__ << " waiting for async compaction" << dendl;
-      log_cond.wait(l);
+    /*
+     * Usually, when we are low on space in the log, we just allocate a new extent,
+     * put an update op(log) into the log, and we are fine.
+     * Problem - it interferes with log compaction:
+     * The new log produced by compaction will include - as its last op - a jump to some offset (anchor) of the current log.
+     * It is assumed that the log region (anchor - end) will contain all changes made by bluefs since the
+     * full state was captured into the new log.
+     * Putting a log update into the (anchor - end) region is illegal, because any update there must be compatible with
+     * both logs, but the old log is different from the new log.
+     *
+     * Possible solutions:
+     * - stall extending the log until we finish compacting and switch logs (CURRENT)
+     * - re-run compaction with more runway for the old log
+     * - add OP_FILE_ADDEXT that adds an extent; this will be compatible with both logs
+     */
+    if (new_log_writer) {
+      return -EWOULDBLOCK;
     }
     vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
     int r = _allocate(
@@ -2591,8 +2590,13 @@ int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
     ceph_assert(r == 0);
     vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
     log_t.op_file_update_inc(log_writer->file->fnode);
-    just_expanded_log = true;
   }
+  return runway;
+}
+
+void BlueFS::_flush_and_sync_log_core(int64_t runway)
+{
+  dout(10) << __func__ << " " << log_t << dendl;
 
   bufferlist bl;
   bl.reserve(super.block_size);
@@ -2604,38 +2608,29 @@ int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
 
   logger->inc(l_bluefs_logged_bytes, bl.length());
 
-  if (just_expanded_log) {
+  if (true) {
     ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss
+                                        // the transaction would not fit in extents allocated before growth -> data loss on _replay
   }
 
   log_writer->append(bl);
 
   log_t.clear();
   log_t.seq = 0;  // just so debug output is less confusing
-  log_flushing = true;
 
   int r = _flush(log_writer, true);
   ceph_assert(r == 0);
+}
 
-  if (jump_to) {
-    dout(10) << __func__ << " jumping log offset from 0x" << std::hex
-            << log_writer->pos << " -> 0x" << jump_to << std::dec << dendl;
-    log_writer->pos = jump_to;
-    vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
-    log_writer->file->fnode.size = jump_to;
-    vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
-  }
-
-  _flush_bdev_safely(log_writer);
-
-  log_flushing = false;
-  log_cond.notify_all();
-
+// Clears dirty_files up to and including seq_stable.
+void BlueFS::_clear_dirty_set_stable(uint64_t seq)
+{
   // clean dirty files
   if (seq > log_seq_stable) {
     log_seq_stable = seq;
     dout(20) << __func__ << " log_seq_stable " << log_seq_stable << dendl;
 
+    // undirty all files that were already streamed to the log
     auto p = dirty_files.begin();
     while (p != dirty_files.end()) {
       if (p->first > log_seq_stable) {
@@ -2661,7 +2656,10 @@ int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
              << " already >= out seq " << seq
              << ", we lost a race against another log flush, done" << dendl;
   }
+}
 
+void BlueFS::_release_pending_allocations(vector<interval_set<uint64_t>>& to_release)
+{
   for (unsigned i = 0; i < to_release.size(); ++i) {
     if (!to_release[i].empty()) {
       /* OK, now we have the guarantee alloc[i] won't be null. */
@@ -2681,9 +2679,67 @@ int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
       }
     }
   }
+}
+
+int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
+                              uint64_t want_seq)
+{
+  if (want_seq && want_seq <= log_seq_stable) {
+    dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
+            << log_seq_stable << ", done" << dendl;
+    return 0;
+  }
+  int64_t available_runway;
+  do {
+    available_runway = _maybe_extend_log();
+    if (available_runway == -EWOULDBLOCK) {
+      while (new_log_writer) {
+       dout(10) << __func__ << " waiting for async compaction" << dendl;
+       log_cond.wait(l);
+      }
+    }
+  } while (available_runway == -EWOULDBLOCK);
+
+  ceph_assert(want_seq == 0 || want_seq <= log_seq + 1); // illegal to request seq that was not created yet
+
+  uint64_t seq = _consume_dirty();
+  vector<interval_set<uint64_t>> to_release(pending_release.size());
+  to_release.swap(pending_release);
+
+  _flush_and_sync_log_core(available_runway);
+  _flush_bdev_safely(log_writer);
+
+  _clear_dirty_set_stable(seq);
+  _release_pending_allocations(to_release);
 
   _update_logger_stats();
+  return 0;
+}
+
+// Flushes the log and immediately adjusts log_writer's position.
+int BlueFS::_flush_and_sync_log_jump(uint64_t jump_to,
+                                    int64_t available_runway)
+{
+  ceph_assert(jump_to);
+  uint64_t seq = _consume_dirty();
+  vector<interval_set<uint64_t>> to_release(pending_release.size());
+  to_release.swap(pending_release);
+
+  _flush_and_sync_log_core(available_runway);
+
+  dout(10) << __func__ << " jumping log offset from 0x" << std::hex
+          << log_writer->pos << " -> 0x" << jump_to << std::dec << dendl;
+  log_writer->pos = jump_to;
+  vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
+  log_writer->file->fnode.size = jump_to;
+  vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
 
+  _flush_bdev_safely(log_writer);
+
+  _clear_dirty_set_stable(seq);
+  _release_pending_allocations(to_release);
+
+  _update_logger_stats();
   return 0;
 }
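
The jump mechanics that _flush_and_sync_log_jump isolates deserve one more illustration before the header changes. A minimal toy model (hypothetical types, not BlueFS source) of the op_jump handshake between _compact_log_async and _flush_and_sync_log_jump:

    #include <cassert>
    #include <cstdint>

    struct ToyLogWriter {
      uint64_t pos = 0;        // next write offset in the log file
      uint64_t file_size = 0;  // stands in for fnode.size
    };

    // The compacted log's last op is op_jump(seq, jump_to); replay of the new
    // log resumes reading the old log at exactly `jump_to`, so after flushing
    // the writer must be advanced to that offset and the file size forced to it.
    void flush_and_sync_log_jump(ToyLogWriter& w, uint64_t jump_to) {
      assert(w.pos <= jump_to);  // everything flushed must precede the anchor
      w.pos = jump_to;           // cf. log_writer->pos = jump_to
      w.file_size = jump_to;     // cf. fnode.size = jump_to (vselector usage re-accounted)
    }

    int main() {
      ToyLogWriter w;
      w.pos = 4096;                        // wherever the flush left the writer
      uint64_t old_log_jump_to = 1 << 20;  // allocation boundary captured up front
      flush_and_sync_log_jump(w, old_log_jump_to);
      assert(w.pos == old_log_jump_to && w.file_size == old_log_jump_to);
      return 0;
    }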
 
index 988c7e37f9e98a184f47f906291f6bb5451228f1..3a162b402e09bcba9ee052e0c9db63b07276d99e 100644 (file)
@@ -398,9 +398,18 @@ private:
   void wait_for_aio(FileWriter *h);  // safe to call without a lock
 #endif
 
+  int64_t _maybe_extend_log();
+  void _extend_log();
+  uint64_t _consume_dirty();
+  void _clear_dirty_set_stable(uint64_t seq_stable);
+  void _release_pending_allocations(std::vector<interval_set<uint64_t>>& to_release);
+
+  void _flush_and_sync_log_core(int64_t available_runway);
+  int _flush_and_sync_log_jump(uint64_t jump_to,
+                              int64_t available_runway);
   int _flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
-                         uint64_t want_seq = 0,
-                         uint64_t jump_to = 0);
+                         uint64_t want_seq = 0);
+
   uint64_t _estimate_log_size();
   bool _should_compact_log();
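
Finally, the assert kept in _flush_and_sync_log_core (bl.length() <= runway) encodes the commit's key safety invariant: the op that records the log file's growth travels inside the very transaction being appended, so on _replay the reader only knows about extents allocated before that transaction. A tiny illustration (hypothetical names, not BlueFS source):

    #include <cassert>
    #include <cstdint>

    // Bytes past the pre-growth runway would land in extents that replay
    // cannot see yet, so overrunning it is unrecoverable data loss, not a
    // transient error - hence an assert rather than a retry.
    void append_log_txn(uint64_t encoded_len, int64_t runway_before_growth) {
      assert(static_cast<int64_t>(encoded_len) <= runway_before_growth);
      // ... safe to append: every byte lands in extents replay already knows about
    }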