From: Adam Kupczyk Date: Mon, 10 Mar 2025 12:15:52 +0000 (+0000) Subject: os/bluestore: Refactor of WAL-v2 BlueFS X-Git-Tag: testing/wip-vshankar-testing-20250411.090237-debug~28^2~14 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=d1703dc9eb36a61a03030aca5e45f04ce29c06d6;p=ceph-ci.git os/bluestore: Refactor of WAL-v2 BlueFS Main changes: - remove bluefs_wal_header_t; we no longer use denc directly, instead we read length and marker directly - get rid of wal_limit; this is replaced with splitting WAL_V2=>{WAL_V2,WAL_V2_FIN}. WAL_V2 means there could be more wal data, WAL_V2_FIN means file was orderly closed = no more data. - modified wal_marker_t (previously WALMarker) to be calculated in endiness-agnostic way. - replaced _wal_update_size with _wal_index_file; it does a similar function, but is less convoluted now - simplified _wal_read - now BlueFS::close_writer() flushes data - BlueRocksEnv's BlueRocksWritableFile::truncate() now does the action Signed-off-by: Adam Kupczyk --- diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc index d95694226c8..e4301b0aa2e 100644 --- a/src/os/bluestore/BlueFS.cc +++ b/src/os/bluestore/BlueFS.cc @@ -1705,7 +1705,9 @@ int BlueFS::_replay(bool noop, bool to_stdout) { bluefs_fnode_t fnode; decode(fnode, p); - ceph_assert(fnode.type == bluefs_node_type::REGULAR || fnode.type == bluefs_node_type::WAL_V2); + ceph_assert(fnode.type == bluefs_node_type::REGULAR || + fnode.type == bluefs_node_type::WAL_V2 || + fnode.type == bluefs_node_type::WAL_V2_FIN); dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_file_update " << " " << fnode << " " << dendl; if (unlikely(to_stdout)) { @@ -1778,9 +1780,9 @@ int BlueFS::_replay(bool noop, bool to_stdout) vselector->sub_usage(f->vselector_hint, fnode); } fnode.claim_extents(delta.extents); - fnode.size = delta.size; - fnode.wal_limit = delta.wal_limit; - fnode.wal_size = delta.wal_size; + fnode.size = delta.size; + fnode.type = delta.type; + fnode.wal_size = delta.wal_size; dout(20) << __func__ << " 0x" << std::hex << pos << std::dec << ": op_file_update_inc produced " << " " << fnode << " " << dendl; @@ -1859,11 +1861,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) for (const auto &[filename, file] : nodes.file_map) { if (file->is_new_wal()) { - dout(5) << __func__ << " " << file << " " << file->refs << dendl; - if (file->refs == 0) { - continue; - } - _wal_update_size(file, file->fnode.size); + _wal_index_file(file); } } } @@ -2377,183 +2375,189 @@ int64_t BlueFS::_read_random( return ret; } -void BlueFS::_wal_update_size(FileRef file, uint64_t increment) { - using WALLength = File::WALFlush::WALLength; - - file->is_wal_read_loaded = true; - file->wal_flushes.clear(); - - uint64_t flush_offset = 0; - dout(20) - << fmt::format( - "{} updating WAL file {} for range {:#x}~{:#x} limit is {:#x}", - __func__, file->fnode.ino, flush_offset, increment, file->fnode.wal_limit) - << dendl; - ceph_assert(file->wal_flushes.empty()); - - FileReader *h = new FileReader(file, cct->_conf->bluefs_max_prefetch, false, true); - - size_t header_size = File::WALFlush::header_size(); - - uint64_t flush_end = flush_offset + increment; - while (flush_offset < file->fnode.wal_limit) { - // read first part of wal flush - bufferlist bl; - bluefs_wal_header_t header; - - uint64_t read_result = (uint64_t)_read(h, flush_offset, header_size, &bl, nullptr); - if (read_result < header_size) { - dout(20) << fmt::format("{} cannot read wal header, most likely we are out of bounds. flush_offset={:#X}", __func__, flush_offset) << dendl; - break; - } - - dout(30) << __func__ << " result \n"; - bl.hexdump(*_dout); - *_dout << dendl; - auto buffer_iterator = bl.cbegin(); - try { - decode(header, buffer_iterator); - } catch(ceph::buffer::error& e) { - // EOF or corruption - dout(30) << fmt::format("couldn't decode wal flush header at offset {:#x}: {}", flush_offset, e.what()) << dendl; - break; - } +std::ostream& operator<<( + std::ostream& out, + const BlueFS::File::wal_flush_t& w) { + out << fmt::format("[wal:{:#x}~{:x} -> file:{:#x}~{:x}/{:x}/{:x}]", + w.wal_offset, w.wal_length, + w.file_offset, w.header_len, w.wal_length, w.tailer_len); + return out; +} - WALLength flush_length = header.flush_length; - dout(20) << __func__ << " flush_length " << flush_length << dendl; - File::WALFlush new_flush(flush_offset, flush_length); +std::ostream& operator<<( + std::ostream& out, + const BlueFS::File::wal_flush_t::wal_marker_t& m) { + out << fmt::format("0x{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}", + m.v[0], m.v[1], m.v[2], m.v[3], m.v[4], m.v[5], m.v[6], m.v[7]); + return out; +} - // read marker - bl.clear(); - uint64_t marker_offset = new_flush.get_marker_offset(); - read_result = _read(h, marker_offset, new_flush.tail_size(), &bl, nullptr); - if (read_result < new_flush.tail_size()) { - dout(20) << fmt::format("{} cannot read marker, most likely we are out of bounds. flush_offset={:#X}, marker_offset={:#X}", __func__, flush_offset, marker_offset) << dendl; - break; - } - uint64_t marker; - buffer_iterator = bl.cbegin(); - decode(marker, buffer_iterator); - if (marker != File::WALFlush::generate_hashed_marker(super.osd_uuid, file->fnode.ino)) { - // EOF or corruption - dout(30) << fmt::format("reached eof or marker corruption {:#x}", flush_offset) << dendl; +void BlueFS::_wal_index_file( + FileRef file) +{ + file->wal_marker = File::wal_flush_t::generate_hashed_marker(super.uuid, file->fnode.ino); + dout(10) << file << " required marker=#" << std::hex << file->wal_marker << std::dec << dendl; + uint64_t scan_ofs = 0; + uint64_t wal_ofs = 0; + File::wal_flush_t flush; + bool envelope_good; + uint64_t file_size = file->fnode.size; + FileReader *h = new FileReader(file, 4096, false, true); + ceph_assert(h); + while(scan_ofs < file->fnode.allocated) { + envelope_good = _read_wal_flush(h, scan_ofs, wal_ofs, &flush); + dout(20) << "envelope " << (envelope_good ? "good " : "bad ") << flush << dendl; + if (envelope_good) { + h->file->wal_flushes.push_back(flush); + wal_ofs += flush.wal_length; + scan_ofs += flush.wal_length + flush.header_len + flush.tailer_len; + if (file->fnode.type == WAL_V2_FIN) { + if (scan_ofs > file_size) { + dout(5) << "valid envelope but exceeds file size." << dendl; + } + if (scan_ofs >= file_size) { + // that's it, done + break; + } + // more envelopes expected + continue; + } else { + // WAL_V2, might continue + continue; + } + } else { + if (scan_ofs < file_size) { + // we want to accept failed envelope if we are below confirmed wal size + h->file->wal_flushes.push_back(flush); + } break; } + } + file->is_wal_read_loaded = true; + delete h; +} - uint64_t increase = new_flush.end_offset() - new_flush.offset; - dout(20) << fmt::format("{} adding flush {:#x}~{:#x}", __func__, flush_offset, new_flush.length) << dendl; - file->wal_flushes.push_back(new_flush); - if (flush_offset >= flush_end) { - dout(20) << fmt::format("{} recovering flush {:#x}~{:#x}", __func__, flush_offset, new_flush.length) << dendl; - file->fnode.wal_size += new_flush.length; - file->fnode.size += increase; - vselector->add_usage(file->vselector_hint, increase); +// If off is above wal file size, return -1. +// On success set wal_flush_t data, return amount of bytes readable from the envelope. +// In case that some flush envelopes are corrupted, construct replacement; +// make it up so the total content in flushes matches file size. +// Replacement flush are considered valid in context of this function. +int BlueFS::_wal_seek_to( + FileReader *h, + uint64_t lookup_ofs, + File::wal_flush_t* fl_out) +{ + //1st, lets check if we already have such offset + uint64_t wal_ofs = 0; + uint64_t file_ofs = 0; + for (const auto& fl: h->file->wal_flushes) { + ceph_assert(fl.wal_offset == wal_ofs); + ceph_assert(fl.file_offset == file_ofs); + if (fl.wal_offset + fl.wal_length <= lookup_ofs) { + wal_ofs += fl.wal_length; + file_ofs += fl.header_len + fl.wal_length + fl.tailer_len; + continue; } - - flush_offset += increase; + ceph_assert(fl.wal_offset <= lookup_ofs); + *fl_out = fl; + return fl.wal_offset + fl.wal_length - lookup_ofs; } - - // if we read less it might mean corruption - if (flush_offset < flush_end) { - dout(20) << fmt::format("{} read less than expected {:#x} bytes", __func__, flush_offset) << dendl; + if (wal_ofs == lookup_ofs) { + // asking exactly for EOF + return 0; } - ceph_assert(flush_offset >= flush_end); + return -1; +} - delete h; +bool BlueFS::_read_wal_flush( + FileReader *h, + uint64_t file_ofs, + uint64_t wal_ofs, + File::wal_flush_t* fl) +{ + ceph_le64 flush_length_le; + File::wal_flush_t::wal_marker_t wal_marker; + static_assert(File::wal_flush_t::header_size() == sizeof(flush_length_le)); + int64_t r = _read(h, file_ofs, File::wal_flush_t::header_size(), nullptr, (char*)&flush_length_le); + if (r != File::wal_flush_t::header_size()) goto fail; + r = _read(h, file_ofs + File::wal_flush_t::header_size() + flush_length_le, + File::wal_flush_t::tail_size(), nullptr, (char*)&wal_marker); + if (r != File::wal_flush_t::tail_size()) goto fail; + if (0 != memcmp(&wal_marker, &h->file->wal_marker, 8)) goto fail; + dout(20) << __func__ << " read.len=0x" << std::hex << flush_length_le << " read.marker=#" + << wal_marker << std::dec << dendl; + fl->wal_offset = wal_ofs; + fl->file_offset = file_ofs; + fl->header_len = File::wal_flush_t::header_size(); + fl->tailer_len = File::wal_flush_t::tail_size(); + fl->wal_length = flush_length_le; + dout(20) << __func__ << " envelope: " << *fl << dendl; + return true; + fail: + dout(20) << __func__ << "read.len=0x" << std::hex << flush_length_le << " read.marker=#" << wal_marker + << " required marker=#" << h->file->wal_marker << std::dec << dendl; + // technically, we could scan for missing flushes.... + fl->wal_offset = wal_ofs; + fl->file_offset = file_ofs; + fl->header_len = 0; + fl->tailer_len = 0; + fl->wal_length = file_ofs < h->file->fnode.size + ? h->file->fnode.size - file_ofs + : h->file->fnode.allocated - file_ofs; + dout(10) << __func__ << " failed to find envelope, created artificial: " << *fl << dendl; + return false; } int64_t BlueFS::_read_wal( FileReader *h, ///< [in] read from here - uint64_t off, ///< [in] offset - size_t len, ///< [in] this many bytes + uint64_t off_req, ///< [in] offset + size_t len_req, ///< [in] this many bytes bufferlist *outbl, ///< [out] optional: reference the result here char *out) ///< [out] optional: or copy it here { ceph_assert(h->file->is_wal_read_loaded); - dout(20) << __func__ << " h " << h << " offset: 0x" - << off << std::hex << "~" << len << std::hex << dendl; + dout(10) << __func__ << " h " << h << " offset: 0x" + << off_req << std::hex << "~" << len_req << std::hex << dendl; if (outbl) { outbl->clear(); } - - int64_t ret = 0; - - // WAL data is wrapped in an envelope that has a format of [length of flush, payload, file ino] - // wal_data_logical_offset points to the offset of the payload we are currently in. - uint64_t wal_data_logical_offset = 0; - - - // save previous position as buffer pos is treated difffernt on regular files - uint64_t previous_pos = h->buf.pos; - - uint64_t remaining_len = len; - auto flush_iterator = h->file->wal_flushes.begin(); - while (remaining_len > 0 && flush_iterator != h->file->wal_flushes.end()) { - uint64_t flush_offset = flush_iterator->offset; - uint64_t flush_length = flush_iterator->length; - dout(25) << fmt::format("{} flush_offset={:#x} flush_length={:#x}", __func__, flush_offset, flush_length) << dendl; - - if (flush_length == 0) { - if (remaining_len > 0) { - dout(5) << __func__ << " flush_length 0: reading less then required " - << ret << "<" << len - ret << dendl; - } + uint64_t off = off_req; + int64_t r = 0; + while (off < off_req + len_req) { + File::wal_flush_t fl; + int64_t readable = _wal_seek_to(h, off, &fl); + if (readable == 0) { + // we apparently read everything break; } - // if we won't find offset here, go ahead - bool in_range = wal_data_logical_offset < off + len && wal_data_logical_offset + flush_length > off; - ceph_assert(wal_data_logical_offset < off+len); - if (!in_range) { - if (off >= wal_data_logical_offset + flush_length) { - // move to next flush - // TODO(pere): do we check "ino" here too? - wal_data_logical_offset += flush_length; - flush_iterator++; - continue; - } - } - - uint64_t payload_offset = flush_iterator->get_payload_offset(); - - uint64_t skip_front = 0; - if(wal_data_logical_offset < off) { - // offset is in this flush chunk so if we are before we move forward - skip_front = off - wal_data_logical_offset; + if (readable < 0) { + dout(10) << fmt::format("{} invalid wal flush", __func__) << dendl; + break; } - payload_offset += skip_front; - wal_data_logical_offset += skip_front; - - dout(20) << fmt::format("{} payload_offset is {:#X} after skipping {:#X} bytes", __func__, payload_offset, skip_front) << dendl; - - uint64_t data_to_read_from_flush = std::min(flush_length-skip_front, remaining_len); - bufferlist payload; - dout(25) << fmt::format("{} data to read from flush = {:#X}", __func__, data_to_read_from_flush) << dendl; - _read(h, payload_offset, data_to_read_from_flush, &payload, nullptr); - - if (out) { - auto p = payload.begin(); - p.copy(data_to_read_from_flush, out); - out += data_to_read_from_flush; + readable = std::min(uint64_t(readable), off_req + len_req - off); + dout(20) << fmt::format("{} wal:{:#x}~{:#x} -> file:{:#x}~{:#x}/{:#x}/{:#x}", + __func__, fl.wal_offset, fl.wal_length, fl.file_offset, fl.header_len, fl.wal_length, fl.tailer_len) << dendl; + ceph_assert(fl.wal_offset <= off && off < fl.wal_offset + fl.wal_length); + uint64_t file_off = fl.file_offset + fl.header_len + (off - fl.wal_offset); + bufferlist res; + r = _read(h, file_off, readable, outbl ? &res : nullptr, + out ? out + (off - off_req) : nullptr); + if (r < 0) { + dout(10) << fmt::format("{} read failed with {:#d}", __func__, r) << dendl; + break; } if (outbl) { - outbl->claim_append(payload); + outbl->append(res);//claim_append(res); } - flush_iterator++; - - remaining_len -= data_to_read_from_flush; - wal_data_logical_offset += data_to_read_from_flush; - ret += data_to_read_from_flush; + off += r; } - if (remaining_len > 0) { - dout(20) << __func__ << " reading less than required, missing: " << remaining_len << dendl; + // even if we had read error, prefer returning read size of successful reads, + // let next iteration get error + if ((r >= 0) || (off - off_req > 0)) { + r = off - off_req; + dout(20) << __func__ << std::hex << " got 0x" << r << std::dec << dendl; } - - dout(20) << __func__ << std::hex - << " got 0x" << ret - << std::dec << dendl; - ceph_assert(!outbl || (int)outbl->length() == ret); - h->buf.pos = previous_pos + ret; - return ret; + return r; } int64_t BlueFS::_read( @@ -3693,11 +3697,10 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer( dout(20) << " leaving 0x" << std::hex << buffer.length() << std::dec << " unflushed" << dendl; } - unsigned padding_len = 0; // Append padding to fill block - const unsigned tail = bl.length() & ~super.block_mask(); + unsigned tail = bl.length() & ~super.block_mask(); if (tail) { - padding_len = super.block_size - tail; + unsigned padding_len = super.block_size - tail; dout(20) << __func__ << " caching tail of 0x" << std::hex << tail << " and padding block with 0x" << padding_len @@ -3708,7 +3711,6 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer( // Otherwise a costly rebuild could happen in e.g. `KernelDevice`. buffer_appender.append_zero(padding_len); buffer.splice(buffer.length() - padding_len, padding_len, &bl); - // Deep copy the tail here. This allows to avoid costlier copy on // bufferlist rebuild in e.g. `KernelDevice` and minimizes number // of memory allocations. @@ -3720,7 +3722,6 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer( } else { tail_block.clear(); } - return bl; } @@ -3762,7 +3763,27 @@ void BlueFS::flush_range(FileWriter *h, uint64_t offset, uint64_t length)/*_WF*/ { _maybe_check_vselector_LNF(); std::unique_lock hl(h->lock); - _flush_range_F(h, offset, length); + if (h->file->is_new_wal()) { + //For wal files disregard offset and length and just flush current envelope. + _flush_wal_F(h); + } else { + _flush_range_F(h, offset, length); + } +} + +int BlueFS::_flush_wal_F(FileWriter *h) +{ + ceph_assert(ceph_mutex_is_locked(h->lock)); + ceph_assert(h->file->is_new_wal()); + uint64_t content_length = h->get_buffer_length() - File::wal_flush_t::header_size(); + h->append((char*)&h->file->wal_marker.v[0], File::wal_flush_t::tail_size()); + uint64_t offset = h->pos; + + h->file->fnode.wal_size += content_length; + ceph_le64 flush_length_le(content_length); + h->wal_header_filler.copy_in(File::wal_flush_t::header_size(), (char*)&flush_length_le); + uint64_t length = File::wal_flush_t::header_size() + content_length + File::wal_flush_t::tail_size(); + return _flush_range_F(h, offset, length); } int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length) @@ -3772,11 +3793,6 @@ int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length) ceph_assert(h->file->num_readers.load() == 0); ceph_assert(h->file->fnode.ino > 1); - if (h->file->is_new_wal()) { - // WALFlush::WALLength is already appended at the start of first append_try_flush - // update length, offset is already updated with correct position - length += File::WALFlush::tail_size(); - } uint64_t end = offset + length; dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos @@ -3835,19 +3851,6 @@ int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length) h->file->is_dirty = true; } } - - if (h->file->is_new_wal()) { - // create WAL flush envelope - uint64_t flush_size = length - File::WALFlush::extra_envelope_size_on_front_and_tail(); - ceph_assert(h->get_wal_header_filler() != nullptr); - bluefs_wal_header_t(flush_size).encode(*h->get_wal_header_filler()); - h->set_wal_header_filler(nullptr); - - h->append(h->file->wal_marker); - h->file->fnode.wal_size += flush_size; - h->file->fnode.wal_limit = h->file->fnode.get_allocated(); - } - dout(20) << __func__ << " file now, unflushed " << h->file->fnode << dendl; int res = _flush_data(h, offset, length, buffered); logger->tinc(l_bluefs_flush_lat, mono_clock::now() - t0); @@ -3978,14 +3981,9 @@ void BlueFS::append_try_flush(FileWriter *h, const char* buf, size_t len)/*_WF_L bool flushed_sum = false; { std::unique_lock hl(h->lock); - if (h->file->is_new_wal() && h->get_buffer_length() == 0) { - size_t size = 0; - bluefs_wal_header_t().bound_encode(size); - bufferlist::contiguous_filler filler = h->append_hole(size); - h->set_wal_header_filler(std::make_unique(bufferlist::contiguous_filler(filler))); + h->wal_header_filler = h->append_hole(File::wal_flush_t::header_size()); } - size_t max_size = 1ull << 30; // cap to 1GB while (len > 0) { bool need_flush = true; @@ -4051,7 +4049,12 @@ int BlueFS::_flush_F(FileWriter *h, bool force, bool *flushed) << std::hex << offset << "~" << length << std::dec << " to " << h->file->fnode << dendl; ceph_assert(h->pos <= h->file->fnode.size); - int r = _flush_range_F(h, offset, length); + int r; + if (h->file->is_new_wal()) { + r = _flush_wal_F(h); + } else { + r = _flush_range_F(h, offset, length); + } if (flushed) { *flushed = true; } @@ -4106,6 +4109,14 @@ int BlueFS::truncate(FileWriter *h, uint64_t offset)/*_WF_L*/ if (r < 0) return r; } + if (h->file->is_new_wal()) { + // We cannot do random truncation. + // But it is observed that RocksDB truncates WALs right at the end of written data. + ceph_assert(offset == h->file->fnode.wal_size || offset == 0); + if (offset == h->file->fnode.wal_size) { + offset = h->file->fnode.size; + } + } if (offset > fnode.size) { ceph_abort_msg("truncate up not supported"); } @@ -4114,16 +4125,6 @@ int BlueFS::truncate(FileWriter *h, uint64_t offset)/*_WF_L*/ { std::lock_guard ll(log.lock); std::lock_guard dl(dirty.lock); - if (h->file->is_new_wal()) { - // This assumption comes from reading logs of rocksdb+bluefs where a WAL file follows this pattern: - // 1. create wal - // 2. open_for_write - // 3. close_writer - // 4. truncate -> fnode.size - // 5. unlink - ceph_assert(h->file->fnode.size == offset || offset == 0); - h->file->fnode.wal_limit = offset; - } bool changed_extents = false; vselector->sub_usage(h->file->vselector_hint, fnode); uint64_t x_off = 0; @@ -4171,11 +4172,16 @@ int BlueFS::truncate(FileWriter *h, uint64_t offset)/*_WF_L*/ return 0; } -int BlueFS::fsync(FileWriter *h, bool force_dirty)/*_WF_WD_WLD_WLNF_WNF*/ +int BlueFS::fsync(FileWriter *h)/*_WF_WD_WLD_WLNF_WNF*/ +{ + std::unique_lock hl(h->lock); + return _fsync(h, false); +} + +int BlueFS::_fsync(FileWriter *h, bool force_dirty)/*_F_D_LD_LNF_NF*/ { auto t0 = mono_clock::now(); _maybe_check_vselector_LNF(); - std::unique_lock hl(h->lock); uint64_t old_dirty_seq = 0; { dout(10) << __func__ << " " << h << " " << h->file->fnode @@ -4442,9 +4448,6 @@ int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len)/*_LF*/ }); if (r < 0) return r; - if (f->is_new_wal()) { - f->fnode.wal_limit = f->fnode.get_allocated(); - } log.t.op_file_update_inc(f->fnode); f->is_dirty = true; } @@ -4557,15 +4560,14 @@ int BlueFS::open_for_write( dout(20) << __func__ << " mapping " << dirname << "/" << filename << " vsel_hint " << file->vselector_hint << dendl; - *h = _create_writer(file); - if (boost::algorithm::ends_with(filename, ".log")) { bool use_wal_v2 = cct->_conf.get_val("bluefs_wal_v2"); if (use_wal_v2) { file->fnode.type = WAL_V2; file->is_wal_read_loaded = false; - file->wal_marker = File::WALFlush::generate_hashed_marker(super.osd_uuid, file->fnode.ino); + file->wal_marker = File::wal_flush_t::generate_hashed_marker(super.uuid, file->fnode.ino); + dout(20) << "wal v2 marker=#" << std::hex << file->wal_marker << std::dec << dendl; } (*h)->writer_type = BlueFS::WRITER_WAL; if (logger && !overwrite) { @@ -4589,7 +4591,6 @@ int BlueFS::open_for_write( } } - dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl; return 0; } @@ -4630,13 +4631,18 @@ void BlueFS::_close_writer(FileWriter *h) } void BlueFS::close_writer(FileWriter *h) { - if (h->file->is_new_wal()) { - // we force fsync by forcing dirty flag - fsync(h, true); - } - + dout(10) << __func__ << " h=" << h << " file=" << h->file << dendl; { - std::lock_guard l(h->lock); + std::unique_lock hl(h->lock); + bool force_dirty = false; + if (h->file->is_new_wal()) { + ceph_assert(h->file->fnode.type == WAL_V2); + h->file->fnode.type = WAL_V2_FIN; + // we force fsync by forcing dirty flag + force_dirty = true; + } + // don't lose data added since last fsync() + _fsync(h, force_dirty); _drain_writer(h); } delete h; @@ -4688,9 +4694,11 @@ int BlueFS::open_for_read( return -ENOENT; } File *file = q->second.get(); - + if (file->is_new_wal() && !file->is_wal_read_loaded) { + _wal_index_file(file); + } *h = new FileReader(file, random ? 4096 : cct->_conf->bluefs_max_prefetch, - random, false); + random, file->is_new_wal()); dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl; return 0; } diff --git a/src/os/bluestore/BlueFS.h b/src/os/bluestore/BlueFS.h index 4c365eb497b..c2349601813 100644 --- a/src/os/bluestore/BlueFS.h +++ b/src/os/bluestore/BlueFS.h @@ -270,34 +270,35 @@ public: MEMPOOL_CLASS_HELPERS(); /* - * WAL files in bluefs have a different format from normal ones. In order to not flush metadata + * WAL v2 files in bluefs have a different format from normal ones. In order to not flush metadata * for every write we make to data extents, we create a package/envelope around the real data - * that includes Length of the data we want to flush and a marker that identifies the flush. + * that includes length of the data we want to flush and a marker that identifies the flush. * - * The format on disk will look something like: - * legend = l = length of flush, d = data, m = marker, x=unused/used bvy other file, each character will be a byte + * The format on disk is: + * legend: l = length of flush, d = data, m = marker; each character represents one byte * - * flush 0 l==24 flush 1 l==4 flush 2 l==12 - * v v v - * llll llll dddd dddd dddd dddd dddd dddd mmmm mmmm xxxx xxx xxxx llll llll dddd mmmm mmmm xxxx llll llll dddd dddd dddd mmmm mmmm + * flush 0 l==24 flush 1 l==4 flush 2 l==12 + * v v v + * llll llll dddd dddd dddd dddd dddd dddd mmmm mmmm llll llll dddd mmmm mmmm llll llll dddd dddd dddd mmmm mmmm * */ - struct WALFlush { - typedef uint64_t WALMarker; - typedef uint64_t WALLength; - - uint64_t offset = 0; // offset of start of flush, it should be length offset - uint64_t length = 0; - - WALFlush(uint64_t offset, uint64_t length) : offset(offset), length(length) {} - + struct wal_flush_t { + typedef struct wal_marker_t { + uint8_t v[8] = {0}; + } wal_marker_t; + typedef uint64_t wal_length_t; + uint64_t file_offset = 0; + uint64_t wal_offset = 0; // offset of start of flush, it should be length offset + uint32_t header_len = 0; + uint32_t tailer_len = 0; + uint64_t wal_length = 0; static constexpr size_t header_size() { - return bluefs_wal_header_t::size(); + return sizeof(wal_length_t); } static constexpr size_t tail_size() { - return sizeof(WALMarker); + return sizeof(wal_marker_t); } uint64_t end_offset() { @@ -305,29 +306,33 @@ public: } uint64_t get_payload_offset() { - return offset + header_size(); + return wal_offset + header_size(); } uint64_t get_marker_offset() { - return get_payload_offset() + length; + return get_payload_offset() + wal_length; } static constexpr uint64_t extra_envelope_size_on_front_and_tail() { return header_size() + tail_size(); } - static uint64_t generate_hashed_marker(uuid_d uuid, uint64_t ino) { - char uuid_copy[16]; + static wal_marker_t generate_hashed_marker(uuid_d uuid, uint64_t ino) { + wal_marker_t m; + uint8_t uuid_copy[16]; memcpy(uuid_copy, uuid.bytes(), 16); - uint64_t* blocks_of_64 = (uint64_t*)&uuid_copy[0]; - for (size_t i = 0; i < (sizeof(uuid_copy) / sizeof(uint64_t)); i++) { - blocks_of_64[i] ^= ino; + uint64_t hashed_ino = ino; + hashed_ino ^= hashed_ino << 5; + hashed_ino ^= hashed_ino << 11; + hashed_ino ^= hashed_ino << 23; + // use hashed ino in a endiness-agnostic way + for (int i = 0; i < 8; i++) { + m.v[i] = uuid_copy[i] ^ uuid_copy[8 + i] ^ (hashed_ino >> (8 * i)); } - return ceph_str_hash(CEPH_STR_HASH_RJENKINS, &uuid_copy[0], sizeof(uuid_copy)); + return m; } }; - bluefs_fnode_t fnode; int refs; uint64_t dirty_seq; @@ -346,12 +351,13 @@ public: _replay, device_migrate_to_existing, device_migrate_to_new */ ceph::mutex lock = ceph::make_mutex("BlueFS::File::lock"); - bool is_wal_read_loaded; // mark whether the WAL file is ready to be read as wal_update_size was called - std::vector wal_flushes; // to keep track of the amount of flushes we performed on a WAL file + bool is_wal_read_loaded; // Before reading from WALv2 all flush envelopes must be located. + // The flag indicates whether `wal_flushes` is initialized. + std::vector wal_flushes; // to keep track of the amount of flushes we performed on a WAL file // so that we can easily recalculate real data offsets. // On "replay" this should be refilled in order to append data // correctly. Nevertheless, replayed wal file most probably won't be reused - uint64_t wal_marker; + wal_flush_t::wal_marker_t wal_marker; private: FRIEND_MAKE_REF(File); @@ -377,7 +383,8 @@ public: public: bool is_new_wal() { - return fnode.type == WAL_V2; + // checks for both WAL_V2 and WAL_V2_FIN + return (fnode.type == WAL_V2 || fnode.type == WAL_V2_FIN); } }; @@ -406,8 +413,8 @@ public: FileRef file; uint64_t pos = 0; ///< start offset for buffer - ceph::buffer::list buffer; ///< new data to write (at end of file) private: + ceph::buffer::list buffer; ///< new data to write (at end of file) ceph::buffer::list tail_block; ///< existing partial block at end of file, if any public: unsigned get_buffer_length() const { @@ -419,7 +426,7 @@ public: const unsigned length, const bluefs_super_t& super); ceph::buffer::list::page_aligned_appender buffer_appender; //< for const char* only - std::unique_ptr wal_header_filler; // To encode bluefs_wal_header_t we need to save the location of the header we want to fill + bufferlist::contiguous_filler wal_header_filler; public: int writer_type = 0; ///< WRITER_* int write_hint = WRITE_LIFE_NOT_SET; @@ -431,7 +438,7 @@ public: FileWriter(FileRef f) : file(std::move(f)), buffer_appender(buffer.get_page_aligned_appender( - g_conf()->bluefs_alloc_size / CEPH_PAGE_SIZE)), wal_header_filler(nullptr) { + g_conf()->bluefs_alloc_size / CEPH_PAGE_SIZE)), wal_header_filler() { ++file->num_writers; iocv.fill(nullptr); dirty_devs.fill(false); @@ -471,26 +478,10 @@ public: buffer_appender.append_zero(len); } - void append(uint64_t value) { - uint64_t l0 = get_buffer_length(); - ceph_assert(l0 + sizeof(value) <= std::numeric_limits::max()); - bufferlist encoded; - encode(value, encoded); - buffer_appender.append(encoded); - } - bufferlist::contiguous_filler append_hole(uint64_t len) { return buffer.append_hole(len); } - void set_wal_header_filler(std::unique_ptr filler) { - wal_header_filler.swap(filler); - } - - bufferlist::contiguous_filler* get_wal_header_filler() { - return wal_header_filler.get(); - } - uint64_t get_effective_write_pos() { return pos + buffer.length(); } @@ -682,6 +673,8 @@ private: int _flush_range_F(FileWriter *h, uint64_t offset, uint64_t length); int _flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered); int _flush_F(FileWriter *h, bool force, bool *flushed = nullptr); + int _flush_wal_F(FileWriter *h); + int _fsync(FileWriter *h, bool force_dirty); uint64_t _flush_special(FileWriter *h); #ifdef HAVE_LIBAIO @@ -735,14 +728,23 @@ private: void _flush_bdev(); // this is safe to call without a lock void _flush_bdev(std::array& dirty_bdevs); // this is safe to call without a lock - void _wal_update_size(FileRef file, uint64_t increment); - int64_t _read_wal( FileReader *h, ///< [in] read from here uint64_t offset, ///< [in] offset size_t len, ///< [in] this many bytes ceph::buffer::list *outbl, ///< [out] optional: reference the result here char *out); ///< [out] optional: or copy it here + void _wal_index_file( + FileRef file); + int _wal_seek_to( + FileReader *h, ///< [in] wal-file to read + uint64_t off, ///< [in] offset in wal datastream + File::wal_flush_t* fl);///< [out] set wal envelope params + bool _read_wal_flush( + FileReader *h, ///< [in] wal-file to read + uint64_t file_ofs, ///< [in] offset to expect envelope + uint64_t wal_ofs, ///< [in] respective offset in wal datastream + File::wal_flush_t* fl);///< [out] set wal envelope params int64_t _read( FileReader *h, ///< [in] read from here uint64_t offset, ///< [in] offset @@ -843,7 +845,7 @@ public: FileReader **h, bool random = false); - // data added after last fsync() is lost + void close_writer(FileWriter *h); int rename(std::string_view old_dir, std::string_view old_file, @@ -892,7 +894,7 @@ public: void append_try_flush(FileWriter *h, const char* buf, size_t len); void flush_range(FileWriter *h, uint64_t offset, uint64_t length); - int fsync(FileWriter *h, bool force_dirty = false); + int fsync(FileWriter *h); int64_t read(FileReader *h, uint64_t offset, size_t len, ceph::buffer::list *outbl, char *out) { // no need to hold the global lock here; we only touch h and diff --git a/src/os/bluestore/BlueRocksEnv.cc b/src/os/bluestore/BlueRocksEnv.cc index 7cbe0a1d121..fc4f60e1fc9 100644 --- a/src/os/bluestore/BlueRocksEnv.cc +++ b/src/os/bluestore/BlueRocksEnv.cc @@ -213,19 +213,14 @@ class BlueRocksWritableFile : public rocksdb::WritableFile { // size due to whole pages writes. The behavior is undefined if called // with other writes to follow. rocksdb::Status Truncate(uint64_t size) override { - // we mirror the posix env, which does nothing here; instead, it - // truncates to the final size on close. whatever! + int r = fs->truncate(h, size); + if (r < 0) { + return err_to_status(r); + } return rocksdb::Status::OK(); - //int r = fs->truncate(h, size); - // return err_to_status(r); } rocksdb::Status Close() override { - - int r = fs->truncate(h, h->pos); - if (r < 0) { - return err_to_status(r); - } fs->fsync(h); return rocksdb::Status::OK(); } diff --git a/src/os/bluestore/bluefs_types.cc b/src/os/bluestore/bluefs_types.cc index 892c157e30d..6940b092fbe 100644 --- a/src/os/bluestore/bluefs_types.cc +++ b/src/os/bluestore/bluefs_types.cc @@ -282,7 +282,6 @@ bluefs_fnode_delta_t* bluefs_fnode_t::make_delta(bluefs_fnode_delta_t* delta) { delta->extents.clear(); delta->type = type; - delta->wal_limit = wal_limit; delta->wal_size = wal_size; if (allocated_commited < allocated) { uint64_t x_off = 0; @@ -332,10 +331,12 @@ ostream& operator<<(ostream& out, const bluefs_fnode_t& file) << " allocated " << std::hex << file.allocated << std::dec << " alloc_commit " << std::hex << file.allocated_commited << std::dec << " extents " << file.extents; - if (file.type == WAL_V2) { - out << " wal_limit " << file.wal_limit << std::hex; - out << " wal_size " << file.wal_size << std::hex; - out << " type WAL_V2 " << std::dec; + if (file.type == WAL_V2 || file.type == WAL_V2_FIN) { + out << " wal_size 0x" << std::hex << file.wal_size << std::dec << std::hex; + if (file.type == WAL_V2) + out << " type WAL_V2 " << std::dec; + if (file.type == WAL_V2_FIN) + out << " type WAL_V2_FIN " << std::dec; } out << ")"; return out; @@ -345,12 +346,20 @@ ostream& operator<<(ostream& out, const bluefs_fnode_t& file) std::ostream& operator<<(std::ostream& out, const bluefs_fnode_delta_t& delta) { - return out << "delta(ino " << delta.ino - << " size 0x" << std::hex << delta.size << std::dec - << " mtime " << delta.mtime - << " offset " << std::hex << delta.offset << std::dec - << " extents " << delta.extents - << ")"; + out << "delta(ino " << delta.ino + << " size 0x" << std::hex << delta.size << std::dec + << " mtime " << delta.mtime + << " offset " << std::hex << delta.offset << std::dec + << " extents " << delta.extents; + if (delta.type == WAL_V2 || delta.type == WAL_V2_FIN) { + out << " wal_size 0x" << std::hex << delta.wal_size << std::dec << std::hex; + if (delta.type == WAL_V2) + out << " type WAL_V2" << std::dec; + if (delta.type == WAL_V2_FIN) + out << " type WAL_V2_FIN" << std::dec; + } + out << ")"; + return out; } // bluefs_transaction_t @@ -430,30 +439,3 @@ ostream& operator<<(ostream& out, const bluefs_transaction_t& t) << " crc 0x" << t.op_bl.crc32c(-1) << std::dec << ")"; } - -void bluefs_wal_header_t::bound_encode(size_t &s) const { - s += 1; // version - s += 1; // compat - s += 4; // size - denc(flush_length, s); -} - -void bluefs_wal_header_t::encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(flush_length, bl); - ENCODE_FINISH(bl); -} - -void bluefs_wal_header_t::encode(bufferlist::contiguous_filler& filler_in) const { - ENCODE_START_FILLER(1, 1, filler_in); - ceph_le64 flush_length_le(flush_length); - filler_in.copy_in(sizeof(flush_length_le), (char*)&flush_length_le); - ENCODE_FINISH_FILLER(); -} - -void bluefs_wal_header_t::decode(bufferlist::const_iterator& p) -{ - DECODE_START(1, p); - decode(flush_length, p); - DECODE_FINISH(p); -} diff --git a/src/os/bluestore/bluefs_types.h b/src/os/bluestore/bluefs_types.h index 610ba35f240..ad1e5a31f3c 100644 --- a/src/os/bluestore/bluefs_types.h +++ b/src/os/bluestore/bluefs_types.h @@ -36,7 +36,8 @@ WRITE_CLASS_DENC(bluefs_extent_t) enum bluefs_node_type { REGULAR = 0, - WAL_V2 = 1, + WAL_V2 = 1, // WAL_V2 that is open for write + WAL_V2_FIN = 2, // WAL_V2 that we are done writing to; there is no data in [onode.size ... allocated) NODE_TYPE_END = 0x100, }; @@ -82,63 +83,52 @@ struct bluefs_fnode_delta_t { // Used for consistency checking. // only relevant in case of wal node - uint64_t wal_limit; - uint64_t wal_size; uint8_t type = REGULAR; - + uint64_t wal_size; // The size of payload in the file; size = wal_size + n * envelope_size mempool::bluefs::vector extents; DENC_HELPERS void bound_encode(size_t& p) const { - _denc_friend(*this, p); + uint8_t version = 1, compat = 1; + if (type == WAL_V2 || type == WAL_V2_FIN) { + version = 2; + compat = 2; + } + DENC_START_UNSAFE(version, compat, p); + _denc_friend(*this, p, version); + DENC_FINISH(p); } void encode(ceph::buffer::list::contiguous_appender& p) const { DENC_DUMP_PRE(bluefs_fnode_t); - _denc_friend(*this, p); + uint8_t version = 1, compat = 1; + if (type == WAL_V2 || type == WAL_V2_FIN) { + version = 2; + compat = 2; + } + DENC_START_UNSAFE(version, compat, p); + _denc_friend(*this, p, version); + DENC_FINISH(p); } void decode(ceph::buffer::ptr::const_iterator& p) { - DENC_START_UNSAFE(2, 2, p); - denc_varint(ino, p); - denc_varint(size, p); - denc(mtime, p); - denc(offset, p); - denc(extents, p); - - if (struct_v >= 2) { - denc(type, p); - denc(wal_limit, p); - denc(wal_size, p); - } + DENC_START_UNSAFE(2, !"value unused in decode", p); + _denc_friend(*this, p, struct_v); DENC_FINISH(p); - } template friend std::enable_if_t>> - _denc_friend(T& v, P& p) { - uint8_t version = 1, compat = 1; - if (v.type == WAL_V2) { - version = 2; - compat = 2; - } - DENC_START_UNSAFE(version, compat, p); - + _denc_friend(T& v, P& p, __u8& struct_v) { denc_varint(v.ino, p); denc_varint(v.size, p); denc(v.mtime, p); denc(v.offset, p); denc(v.extents, p); - if (struct_v >= 2) { - denc(v.type, p); - denc(v.wal_limit, p); - denc(v.wal_size, p); + denc_varint(v.type, p); + denc_varint(v.wal_size, p); } - - DENC_FINISH(p); - } }; WRITE_CLASS_DENC(bluefs_fnode_delta_t) @@ -159,17 +149,16 @@ struct bluefs_fnode_t { uint64_t allocated; uint64_t allocated_commited; uint8_t type = REGULAR; - uint64_t wal_limit; // EOF of wal, this limit represents upper limit of fnode.size, not upper limit of wal_size - uint64_t wal_size; // Amount of payload bytes in WAL(not including envelope data), there could be more on power off instances, in range of wal_size~wal_limit + uint64_t wal_size; // Amount of payload bytes in WAL(not including envelope data), there could be more on power off instances, in range of fnode.size~wal_limit - bluefs_fnode_t() : ino(0), size(0), allocated(0), allocated_commited(0), wal_limit(0), wal_size(0) {} + bluefs_fnode_t() : ino(0), size(0), allocated(0), allocated_commited(0), wal_size(0) {} bluefs_fnode_t(uint64_t _ino, uint64_t _size, utime_t _mtime) : - ino(_ino), size(_size), mtime(_mtime), allocated(0), allocated_commited(0), wal_limit(0), wal_size(0) {} + ino(_ino), size(_size), mtime(_mtime), allocated(0), allocated_commited(0), wal_size(0) {} bluefs_fnode_t(const bluefs_fnode_t& other) : ino(other.ino), size(other.size), mtime(other.mtime), allocated(other.allocated), allocated_commited(other.allocated_commited), - wal_limit(other.wal_limit), + type(other.type), wal_size(other.wal_size) { clone_extents(other); } @@ -191,53 +180,46 @@ struct bluefs_fnode_t { DENC_HELPERS void bound_encode(size_t& p) const { - _denc_friend(*this, p); + uint8_t version = 1, compat = 1; + if (type == WAL_V2 || type == WAL_V2_FIN) { + version = 2; + compat = 2; + } + DENC_START_UNSAFE(version, compat, p); + _denc_friend(*this, p, version); + DENC_FINISH(p); } void encode(ceph::buffer::list::contiguous_appender& p) const { DENC_DUMP_PRE(bluefs_fnode_t); - _denc_friend(*this, p); + uint8_t version = 1, compat = 1; + if (type == WAL_V2 || type == WAL_V2_FIN) { + version = 2; + compat = 2; + } + DENC_START_UNSAFE(version, compat, p); + _denc_friend(*this, p, version); + DENC_FINISH(p); } void decode(ceph::buffer::ptr::const_iterator& p) { - DENC_START_COMPAT_2(2, 2, p); - denc_varint(ino, p); - denc_varint(size, p); - denc(mtime, p); - denc(__unused__, p); - denc(extents, p); - if (struct_v >= 2) { - denc(type, p); - denc(wal_limit, p); - denc(wal_size, p); - } - if (struct_v == 1) { - type = REGULAR; - } + DENC_START_UNSAFE(2, !"value unused in decode", p); + _denc_friend(*this, p, struct_v); DENC_FINISH(p); recalc_allocated(); } template friend std::enable_if_t>> - _denc_friend(T& v, P& p) { - - uint8_t version = 1, compat = 1; - if (v.type == WAL_V2) { - version = 2; - compat = 2; - } - DENC_START_UNSAFE(version, compat, p); + _denc_friend(T& v, P& p, __u8& struct_v) { denc_varint(v.ino, p); denc_varint(v.size, p); denc(v.mtime, p); denc(v.__unused__, p); denc(v.extents, p); if (struct_v >= 2) { - denc(v.type, p); - denc(v.wal_limit, p); - denc(v.wal_size, p); + denc_varint(v.type, p); + denc_varint(v.wal_size, p); } - DENC_FINISH(p); } void reset_delta() { allocated_commited = allocated; @@ -459,19 +441,4 @@ WRITE_CLASS_ENCODER(bluefs_transaction_t) std::ostream& operator<<(std::ostream& out, const bluefs_transaction_t& t); - - -struct bluefs_wal_header_t { - uint64_t flush_length; - - bluefs_wal_header_t() : flush_length(0) {} - bluefs_wal_header_t(uint64_t flush_length) : flush_length(flush_length) {} - static constexpr size_t size() { return (sizeof(__u8)*2) + sizeof(uint32_t) + sizeof(uint64_t); } - void bound_encode(size_t &s) const; - void encode(ceph::buffer::list& bl) const; - void encode(ceph::buffer::list::contiguous_filler& filler_in) const; - void decode(ceph::buffer::list::const_iterator& p); -}; -WRITE_CLASS_ENCODER(bluefs_wal_header_t) - #endif