]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: fix bluefs log runway enospc 50216/head
authorPere Diaz Bou <pere-altea@hotmail.com>
Tue, 25 Jul 2023 15:28:14 +0000 (17:28 +0200)
committerPere Diaz Bou <pere-altea@hotmail.com>
Fri, 11 Aug 2023 08:38:57 +0000 (10:38 +0200)
With these changes, every call to log compaction will try to expand its
runway in case of insufficient log space. async compaction will ignore
the `log_forbidden_to_expand` atomic since we know it should't be
harmful. In any other case, expansion of log will wait until compaction
is completed.

in order to ensure op_file_update_inc fits on disk we increase the size
of logs as previously used in _maybe_extend_log. This means we too bring
back _maybe_extend_log with a different usage.

_maybe_extend_log increases the size of the log if the runway is less
than the min runway and if the current transaction is too big to fit.

Fixes: https://tracker.ceph.com/issues/58759
Signed-off-by: Pere Diaz Bou <pere-altea@hotmail.com>
src/os/bluestore/BlueFS.cc
src/os/bluestore/BlueFS.h
src/os/bluestore/bluefs_types.cc
src/os/bluestore/bluefs_types.h
src/test/objectstore/test_bluefs.cc

index 8454ddaf8078c01fe62158014a57a97ba5beb1c4..19953b130d62a4dc739949d6c797ca4b3283a7f7 100644 (file)
@@ -20,15 +20,12 @@ using TOPNSPC::common::cmd_getval;
 
 using std::byte;
 using std::list;
-using std::make_pair;
 using std::map;
 using std::ostream;
-using std::pair;
 using std::set;
 using std::string;
 using std::to_string;
 using std::vector;
-using std::chrono::duration;
 using std::chrono::seconds;
 
 using ceph::bufferlist;
@@ -2745,14 +2742,19 @@ void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer
   // Part 0.
   // Lock the log and forbid its expansion and other compactions
 
+  // lock log's run-time structures for a while
+  log.lock.lock();
+
+  // Extend log in case of having a big transaction waiting before starting compaction.
+  _maybe_extend_log();
+
   // only one compaction allowed at one time
   bool old_is_comp = std::atomic_exchange(&log_is_compacting, true);
   if (old_is_comp) {
     dout(10) << __func__ << " ongoing" <<dendl;
+    log.lock.unlock();
     return;
   }
-  // lock log's run-time structures for a while
-  log.lock.lock();
   auto t0 = mono_clock::now();
 
   // Part 1.
@@ -2763,7 +2765,7 @@ void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer
   // During that, no one else can write to log, otherwise we risk jumping backwards.
   // We need to sync log, because we are injecting discontinuity, and writer is not prepared for that.
 
-  //signal _maybe_extend_log that expansion of log is temporary inacceptable
+  //signal _extend_log that expansion of log is temporary inacceptable
   bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true);
   ceph_assert(old_forbidden == false);
 
@@ -2779,9 +2781,9 @@ void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer
 
   // 1.1 allocate new log extents and store them at fnode_tail
   File *log_file = log.writer->file.get();
+
   old_log_jump_to = log_file->fnode.get_allocated();
   bluefs_fnode_t fnode_tail;
-  uint64_t runway = log_file->fnode.get_allocated() - log.writer->get_effective_write_pos();
   dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
            << " need 0x" << cct->_conf->bluefs_max_log_runway << std::dec << dendl;
   int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
@@ -2809,7 +2811,7 @@ void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer
   // TODO - think - if _flush_and_sync_log_jump will not add dirty files nor release pending allocations
   // then flush_bdev() will not be necessary
   _flush_bdev();
-  _flush_and_sync_log_jump_D(old_log_jump_to, runway);
+  _flush_and_sync_log_jump_D(old_log_jump_to);
 
   //
   // Part 2.
@@ -3054,54 +3056,68 @@ void BlueFS::_consume_dirty(uint64_t seq)
   }
 }
 
-// Extends log if its free space is smaller then bluefs_min_log_runway.
-// Returns space available *BEFORE* adding new space. Signed for additional <0 detection.
-int64_t BlueFS::_maybe_extend_log()
-{
+int64_t BlueFS::_maybe_extend_log() {
+  uint64_t runway = log.writer->file->fnode.get_allocated() - log.writer->get_effective_write_pos();
+  // increasing the size of the log involves adding a OP_FILE_UPDATE_INC which its size will 
+  // increase with respect the number of extents. bluefs_min_log_runway should cover the max size 
+  // a log can get.
+  // inject new allocation in case log is too big
+  size_t expected_log_size = 0;
+  log.t.bound_encode(expected_log_size);
+  if (expected_log_size + cct->_conf->bluefs_min_log_runway > runway) {
+    _extend_log(expected_log_size + cct->_conf->bluefs_max_log_runway);
+  } else if (runway < cct->_conf->bluefs_min_log_runway) {
+    _extend_log(cct->_conf->bluefs_max_log_runway);
+  }
+  runway = log.writer->file->fnode.get_allocated() - log.writer->get_effective_write_pos();
+  return runway;
+}
+
+void BlueFS::_extend_log(uint64_t amount) {
   ceph_assert(ceph_mutex_is_locked(log.lock));
-  // allocate some more space (before we run out)?
-  // BTW: this triggers `flush()` in the `page_aligned_appender` of `log.writer`.
-  int64_t runway = log.writer->file->fnode.get_allocated() -
-    log.writer->get_effective_write_pos();
-  if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) {
-    dout(10) << __func__ << " allocating more log runway (0x"
-            << std::hex << runway << std::dec  << " remaining)" << dendl;
-    /*
-     * Usually, when we are low on space in log, we just allocate new extent,
-     * put update op(log) to log and we are fine.
-     * Problem - it interferes with log compaction:
-     * New log produced in compaction will include - as last op - jump into some offset (anchor) of current log.
-     * It is assumed that log region (anchor - end) will contain all changes made by bluefs since
-     * full state capture into new log.
-     * Putting log update into (anchor - end) region is illegal, because any update there must be compatible with
-     * both logs, but old log is different then new log.
-     *
-     * Possible solutions:
-     * - stall extending log until we finish compacting and switch log (CURRENT)
-     * - re-run compaction with more runway for old log
-     * - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs
-     */
-    if (log_forbidden_to_expand.load() == true) {
-      return -EWOULDBLOCK;
-    }
-    vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
-    int r = _allocate(
+  std::unique_lock<ceph::mutex> ll(log.lock, std::adopt_lock);
+  while (log_forbidden_to_expand.load() == true) {
+    log_cond.wait(ll);
+  }
+  ll.release();
+  uint64_t allocated_before_extension = log.writer->file->fnode.get_allocated();
+  vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
+  amount = round_up_to(amount, super.block_size);
+  int r = _allocate(
       vselector->select_prefer_bdev(log.writer->file->vselector_hint),
-      cct->_conf->bluefs_max_log_runway,
+      amount,
       0,
       &log.writer->file->fnode);
-    ceph_assert(r == 0);
-    vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
-    log.t.op_file_update_inc(log.writer->file->fnode);
+  ceph_assert(r == 0);
+  dout(10) << "extended log by 0x" << std::hex << amount << " bytes " << dendl;
+  vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
+
+  bluefs_transaction_t log_extend_transaction;
+  log_extend_transaction.seq = log.t.seq;
+  log_extend_transaction.uuid = log.t.uuid;
+  log_extend_transaction.op_file_update_inc(log.writer->file->fnode);
+
+  bufferlist bl;
+  bl.reserve(super.block_size);
+  encode(log_extend_transaction, bl);
+  _pad_bl(bl, super.block_size);
+  log.writer->append(bl);
+  ceph_assert(allocated_before_extension >= log.writer->get_effective_write_pos());
+  log.t.seq = log.seq_live;
+
+  // before sync_core we advance the seq
+  {
+    std::unique_lock<ceph::mutex> l(dirty.lock);
+    _log_advance_seq();
   }
-  return runway;
 }
 
-void BlueFS::_flush_and_sync_log_core(int64_t runway)
+void BlueFS::_flush_and_sync_log_core()
 {
   ceph_assert(ceph_mutex_is_locked(log.lock));
   dout(10) << __func__ << " " << log.t << dendl;
 
+
   bufferlist bl;
   bl.reserve(super.block_size);
   encode(log.t, bl);
@@ -3113,10 +3129,11 @@ void BlueFS::_flush_and_sync_log_core(int64_t runway)
   logger->inc(l_bluefs_log_write_count, 1);
   logger->inc(l_bluefs_logged_bytes, bl.length());
 
-  if (true) {
-    ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss
-                                        // transaction will not fit extents before growth -> data loss on _replay
-  }
+  uint64_t runway = log.writer->file->fnode.get_allocated() - log.writer->get_effective_write_pos();
+  // ensure runway is big enough, this should be taken care of by _maybe_extend_log,
+  // but let's keep this here just in case.
+  ceph_assert(bl.length() <= runway); 
+
 
   log.writer->append(bl);
 
@@ -3185,31 +3202,15 @@ void BlueFS::_release_pending_allocations(vector<interval_set<uint64_t>>& to_rel
 
 int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq)
 {
-  int64_t available_runway;
-  do {
-    log.lock.lock();
-    dirty.lock.lock();
-    if (want_seq && want_seq <= dirty.seq_stable) {
-      dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable "
-              << dirty.seq_stable << ", done" << dendl;
-      dirty.lock.unlock();
-      log.lock.unlock();
-      return 0;
-    }
-
-    available_runway = _maybe_extend_log();
-    if (available_runway == -EWOULDBLOCK) {
-      // we are in need of adding runway, but we are during log-switch from compaction
-      dirty.lock.unlock();
-      //instead log.lock.unlock() do move ownership
-      std::unique_lock<ceph::mutex> ll(log.lock, std::adopt_lock);
-      while (log_forbidden_to_expand.load()) {
-       log_cond.wait(ll);
-      }
-    } else {
-      ceph_assert(available_runway >= 0);
-    }
-  } while (available_runway < 0);
+  log.lock.lock();
+  dirty.lock.lock();
+  if (want_seq && want_seq <= dirty.seq_stable) {
+    dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable "
+      << dirty.seq_stable << ", done" << dendl;
+    dirty.lock.unlock();
+    log.lock.unlock();
+    return 0;
+  }
   
   ceph_assert(want_seq == 0 || want_seq <= dirty.seq_live); // illegal to request seq that was not created yet
   uint64_t seq =_log_advance_seq();
@@ -3218,7 +3219,8 @@ int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq)
   to_release.swap(dirty.pending_release);
   dirty.lock.unlock();
 
-  _flush_and_sync_log_core(available_runway);
+  _maybe_extend_log();
+  _flush_and_sync_log_core();
   _flush_bdev(log.writer);
   logger->set(l_bluefs_log_bytes, log.writer->file->fnode.size);
   //now log.lock is no longer needed
@@ -3232,8 +3234,7 @@ int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq)
 }
 
 // Flushes log and immediately adjusts log_writer pos.
-int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to,
-                                    int64_t available_runway)
+int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to)
 {
   ceph_assert(ceph_mutex_is_locked(log.lock));
 
@@ -3246,7 +3247,7 @@ int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to,
   vector<interval_set<uint64_t>> to_release(dirty.pending_release.size());
   to_release.swap(dirty.pending_release);
   dirty.lock.unlock();
-  _flush_and_sync_log_core(available_runway);
+  _flush_and_sync_log_core();
 
   dout(10) << __func__ << " jumping log offset from 0x" << std::hex
           << log.writer->pos << " -> 0x" << jump_to << std::dec << dendl;
index adfc8eb0a235b940a455395442dbf65efd3c7e7e..4c89baea3a6c1c16899156d56150f0df5cccbd42 100644 (file)
@@ -453,15 +453,14 @@ private:
 #endif
 
   int64_t _maybe_extend_log();
-  void _extend_log();
+  void _extend_log(uint64_t amount);
   uint64_t _log_advance_seq();
   void _consume_dirty(uint64_t seq);
   void _clear_dirty_set_stable_D(uint64_t seq_stable);
   void _release_pending_allocations(std::vector<interval_set<uint64_t>>& to_release);
 
-  void _flush_and_sync_log_core(int64_t available_runway);
-  int _flush_and_sync_log_jump_D(uint64_t jump_to,
-                              int64_t available_runway);
+  void _flush_and_sync_log_core();
+  int _flush_and_sync_log_jump_D(uint64_t jump_to);
   int _flush_and_sync_log_LD(uint64_t want_seq = 0);
 
   uint64_t _estimate_transaction_size(bluefs_transaction_t* t);
index c8d2ede7bed92abfb7f561283e03e9e7f84d08da..70c8a4fbf1c5645382825572d1bbd210a491d8ce 100644 (file)
@@ -4,6 +4,7 @@
 #include <algorithm>
 #include "bluefs_types.h"
 #include "common/Formatter.h"
+#include "include/denc.h"
 #include "include/uuid.h"
 #include "include/stringify.h"
 
@@ -218,6 +219,23 @@ std::ostream& operator<<(std::ostream& out, const bluefs_fnode_delta_t& delta)
 
 // bluefs_transaction_t
 
+DENC_HELPERS
+void bluefs_transaction_t::bound_encode(size_t &s) const {
+  uint32_t crc = op_bl.crc32c(-1);
+  DENC_START(1, 1, s);
+  denc(uuid, s);
+  denc_varint(seq, s);
+  // not using bufferlist encode method, as it merely copies the bufferptr and not
+  // contents, meaning we're left with fragmented target bl
+  __u32 len = op_bl.length();
+  denc(len, s);
+  for (auto& it : op_bl.buffers()) {
+    s += it.length();
+  }
+  denc(crc, s);
+  DENC_FINISH(s);
+}
+
 void bluefs_transaction_t::encode(bufferlist& bl) const
 {
   uint32_t crc = op_bl.crc32c(-1);
@@ -282,3 +300,4 @@ ostream& operator<<(ostream& out, const bluefs_transaction_t& t)
             << " crc 0x" << t.op_bl.crc32c(-1)
             << std::dec << ")";
 }
+
index d5d8ee5a62826f979d8ad9d7106c15c45abc54a0..b0ce7c5c9d38d7905af6008a7f396122d02bead4 100644 (file)
@@ -308,6 +308,7 @@ struct bluefs_transaction_t {
     encode(delta, op_bl);
     file.reset_delta();
   }
+
   void op_file_remove(uint64_t ino) {
     using ceph::encode;
     encode((__u8)OP_FILE_REMOVE, op_bl);
@@ -328,6 +329,7 @@ struct bluefs_transaction_t {
     op_bl.claim_append(from.op_bl);
   }
 
+  void bound_encode(size_t &s) const;
   void encode(ceph::buffer::list& bl) const;
   void decode(ceph::buffer::list::const_iterator& p);
   void dump(ceph::Formatter *f) const;
index 75496a89d2c39c5889c954aa8bc8c512cb0051af..6d3ff1218a437576dd452b43dadefbc3610cd88f 100644 (file)
@@ -1459,8 +1459,9 @@ TEST(BlueFS, test_log_runway_2) {
   ASSERT_EQ(0, fs.mount());
   ASSERT_EQ(0, fs.maybe_verify_layout({ BlueFS::BDEV_DB, false, false }));
   // longer transaction than current runway
-  std::string longdir(max_log_runway * 2, 'a');
-  std::string longfile(max_log_runway * 2, 'b');
+  size_t name_length = max_log_runway * 2;
+  std::string longdir(name_length, 'a');
+  std::string longfile(name_length, 'b');
   {
     BlueFS::FileWriter *h;
     ASSERT_EQ(0, fs.mkdir(longdir));
@@ -1492,6 +1493,10 @@ TEST(BlueFS, test_log_runway_2) {
   ASSERT_EQ(file_size, 9);
   fs.stat(longdir, longfile, &file_size, &mtime);
   ASSERT_EQ(file_size, 6);
+
+  std::vector<std::string> ls_longdir;
+  fs.readdir(longdir, &ls_longdir);
+  ASSERT_EQ(ls_longdir.front(), longfile);
 }
 
 TEST(BlueFS, test_log_runway_3) {