From e97aa857c241a9694871de3d3a4079c4d9a120d3 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 25 Jul 2023 17:28:14 +0200 Subject: [PATCH] os/bluestore: fix bluefs log runway enospc With these changes, every call to log compaction will try to expand its runway in case of insufficient log space. async compaction will ignore the `log_forbidden_to_expand` atomic since we know it should't be harmful. In any other case, expansion of log will wait until compaction is completed. in order to ensure op_file_update_inc fits on disk we increase the size of logs as previously used in _maybe_extend_log. This means we too bring back _maybe_extend_log with a different usage. _maybe_extend_log increases the size of the log if the runway is less than the min runway and if the current transaction is too big to fit. Fixes: https://tracker.ceph.com/issues/58759 Signed-off-by: Pere Diaz Bou --- src/os/bluestore/BlueFS.cc | 157 ++++++++++++++-------------- src/os/bluestore/BlueFS.h | 7 +- src/os/bluestore/bluefs_types.cc | 19 ++++ src/os/bluestore/bluefs_types.h | 2 + src/test/objectstore/test_bluefs.cc | 9 +- 5 files changed, 110 insertions(+), 84 deletions(-) diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc index 8454ddaf8078c..19953b130d62a 100644 --- a/src/os/bluestore/BlueFS.cc +++ b/src/os/bluestore/BlueFS.cc @@ -20,15 +20,12 @@ using TOPNSPC::common::cmd_getval; using std::byte; using std::list; -using std::make_pair; using std::map; using std::ostream; -using std::pair; using std::set; using std::string; using std::to_string; using std::vector; -using std::chrono::duration; using std::chrono::seconds; using ceph::bufferlist; @@ -2745,14 +2742,19 @@ void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer // Part 0. // Lock the log and forbid its expansion and other compactions + // lock log's run-time structures for a while + log.lock.lock(); + + // Extend log in case of having a big transaction waiting before starting compaction. + _maybe_extend_log(); + // only one compaction allowed at one time bool old_is_comp = std::atomic_exchange(&log_is_compacting, true); if (old_is_comp) { dout(10) << __func__ << " ongoing" <file.get(); + old_log_jump_to = log_file->fnode.get_allocated(); bluefs_fnode_t fnode_tail; - uint64_t runway = log_file->fnode.get_allocated() - log.writer->get_effective_write_pos(); dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to << " need 0x" << cct->_conf->bluefs_max_log_runway << std::dec << dendl; int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint), @@ -2809,7 +2811,7 @@ void BlueFS::_compact_log_async_LD_LNF_D() //also locks FW for new_writer // TODO - think - if _flush_and_sync_log_jump will not add dirty files nor release pending allocations // then flush_bdev() will not be necessary _flush_bdev(); - _flush_and_sync_log_jump_D(old_log_jump_to, runway); + _flush_and_sync_log_jump_D(old_log_jump_to); // // Part 2. @@ -3054,54 +3056,68 @@ void BlueFS::_consume_dirty(uint64_t seq) } } -// Extends log if its free space is smaller then bluefs_min_log_runway. -// Returns space available *BEFORE* adding new space. Signed for additional <0 detection. -int64_t BlueFS::_maybe_extend_log() -{ +int64_t BlueFS::_maybe_extend_log() { + uint64_t runway = log.writer->file->fnode.get_allocated() - log.writer->get_effective_write_pos(); + // increasing the size of the log involves adding a OP_FILE_UPDATE_INC which its size will + // increase with respect the number of extents. bluefs_min_log_runway should cover the max size + // a log can get. + // inject new allocation in case log is too big + size_t expected_log_size = 0; + log.t.bound_encode(expected_log_size); + if (expected_log_size + cct->_conf->bluefs_min_log_runway > runway) { + _extend_log(expected_log_size + cct->_conf->bluefs_max_log_runway); + } else if (runway < cct->_conf->bluefs_min_log_runway) { + _extend_log(cct->_conf->bluefs_max_log_runway); + } + runway = log.writer->file->fnode.get_allocated() - log.writer->get_effective_write_pos(); + return runway; +} + +void BlueFS::_extend_log(uint64_t amount) { ceph_assert(ceph_mutex_is_locked(log.lock)); - // allocate some more space (before we run out)? - // BTW: this triggers `flush()` in the `page_aligned_appender` of `log.writer`. - int64_t runway = log.writer->file->fnode.get_allocated() - - log.writer->get_effective_write_pos(); - if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) { - dout(10) << __func__ << " allocating more log runway (0x" - << std::hex << runway << std::dec << " remaining)" << dendl; - /* - * Usually, when we are low on space in log, we just allocate new extent, - * put update op(log) to log and we are fine. - * Problem - it interferes with log compaction: - * New log produced in compaction will include - as last op - jump into some offset (anchor) of current log. - * It is assumed that log region (anchor - end) will contain all changes made by bluefs since - * full state capture into new log. - * Putting log update into (anchor - end) region is illegal, because any update there must be compatible with - * both logs, but old log is different then new log. - * - * Possible solutions: - * - stall extending log until we finish compacting and switch log (CURRENT) - * - re-run compaction with more runway for old log - * - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs - */ - if (log_forbidden_to_expand.load() == true) { - return -EWOULDBLOCK; - } - vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode); - int r = _allocate( + std::unique_lock ll(log.lock, std::adopt_lock); + while (log_forbidden_to_expand.load() == true) { + log_cond.wait(ll); + } + ll.release(); + uint64_t allocated_before_extension = log.writer->file->fnode.get_allocated(); + vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode); + amount = round_up_to(amount, super.block_size); + int r = _allocate( vselector->select_prefer_bdev(log.writer->file->vselector_hint), - cct->_conf->bluefs_max_log_runway, + amount, 0, &log.writer->file->fnode); - ceph_assert(r == 0); - vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode); - log.t.op_file_update_inc(log.writer->file->fnode); + ceph_assert(r == 0); + dout(10) << "extended log by 0x" << std::hex << amount << " bytes " << dendl; + vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode); + + bluefs_transaction_t log_extend_transaction; + log_extend_transaction.seq = log.t.seq; + log_extend_transaction.uuid = log.t.uuid; + log_extend_transaction.op_file_update_inc(log.writer->file->fnode); + + bufferlist bl; + bl.reserve(super.block_size); + encode(log_extend_transaction, bl); + _pad_bl(bl, super.block_size); + log.writer->append(bl); + ceph_assert(allocated_before_extension >= log.writer->get_effective_write_pos()); + log.t.seq = log.seq_live; + + // before sync_core we advance the seq + { + std::unique_lock l(dirty.lock); + _log_advance_seq(); } - return runway; } -void BlueFS::_flush_and_sync_log_core(int64_t runway) +void BlueFS::_flush_and_sync_log_core() { ceph_assert(ceph_mutex_is_locked(log.lock)); dout(10) << __func__ << " " << log.t << dendl; + bufferlist bl; bl.reserve(super.block_size); encode(log.t, bl); @@ -3113,10 +3129,11 @@ void BlueFS::_flush_and_sync_log_core(int64_t runway) logger->inc(l_bluefs_log_write_count, 1); logger->inc(l_bluefs_logged_bytes, bl.length()); - if (true) { - ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss - // transaction will not fit extents before growth -> data loss on _replay - } + uint64_t runway = log.writer->file->fnode.get_allocated() - log.writer->get_effective_write_pos(); + // ensure runway is big enough, this should be taken care of by _maybe_extend_log, + // but let's keep this here just in case. + ceph_assert(bl.length() <= runway); + log.writer->append(bl); @@ -3185,31 +3202,15 @@ void BlueFS::_release_pending_allocations(vector>& to_rel int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq) { - int64_t available_runway; - do { - log.lock.lock(); - dirty.lock.lock(); - if (want_seq && want_seq <= dirty.seq_stable) { - dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable " - << dirty.seq_stable << ", done" << dendl; - dirty.lock.unlock(); - log.lock.unlock(); - return 0; - } - - available_runway = _maybe_extend_log(); - if (available_runway == -EWOULDBLOCK) { - // we are in need of adding runway, but we are during log-switch from compaction - dirty.lock.unlock(); - //instead log.lock.unlock() do move ownership - std::unique_lock ll(log.lock, std::adopt_lock); - while (log_forbidden_to_expand.load()) { - log_cond.wait(ll); - } - } else { - ceph_assert(available_runway >= 0); - } - } while (available_runway < 0); + log.lock.lock(); + dirty.lock.lock(); + if (want_seq && want_seq <= dirty.seq_stable) { + dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable " + << dirty.seq_stable << ", done" << dendl; + dirty.lock.unlock(); + log.lock.unlock(); + return 0; + } ceph_assert(want_seq == 0 || want_seq <= dirty.seq_live); // illegal to request seq that was not created yet uint64_t seq =_log_advance_seq(); @@ -3218,7 +3219,8 @@ int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq) to_release.swap(dirty.pending_release); dirty.lock.unlock(); - _flush_and_sync_log_core(available_runway); + _maybe_extend_log(); + _flush_and_sync_log_core(); _flush_bdev(log.writer); logger->set(l_bluefs_log_bytes, log.writer->file->fnode.size); //now log.lock is no longer needed @@ -3232,8 +3234,7 @@ int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq) } // Flushes log and immediately adjusts log_writer pos. -int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to, - int64_t available_runway) +int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to) { ceph_assert(ceph_mutex_is_locked(log.lock)); @@ -3246,7 +3247,7 @@ int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to, vector> to_release(dirty.pending_release.size()); to_release.swap(dirty.pending_release); dirty.lock.unlock(); - _flush_and_sync_log_core(available_runway); + _flush_and_sync_log_core(); dout(10) << __func__ << " jumping log offset from 0x" << std::hex << log.writer->pos << " -> 0x" << jump_to << std::dec << dendl; diff --git a/src/os/bluestore/BlueFS.h b/src/os/bluestore/BlueFS.h index adfc8eb0a235b..4c89baea3a6c1 100644 --- a/src/os/bluestore/BlueFS.h +++ b/src/os/bluestore/BlueFS.h @@ -453,15 +453,14 @@ private: #endif int64_t _maybe_extend_log(); - void _extend_log(); + void _extend_log(uint64_t amount); uint64_t _log_advance_seq(); void _consume_dirty(uint64_t seq); void _clear_dirty_set_stable_D(uint64_t seq_stable); void _release_pending_allocations(std::vector>& to_release); - void _flush_and_sync_log_core(int64_t available_runway); - int _flush_and_sync_log_jump_D(uint64_t jump_to, - int64_t available_runway); + void _flush_and_sync_log_core(); + int _flush_and_sync_log_jump_D(uint64_t jump_to); int _flush_and_sync_log_LD(uint64_t want_seq = 0); uint64_t _estimate_transaction_size(bluefs_transaction_t* t); diff --git a/src/os/bluestore/bluefs_types.cc b/src/os/bluestore/bluefs_types.cc index c8d2ede7bed92..70c8a4fbf1c56 100644 --- a/src/os/bluestore/bluefs_types.cc +++ b/src/os/bluestore/bluefs_types.cc @@ -4,6 +4,7 @@ #include #include "bluefs_types.h" #include "common/Formatter.h" +#include "include/denc.h" #include "include/uuid.h" #include "include/stringify.h" @@ -218,6 +219,23 @@ std::ostream& operator<<(std::ostream& out, const bluefs_fnode_delta_t& delta) // bluefs_transaction_t +DENC_HELPERS +void bluefs_transaction_t::bound_encode(size_t &s) const { + uint32_t crc = op_bl.crc32c(-1); + DENC_START(1, 1, s); + denc(uuid, s); + denc_varint(seq, s); + // not using bufferlist encode method, as it merely copies the bufferptr and not + // contents, meaning we're left with fragmented target bl + __u32 len = op_bl.length(); + denc(len, s); + for (auto& it : op_bl.buffers()) { + s += it.length(); + } + denc(crc, s); + DENC_FINISH(s); +} + void bluefs_transaction_t::encode(bufferlist& bl) const { uint32_t crc = op_bl.crc32c(-1); @@ -282,3 +300,4 @@ ostream& operator<<(ostream& out, const bluefs_transaction_t& t) << " crc 0x" << t.op_bl.crc32c(-1) << std::dec << ")"; } + diff --git a/src/os/bluestore/bluefs_types.h b/src/os/bluestore/bluefs_types.h index d5d8ee5a62826..b0ce7c5c9d38d 100644 --- a/src/os/bluestore/bluefs_types.h +++ b/src/os/bluestore/bluefs_types.h @@ -308,6 +308,7 @@ struct bluefs_transaction_t { encode(delta, op_bl); file.reset_delta(); } + void op_file_remove(uint64_t ino) { using ceph::encode; encode((__u8)OP_FILE_REMOVE, op_bl); @@ -328,6 +329,7 @@ struct bluefs_transaction_t { op_bl.claim_append(from.op_bl); } + void bound_encode(size_t &s) const; void encode(ceph::buffer::list& bl) const; void decode(ceph::buffer::list::const_iterator& p); void dump(ceph::Formatter *f) const; diff --git a/src/test/objectstore/test_bluefs.cc b/src/test/objectstore/test_bluefs.cc index 75496a89d2c39..6d3ff1218a437 100644 --- a/src/test/objectstore/test_bluefs.cc +++ b/src/test/objectstore/test_bluefs.cc @@ -1459,8 +1459,9 @@ TEST(BlueFS, test_log_runway_2) { ASSERT_EQ(0, fs.mount()); ASSERT_EQ(0, fs.maybe_verify_layout({ BlueFS::BDEV_DB, false, false })); // longer transaction than current runway - std::string longdir(max_log_runway * 2, 'a'); - std::string longfile(max_log_runway * 2, 'b'); + size_t name_length = max_log_runway * 2; + std::string longdir(name_length, 'a'); + std::string longfile(name_length, 'b'); { BlueFS::FileWriter *h; ASSERT_EQ(0, fs.mkdir(longdir)); @@ -1492,6 +1493,10 @@ TEST(BlueFS, test_log_runway_2) { ASSERT_EQ(file_size, 9); fs.stat(longdir, longfile, &file_size, &mtime); ASSERT_EQ(file_size, 6); + + std::vector ls_longdir; + fs.readdir(longdir, &ls_longdir); + ASSERT_EQ(ls_longdir.front(), longfile); } TEST(BlueFS, test_log_runway_3) { -- 2.39.5