journal/segmented_journal.cc
journal/segment_allocator.cc
journal/record_submitter.cc
+ journal/circular_journal_space.cc
journal.cc
device.cc
segment_manager_group.cc
#include "crimson/os/seastore/async_cleaner.h"
#include "crimson/os/seastore/journal/circular_bounded_journal.h"
#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/journal/circular_journal_space.h"
SET_SUBSYS(seastore_journal);
namespace crimson::os::seastore::journal {
-std::ostream &operator<<(std::ostream &out,
- const CircularBoundedJournal::cbj_header_t &header)
-{
- return out << "cbj_header_t("
- << ", dirty_tail=" << header.dirty_tail
- << ", alloc_tail=" << header.alloc_tail
- << ")";
-}
-
CircularBoundedJournal::CircularBoundedJournal(
JournalTrimmer &trimmer,
RBMDevice* device,
const std::string &path)
- : trimmer(trimmer), device(device), path(path) {}
-
-ceph::bufferlist CircularBoundedJournal::encode_header()
-{
- bufferlist bl;
- encode(header, bl);
- auto header_crc_filler = bl.append_hole(sizeof(checksum_t));
- auto bliter = bl.cbegin();
- auto header_crc = bliter.crc32c(
- ceph::encoded_sizeof_bounded<cbj_header_t>(),
- -1);
- ceph_le32 header_crc_le;
- header_crc_le = header_crc;
- header_crc_filler.copy_in(
- sizeof(checksum_t),
- reinterpret_cast<const char *>(&header_crc_le));
- return bl;
-}
+ : trimmer(trimmer), path(path),
+ cjs(device), // the device pointer now lives in the journal space object, not in this class
+ record_submitter(crimson::common::get_conf<uint64_t>( // submitter limits are read from the seastore_journal_* options named below
+ "seastore_journal_iodepth_limit"),
+ crimson::common::get_conf<uint64_t>(
+ "seastore_journal_batch_capacity"),
+ crimson::common::get_conf<Option::size_t>(
+ "seastore_journal_batch_flush_size"),
+ crimson::common::get_conf<double>(
+ "seastore_journal_batch_preferred_fullness"),
+ cjs) // last argument: the submitter is handed cjs, which is initialized just above
+ {}
CircularBoundedJournal::open_for_mkfs_ret
CircularBoundedJournal::open_for_mkfs()
{
- LOG_PREFIX(CircularBoundedJournal::open_for_mkfs);
- assert(device);
- ceph::bufferlist bl;
- CircularBoundedJournal::cbj_header_t head;
- assert(device->get_journal_size());
- head.dirty_tail =
- journal_seq_t{0,
- convert_abs_addr_to_paddr(
- get_records_start(),
- device->get_device_id())};
- head.alloc_tail = head.dirty_tail;
- encode(head, bl);
- header = head;
- set_written_to(head.dirty_tail);
- initialized = true;
- DEBUG(
- "initialize header block in CircularBoundedJournal, length {}",
- bl.length());
- return write_header(
- ).safe_then([this]() {
- return open_for_mount();
- }).handle_error(
- open_for_mkfs_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error write_header"
- }
- );
+ return record_submitter.open(true // NOTE(review): presumably forwarded as is_mkfs to CircularJournalSpace::open() - confirm in RecordSubmitter
+ ).safe_then([this](auto ret) { // ret is not used; the position is re-read through get_written_to()
+ record_submitter.update_committed_to(get_written_to()); // committed position starts at the current write position
+ return open_for_mkfs_ret(
+ open_for_mkfs_ertr::ready_future_marker{},
+ get_written_to());
+ });
}
CircularBoundedJournal::open_for_mount_ret
CircularBoundedJournal::open_for_mount()
{
- ceph_assert(initialized);
- if (written_to.segment_seq == NULL_SEG_SEQ) {
- written_to.segment_seq = 0;
- }
- return open_for_mount_ret(
- open_for_mount_ertr::ready_future_marker{},
- get_written_to());
+ return record_submitter.open(false // NOTE(review): presumably forwarded as is_mkfs=false to CircularJournalSpace::open() - confirm in RecordSubmitter
+ ).safe_then([this](auto ret) { // ret is not used; the position is re-read through get_written_to()
+ record_submitter.update_committed_to(get_written_to()); // committed position starts at the current write position
+ return open_for_mount_ret(
+ open_for_mount_ertr::ready_future_marker{},
+ get_written_to());
+ });
}
CircularBoundedJournal::close_ertr::future<> CircularBoundedJournal::close()
{
- return write_header(
- ).safe_then([this]() -> close_ertr::future<> {
- initialized = false;
- return close_ertr::now();
- }).handle_error(
- open_for_mount_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error write_header"
- }
- );
+ return record_submitter.close(); // NOTE(review): the header write previously done here presumably happens via CircularJournalSpace::close() - confirm in RecordSubmitter
}
-CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
- record_t &&record,
- OrderingHandle &handle)
+CircularBoundedJournal::submit_record_ret
+CircularBoundedJournal::submit_record(
+ record_t &&record,
+ OrderingHandle &handle)
{
LOG_PREFIX(CircularBoundedJournal::submit_record);
+ DEBUG("H{} {} start ...", (void*)&handle, record); // the handle address is logged as a tag to correlate the log lines of one submission
assert(write_pipeline);
- assert(written_to.segment_seq != NULL_SEG_SEQ);
- auto r_size = record_group_size_t(record.size, get_block_size());
- auto encoded_size = r_size.get_encoded_length();
- if (encoded_size > get_records_available_size()) {
- ERROR("record size {}, but available size {}",
- encoded_size, get_records_available_size());
- return crimson::ct_error::erange::make();
- }
- if (encoded_size + get_rbm_addr(get_written_to()) > get_journal_end()) {
- DEBUG("roll");
- paddr_t paddr = convert_abs_addr_to_paddr(
- get_records_start(),
- get_device_id());
- set_written_to(
- journal_seq_t{++written_to.segment_seq, paddr});
- if (encoded_size > get_records_available_size()) {
- ERROR("rolled, record size {}, but available size {}",
- encoded_size, get_records_available_size());
- return crimson::ct_error::erange::make();
- }
- }
-
- journal_seq_t j_seq = get_written_to();
- ceph::bufferlist to_write = encode_record(
- std::move(record), device->get_block_size(),
- j_seq, 0);
- assert(to_write.length() == encoded_size);
- auto target = get_rbm_addr(get_written_to());
- auto new_written_to = target + encoded_size;
- if (new_written_to >= get_journal_end()) {
- assert(new_written_to == get_journal_end());
- DEBUG("roll");
- paddr_t paddr = convert_abs_addr_to_paddr(
- get_records_start(),
- get_device_id());
- set_written_to(
- journal_seq_t{++written_to.segment_seq, paddr});
- } else {
- paddr_t paddr = convert_abs_addr_to_paddr(
- new_written_to,
- get_device_id());
- set_written_to(
- journal_seq_t{written_to.segment_seq, paddr});
- }
- DEBUG("{}, target {}", r_size, target);
-
- auto write_result = write_result_t{
- j_seq,
- encoded_size
- };
- auto write_fut = device_write_bl(target, to_write);
- return handle.enter(write_pipeline->device_submission
- ).then([write_fut = std::move(write_fut)]() mutable {
- return std::move(write_fut);
- }).safe_then([this, &handle] {
- return handle.enter(write_pipeline->finalize);
- }).safe_then([this, target,
- length=encoded_size,
- write_result,
- r_size,
- FNAME] {
- DEBUG("commit target {} used_size {} written length {}",
- target, get_records_used_size(), length);
-
- paddr_t paddr = convert_abs_addr_to_paddr(
- target + r_size.get_mdlength(),
- get_device_id());
- auto submit_result = record_locator_t{
- paddr,
- write_result
- };
- trimmer.set_journal_head(write_result.start_seq);
- return submit_result;
- });
+ return do_submit_record(std::move(record), handle); // all submission logic lives in do_submit_record(), which may call itself again after waiting or rolling
}
-CircularBoundedJournal::write_ertr::future<> CircularBoundedJournal::device_write_bl(
- rbm_abs_addr offset, bufferlist &bl)
+CircularBoundedJournal::submit_record_ret
+CircularBoundedJournal::do_submit_record(
+ record_t &&record,
+ OrderingHandle &handle)
{
- LOG_PREFIX(CircularBoundedJournal::device_write_bl);
- auto length = bl.length();
- if (offset + length > get_journal_end()) {
- return crimson::ct_error::erange::make();
+ LOG_PREFIX(CircularBoundedJournal::do_submit_record);
+ if (!record_submitter.is_available()) { // submitter cannot take a record right now: wait, then retry from the top
+ DEBUG("H{} wait ...", (void*)&handle);
+ return record_submitter.wait_available(
+ ).safe_then([this, record=std::move(record), &handle]() mutable { // handle is captured by reference - assumes the caller keeps it alive until the future resolves
+ return do_submit_record(std::move(record), handle);
+ });
+ }
+ auto action = record_submitter.check_action(record.size);
+ if (action == RecordSubmitter::action_t::ROLL) { // roll first, then retry the same record from the top
+ return record_submitter.roll_segment(
+ ).safe_then([this, record=std::move(record), &handle]() mutable {
+ return do_submit_record(std::move(record), handle);
+ });
}
- DEBUG(
- "overwrite in CircularBoundedJournal, offset {}, length {}",
- offset,
- length);
- return device->writev(offset, bl
- ).handle_error(
- write_ertr::pass_further{},
- crimson::ct_error::assert_all{ "Invalid error device->write" }
- );
-}
-CircularBoundedJournal::read_header_ret
-CircularBoundedJournal::read_header()
-{
- LOG_PREFIX(CircularBoundedJournal::read_header);
- assert(device);
- auto bptr = bufferptr(ceph::buffer::create_page_aligned(
- device->get_block_size()));
- DEBUG("reading {}", device->get_journal_start());
- return device->read(device->get_journal_start(), bptr
- ).safe_then([bptr, FNAME]() mutable
- -> read_header_ret {
- bufferlist bl;
- bl.append(bptr);
- auto bp = bl.cbegin();
- cbj_header_t cbj_header;
- try {
- decode(cbj_header, bp);
- } catch (ceph::buffer::error &e) {
- ERROR("unable to read header block");
- return crimson::ct_error::enoent::make();
- }
- auto bliter = bl.cbegin();
- auto test_crc = bliter.crc32c(
- ceph::encoded_sizeof_bounded<cbj_header_t>(),
- -1);
- ceph_le32 recorded_crc_le;
- decode(recorded_crc_le, bliter);
- uint32_t recorded_crc = recorded_crc_le;
- if (test_crc != recorded_crc) {
- ERROR("error, header crc mismatch.");
- return read_header_ret(
- read_header_ertr::ready_future_marker{},
- std::nullopt);
- }
- return read_header_ret(
- read_header_ertr::ready_future_marker{},
- std::make_pair(cbj_header, bl)
- );
+ DEBUG("H{} submit {} ...",
+ (void*)&handle,
+ action == RecordSubmitter::action_t::SUBMIT_FULL ?
+ "FULL" : "NOT_FULL");
+ auto submit_fut = record_submitter.submit(std::move(record)); // submit() is called before the handle enters the device_submission stage
+ return handle.enter(write_pipeline->device_submission
+ ).then([submit_fut=std::move(submit_fut)]() mutable {
+ return std::move(submit_fut);
+ }).safe_then([FNAME, this, &handle](record_locator_t result) {
+ return handle.enter(write_pipeline->finalize
+ ).then([FNAME, this, result, &handle] {
+ DEBUG("H{} finish with {}", (void*)&handle, result);
+ auto new_committed_to = result.write_result.get_end_seq();
+ record_submitter.update_committed_to(new_committed_to); // committed position advances only inside the finalize stage
+ return result;
+ });
});
}
* read records from last applied record prior to written_to, and replay
*/
LOG_PREFIX(CircularBoundedJournal::replay);
- return read_header(
+ return cjs.read_header(
).handle_error(
open_for_mount_ertr::pass_further{},
crimson::ct_error::assert_all{
}).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p)
mutable {
auto &[head, bl] = *p;
- header = head;
- DEBUG("header : {}", header);
- initialized = true;
+ cjs.set_cbj_header(head);
+ DEBUG("header : {}", cjs.get_cbj_header());
+ cjs.set_initialized(true);
return seastar::do_with(
std::move(delta_handler),
std::map<paddr_t, journal_seq_t>(),
}
return replay_ertr::make_ready_future<bool>(true);
};
- written_to.segment_seq = NULL_SEG_SEQ;
auto tail = get_dirty_tail() <= get_alloc_tail() ?
get_dirty_tail() : get_alloc_tail();
set_written_to(tail);
return d_handler(
offsets,
e,
- header.dirty_tail,
- header.alloc_tail,
+ get_dirty_tail(),
+ get_alloc_tail(),
modify_time
);
}
return scan_valid_record_delta(std::move(call_d_handler_if_valid), tail);
});
}).safe_then([this]() {
+ record_submitter.update_committed_to(get_written_to());
trimmer.update_journal_tails(
- header.dirty_tail,
- header.alloc_tail);
+ get_dirty_tail(),
+ get_alloc_tail());
});
});
}
assert(addr + read_length <= get_journal_end());
DEBUG("reading record from abs addr {} read length {}", addr, read_length);
auto bptr = bufferptr(ceph::buffer::create_page_aligned(read_length));
- return device->read(addr, bptr
+ return cjs.read(addr, bptr
).safe_then([this, addr, bptr, expected_seq, FNAME]() mutable
-> read_record_ret {
record_group_header_t h;
auto next_bptr = bufferptr(ceph::buffer::create_page_aligned(next_length));
DEBUG("reading record part 2 from abs addr {} read length {}",
next_addr, next_length);
- return device->read(next_addr, next_bptr
+ return cjs.read(next_addr, next_bptr
).safe_then([this, h, next_bptr=std::move(next_bptr), bl=std::move(bl)]() mutable {
bl.append(next_bptr);
return return_record(h, bl);
});
}
-CircularBoundedJournal::write_ertr::future<>
-CircularBoundedJournal::write_header()
-{
- LOG_PREFIX(CircularBoundedJournal::write_header);
- ceph::bufferlist bl = encode_header();
- ceph_assert(bl.length() <= get_block_size());
- DEBUG(
- "sync header of CircularBoundedJournal, length {}",
- bl.length());
- assert(device);
- auto iter = bl.begin();
- assert(bl.length() < get_block_size());
- bufferptr bp = bufferptr(ceph::buffer::create_page_aligned(get_block_size()));
- iter.copy(bl.length(), bp.c_str());
- return device->write(device->get_journal_start(), std::move(bp)
- ).handle_error(
- write_ertr::pass_further{},
- crimson::ct_error::assert_all{ "Invalid error device->write" }
- );
-}
seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type) {
if (is_trim_transaction(type)) {
return update_journal_tail(
return seastar::now();
}
-
}
#include "crimson/os/seastore/random_block_manager.h"
#include "crimson/os/seastore/random_block_manager/rbm_device.h"
#include <list>
+#include "crimson/os/seastore/journal/record_submitter.h"
+#include "crimson/os/seastore/journal/circular_journal_space.h"
namespace crimson::os::seastore::journal {
replay_ret replay(delta_handler_t &&delta_handler) final;
- struct cbj_header_t;
- using write_ertr = submit_record_ertr;
- /*
- * device_write_bl
+ rbm_abs_addr get_rbm_addr(journal_seq_t seq) const { // only seq.offset is used; seq.segment_seq is ignored
+ return convert_paddr_to_abs_addr(seq.offset);
+ }
+
+ /**
*
- * @param device address to write
- * @param bufferlist to write
+ * CircularBoundedJournal write
+ *
+ * NVMe will support a large block write (< 512KB) with the atomic write unit command.
+ * With this command, we expect that most incoming data can be stored
+ * with a single write call, which has lower overhead than the existing
+ * approach that combines system calls such as write() and sync().
*
*/
- write_ertr::future<> device_write_bl(rbm_abs_addr offset, ceph::bufferlist &bl);
+
+ seastar::future<> update_journal_tail( // forwards both tails to the journal space, which stores them in its header and rewrites the header block
+ journal_seq_t dirty,
+ journal_seq_t alloc) {
+ return cjs.update_journal_tail(dirty, alloc);
+ }
+ journal_seq_t get_dirty_tail() const { // dirty tail as held in the journal space's in-memory header
+ return cjs.get_dirty_tail();
+ }
+ journal_seq_t get_alloc_tail() const { // alloc tail as held in the journal space's in-memory header
+ return cjs.get_alloc_tail();
+ }
using read_ertr = crimson::errorator<
crimson::ct_error::input_output_error,
using read_record_ret = read_record_ertr::future<
std::optional<std::pair<record_group_header_t, bufferlist>>
>;
- using read_header_ertr = read_ertr;
- using read_header_ret = read_header_ertr::future<
- std::optional<std::pair<cbj_header_t, bufferlist>>
- >;
/*
* read_record
*
*
*/
read_record_ret read_record(paddr_t offset, segment_seq_t expected_seq);
- /*
- * read_header
- *
- * read header block from given absolute address
- *
- * @param absolute address
- *
- */
- read_header_ret read_header();
-
- ceph::bufferlist encode_header();
-
-
- /**
- * CircularBoundedJournal structure
- *
- * +-------------------------------------------------------+
- * | header | record | record | record | record | ... |
- * +-------------------------------------------------------+
- * ^-----------block aligned-----------------^
- * <----fixed---->
- */
-
-
- /**
- *
- * CircularBoundedJournal write
- *
- * NVMe will support a large block write (< 512KB) with atomic write unit command.
- * With this command, we expect that the most of incoming data can be stored
- * as a single write call, which has lower overhead than existing
- * way that uses a combination of system calls such as write() and sync().
- *
- */
-
- struct cbj_header_t {
- // start offset of CircularBoundedJournal in the device
- journal_seq_t dirty_tail;
- journal_seq_t alloc_tail;
-
- DENC(cbj_header_t, v, p) {
- DENC_START(1, 1, p);
- denc(v.dirty_tail, p);
- denc(v.alloc_tail, p);
- DENC_FINISH(p);
- }
- };
-
- /**
- *
- * Write position for CircularBoundedJournal
- *
- * | written to rbm | written length to CircularBoundedJournal | new write |
- * ----------------->------------------------------------------------>
- * ^ ^
- * applied_to written_to
- *
- */
-
- seastar::future<> update_journal_tail(
- journal_seq_t dirty,
- journal_seq_t alloc) {
- header.dirty_tail = dirty;
- header.alloc_tail = alloc;
- return write_header(
- ).handle_error(
- crimson::ct_error::assert_all{
- "encountered invalid error in update_journal_tail"
- });
- }
- journal_seq_t get_dirty_tail() const {
- return header.dirty_tail;
- }
- journal_seq_t get_alloc_tail() const {
- return header.alloc_tail;
- }
-
- write_ertr::future<> write_header();
read_record_ret return_record(record_group_header_t& header, bufferlist bl);
write_pipeline = _write_pipeline;
}
- journal_seq_t get_written_to() const {
- return written_to;
- }
- rbm_abs_addr get_rbm_addr(journal_seq_t seq) const {
- return convert_paddr_to_abs_addr(seq.offset);
- }
- void set_written_to(journal_seq_t seq) {
- rbm_abs_addr addr = convert_paddr_to_abs_addr(seq.offset);
- assert(addr >= get_records_start());
- assert(addr < get_journal_end());
- written_to = seq;
- }
device_id_t get_device_id() const {
- return device->get_device_id();
+ return cjs.get_device_id(); // the device is reached through the journal space now
}
extent_len_t get_block_size() const {
- assert(device);
- return device->get_block_size();
+ return cjs.get_block_size(); // the journal space returns the device's block size
}
- /*
- Size-related interfaces
- +---------------------------------------------------------+
- | header | record | record | record | record | ... |
- +---------------------------------------------------------+
- ^ ^ ^
- | | |
- get_journal_start | get_journal_end
- get_records_start
- <-- get_records_total_size + block_size -->
- <--------------- get_journal_size ------------------------>
- */
-
- size_t get_records_used_size() const {
- auto rbm_written_to = get_rbm_addr(get_written_to());
- auto rbm_tail = get_rbm_addr(get_dirty_tail());
- return rbm_written_to >= rbm_tail ?
- rbm_written_to - rbm_tail :
- rbm_written_to + get_records_total_size() + get_block_size()
- - rbm_tail;
- }
- size_t get_records_total_size() const {
- assert(device);
- // a block is for header and a block is reserved to denote the end
- return device->get_journal_size() - (2 * get_block_size());
- }
- rbm_abs_addr get_records_start() const {
- assert(device);
- return device->get_journal_start() + get_block_size();
+ rbm_abs_addr get_journal_end() const { // journal start plus journal size, as computed by the journal space
+ return cjs.get_journal_end();
}
- size_t get_records_available_size() const {
- return get_records_total_size() - get_records_used_size();
+
+ void set_written_to(journal_seq_t seq) { // the journal space asserts seq lies in [get_records_start(), get_journal_end())
+ cjs.set_written_to(seq);
}
- bool is_available_size(uint64_t size) {
- auto rbm_written_to = get_rbm_addr(get_written_to());
- auto rbm_tail = get_rbm_addr(get_dirty_tail());
- if (rbm_written_to > rbm_tail &&
- (get_journal_end() - rbm_written_to) < size &&
- size > (get_records_used_size() -
- (get_journal_end() - rbm_written_to))) {
- return false;
- }
- return get_records_available_size() >= size;
+
+ // Current write position, read from the journal space. Const again, as it
+ // was before the state moved into cjs, so const callers can still read it.
+ journal_seq_t get_written_to() const {
+ return cjs.get_written_to();
}
- rbm_abs_addr get_journal_end() const {
- assert(device);
- return device->get_journal_start() + device->get_journal_size();
+
+ rbm_abs_addr get_records_start() const { // first record address: one block past the journal start (the header block)
+ return cjs.get_records_start();
+ }
+
seastar::future<> finish_commit(transaction_type_t type) final;
using cbj_delta_handler_t = std::function<
cbj_delta_handler_t &&delta_handler,
journal_seq_t tail);
+ submit_record_ret do_submit_record(record_t &&record, OrderingHandle &handle);
+
private:
- cbj_header_t header;
JournalTrimmer &trimmer;
- RBMDevice* device;
std::string path;
WritePipeline *write_pipeline = nullptr;
/**
// should be in range [get_records_start(), get_journal_end())
// written_to.segment_seq is circulation seq to track
// the sequence to written records
- journal_seq_t written_to;
+ CircularJournalSpace cjs;
+ RecordSubmitter record_submitter;
};
-std::ostream &operator<<(std::ostream &out, const CircularBoundedJournal::cbj_header_t &header);
-
}
-WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal::CircularBoundedJournal::cbj_header_t)
-
-#if FMT_VERSION >= 90000
-template <> struct fmt::formatter<crimson::os::seastore::journal::CircularBoundedJournal::cbj_header_t> : fmt::ostream_formatter {};
-#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "circular_journal_space.h"
+
+#include <fmt/format.h>
+#include <fmt/os.h>
+
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/async_cleaner.h"
+#include "crimson/os/seastore/journal/circular_bounded_journal.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore::journal {
+
+// Stream a cbj_header_t as "cbj_header_t(dirty_tail=..., alloc_tail=...)".
+// The first field carries no separator, so the output no longer starts
+// with "cbj_header_t(, ".
+std::ostream &operator<<(std::ostream &out,
+                         const CircularJournalSpace::cbj_header_t &header)
+{
+  return out << "cbj_header_t("
+             << "dirty_tail=" << header.dirty_tail
+             << ", alloc_tail=" << header.alloc_tail
+             << ")";
+}
+
+CircularJournalSpace::CircularJournalSpace(RBMDevice * device) : device(device) {} // only stores the device pointer; no null check is done here
+
+// Report whether a write of `length` bytes starting at the current write
+// position has to wrap to the start of the record area first.
+//
+// A write that would end exactly at get_journal_end() needs a roll as well:
+// write() asserts that the new position stays strictly below the end, and
+// set_written_to() only accepts addresses in
+// [get_records_start(), get_journal_end()).
+bool CircularJournalSpace::needs_roll(std::size_t length) const {
+  return length + get_rbm_addr(get_written_to()) >= get_journal_end();
+}
+
+extent_len_t CircularJournalSpace::get_block_size() const { // dereferences device without a null check
+ return device->get_block_size();
+}
+
+// Wrap the write position back to the first record block and advance the
+// circulation sequence by one. Nothing is written to the device here.
+CircularJournalSpace::roll_ertr::future<> CircularJournalSpace::roll() {
+  journal_seq_t rolled = get_written_to();
+  ++rolled.segment_seq;
+  rolled.offset = convert_abs_addr_to_paddr(
+    get_records_start(),
+    get_device_id());
+  set_written_to(rolled);
+  return roll_ertr::now();
+}
+
+// Write an already-encoded buffer at the current write position.
+//
+// The write position is advanced before the device write is issued. The
+// caller must have rolled beforehand if needed: the end of the write has to
+// stay strictly below get_journal_end(). Aborts if the buffer is larger than
+// the space currently available. Resolves to the sequence the write started
+// at together with its length.
+CircularJournalSpace::write_ret
+CircularJournalSpace::write(ceph::bufferlist&& to_write) {
+  LOG_PREFIX(CircularJournalSpace::write);
+  // Snapshot the position once; it is not modified until set_written_to().
+  const journal_seq_t start_seq = get_written_to();
+  assert(start_seq.segment_seq != NULL_SEG_SEQ);
+  auto encoded_size = to_write.length();
+  if (encoded_size > get_records_available_size()) {
+    ceph_abort("should be impossible with EPM reservation");
+  }
+
+  auto target = get_rbm_addr(start_seq);
+  auto new_written_to = target + encoded_size;
+  assert(new_written_to < get_journal_end());
+  set_written_to(
+    journal_seq_t{
+      start_seq.segment_seq,
+      convert_abs_addr_to_paddr(new_written_to, get_device_id())});
+  DEBUG("{}, target {}", encoded_size, target);
+
+  auto write_result = write_result_t{
+    start_seq,
+    encoded_size
+  };
+  return device_write_bl(target, to_write
+  ).safe_then([this, target,
+               length=encoded_size,
+               write_result,
+               FNAME] {
+    DEBUG("commit target {} used_size {} written length {}",
+          target, get_records_used_size(), length);
+    return write_result;
+  }).handle_error(
+    base_ertr::pass_further{},
+    crimson::ct_error::assert_all{ "Invalid error" }
+  );
+}
+
+// Open the journal space.
+//
+// is_mkfs == false: requires `initialized` to be set already (replay does
+//   this through set_initialized()); a NULL_SEG_SEQ circulation sequence is
+//   normalized to 0 and the current write position is returned.
+// is_mkfs == true: builds a fresh header whose two tails both point at the
+//   first record block, resets the write position to it, writes the header
+//   to the device and then returns the write position.
+CircularJournalSpace::open_ret CircularJournalSpace::open(bool is_mkfs) {
+  // Cache the printable device id that get_name() hands out.
+  std::ostringstream name_stream;
+  name_stream << device_id_printer_t{get_device_id()};
+  print_name = name_stream.str();
+
+  if (!is_mkfs) {
+    ceph_assert(initialized);
+    if (written_to.segment_seq == NULL_SEG_SEQ) {
+      written_to.segment_seq = 0;
+    }
+    return open_ret(
+      open_ertr::ready_future_marker{},
+      get_written_to());
+  }
+
+  LOG_PREFIX(CircularJournalSpace::open);
+  assert(device);
+  assert(device->get_journal_size());
+  CircularJournalSpace::cbj_header_t fresh_header;
+  fresh_header.dirty_tail = journal_seq_t{
+    0,
+    convert_abs_addr_to_paddr(
+      get_records_start(),
+      device->get_device_id())};
+  fresh_header.alloc_tail = fresh_header.dirty_tail;
+  // Encoded here only to report the length in the log line below;
+  // write_header() encodes the header again on its own.
+  ceph::bufferlist encoded;
+  encode(fresh_header, encoded);
+  header = fresh_header;
+  set_written_to(fresh_header.dirty_tail);
+  initialized = true;
+  DEBUG(
+    "initialize header block in CircularJournalSpace length {}",
+    encoded.length());
+  return write_header(
+  ).safe_then([this]() {
+    return open_ret(
+      open_ertr::ready_future_marker{},
+      get_written_to());
+  }).handle_error(
+    open_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error write_header"
+    }
+  );
+}
+
+// Encode the in-memory header followed by a little-endian crc32c.
+// The checksum covers the first encoded_sizeof_bounded<cbj_header_t>()
+// bytes of the buffer and is stored in a hole appended right after the
+// encoded header.
+ceph::bufferlist CircularJournalSpace::encode_header()
+{
+  bufferlist encoded;
+  encode(header, encoded);
+  // Reserve the checksum slot first, then checksum from the buffer start.
+  auto crc_slot = encoded.append_hole(sizeof(checksum_t));
+  auto crc_cursor = encoded.cbegin();
+  auto crc_value = crc_cursor.crc32c(
+    ceph::encoded_sizeof_bounded<cbj_header_t>(),
+    -1);
+  ceph_le32 crc_le;
+  crc_le = crc_value;
+  crc_slot.copy_in(
+    sizeof(checksum_t),
+    reinterpret_cast<const char *>(&crc_le));
+  return encoded;
+}
+
+// Write `bl` to the device at absolute address `offset`.
+// Fails with erange when the write would extend past get_journal_end();
+// otherwise forwards to device->writev(), passing write_ertr errors through
+// and asserting on any other error.
+CircularJournalSpace::write_ertr::future<> CircularJournalSpace::device_write_bl(
+  rbm_abs_addr offset, bufferlist &bl)
+{
+  LOG_PREFIX(CircularJournalSpace::device_write_bl);
+  auto length = bl.length();
+  const bool past_journal_end = offset + length > get_journal_end();
+  if (past_journal_end) {
+    return crimson::ct_error::erange::make();
+  }
+  DEBUG(
+    "overwrite in CircularJournalSpace, offset {}, length {}",
+    offset,
+    length);
+  return device->writev(offset, bl
+  ).handle_error(
+    write_ertr::pass_further{},
+    crimson::ct_error::assert_all{ "Invalid error device->write" }
+  );
+}
+
+// Read one block at the journal start and decode the header from it.
+//
+// Outcomes:
+//  - decode throws ceph::buffer::error -> enoent error
+//  - computed crc32c differs from the stored one -> std::nullopt
+//  - otherwise -> the decoded header paired with the raw block contents
+CircularJournalSpace::read_header_ret
+CircularJournalSpace::read_header()
+{
+  LOG_PREFIX(CircularJournalSpace::read_header);
+  assert(device);
+  auto bptr = bufferptr(ceph::buffer::create_page_aligned(
+    device->get_block_size()));
+  DEBUG("reading {}", device->get_journal_start());
+  return device->read(device->get_journal_start(), bptr
+  ).safe_then([bptr, FNAME]() mutable
+    -> read_header_ret {
+    bufferlist block;
+    block.append(bptr);
+
+    cbj_header_t decoded;
+    auto decode_cursor = block.cbegin();
+    try {
+      decode(decoded, decode_cursor);
+    } catch (ceph::buffer::error &e) {
+      ERROR("unable to read header block");
+      return crimson::ct_error::enoent::make();
+    }
+
+    // Checksum the encoded header from the buffer start, then decode the
+    // stored checksum from the same iterator.
+    auto crc_cursor = block.cbegin();
+    auto computed_crc = crc_cursor.crc32c(
+      ceph::encoded_sizeof_bounded<cbj_header_t>(),
+      -1);
+    ceph_le32 stored_crc_le;
+    decode(stored_crc_le, crc_cursor);
+    uint32_t stored_crc = stored_crc_le;
+    if (computed_crc != stored_crc) {
+      ERROR("error, header crc mismatch.");
+      return read_header_ret(
+        read_header_ertr::ready_future_marker{},
+        std::nullopt);
+    }
+    return read_header_ret(
+      read_header_ertr::ready_future_marker{},
+      std::make_pair(decoded, block)
+    );
+  });
+}
+
+// Serialize the in-memory header (plus checksum) and write it as one full
+// block at the journal start.
+//
+// The encoded header is copied into a zero-filled, page-aligned buffer of
+// get_block_size() bytes, so the unused remainder of the block has
+// deterministic content rather than whatever the allocation contained.
+// write_ertr errors are passed through; any other error asserts.
+CircularJournalSpace::write_ertr::future<>
+CircularJournalSpace::write_header()
+{
+  LOG_PREFIX(CircularJournalSpace::write_header);
+  // Check the device before get_block_size() dereferences it.
+  assert(device);
+  ceph::bufferlist bl = encode_header();
+  // The encoded header has to fit into the single header block.
+  ceph_assert(bl.length() <= get_block_size());
+  DEBUG(
+    "sync header of CircularJournalSpace, length {}",
+    bl.length());
+  bufferptr bp = bufferptr(ceph::buffer::create_page_aligned(get_block_size()));
+  bp.zero();
+  auto iter = bl.begin();
+  iter.copy(bl.length(), bp.c_str());
+  return device->write(device->get_journal_start(), std::move(bp)
+  ).handle_error(
+    write_ertr::pass_further{},
+    crimson::ct_error::assert_all{ "Invalid error device->write" }
+  );
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <optional>
+#include <seastar/core/circular_buffer.hh>
+#include <seastar/core/metrics.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/buffer.h"
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/random_block_manager.h"
+#include "crimson/os/seastore/random_block_manager/rbm_device.h"
+#include "crimson/os/seastore/journal/record_submitter.h"
+#include "crimson/os/seastore/async_cleaner.h"
+
+namespace crimson::os::seastore {
+ class SegmentProvider;
+ class JournalTrimmer;
+}
+
+namespace crimson::os::seastore::journal {
+
+class CircularBoundedJournal;
+/**
+ * CircularJournalSpace
+ *
+ * JournalAllocator implementation over the journal area of an RBMDevice.
+ * It holds the on-disk header (dirty/alloc tails), the current write
+ * position and the device pointer, and provides the header read/write and
+ * size bookkeeping used by CircularBoundedJournal.
+ */
+class CircularJournalSpace : public JournalAllocator {
+
+ public:
+  /// Printable device id; filled in by open().
+  const std::string& get_name() const final {
+    return print_name;
+  }
+
+  extent_len_t get_block_size() const final;
+
+  /// True whenever a device pointer is set.
+  bool can_write() const final {
+    return (device != nullptr);
+  }
+
+  /// This allocator always reports a nonce of 0.
+  segment_nonce_t get_nonce() const final {
+    return 0;
+  }
+
+  bool needs_roll(std::size_t length) const final;
+
+  roll_ertr::future<> roll() final;
+
+  write_ret write(ceph::bufferlist&& to_write) final;
+
+  /// Intentionally a no-op for this allocator.
+  void update_modify_time(record_t& record) final {}
+
+  /// Persist the current header, then clear `initialized`.
+  // NOTE(review): the pass-through names Journal::open_for_mount_ertr although
+  // the function returns close_ertr - presumably the same error set; confirm.
+  close_ertr::future<> close() final {
+    return write_header(
+    ).safe_then([this]() -> close_ertr::future<> {
+      initialized = false;
+      return close_ertr::now();
+    }).handle_error(
+      Journal::open_for_mount_ertr::pass_further{},
+      crimson::ct_error::assert_all{
+        "Invalid error write_header"
+      }
+    );
+  }
+
+  open_ret open(bool is_mkfs) final;
+
+ public:
+  CircularJournalSpace(RBMDevice * device);
+
+  struct cbj_header_t;
+  using write_ertr = Journal::submit_record_ertr;
+  /*
+   * device_write_bl
+   *
+   * @param device address to write
+   * @param bufferlist to write
+   *
+   */
+  write_ertr::future<> device_write_bl(rbm_abs_addr offset, ceph::bufferlist &bl);
+
+  using read_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent,
+    crimson::ct_error::erange>;
+  using read_header_ertr = read_ertr;
+  using read_header_ret = read_header_ertr::future<
+    std::optional<std::pair<cbj_header_t, bufferlist>>
+    >;
+  /*
+   * read_header
+   *
+   * read the header block located at the journal start of the device;
+   * resolves to std::nullopt when the stored checksum does not match
+   *
+   */
+  read_header_ret read_header();
+
+  /// Encode `header` followed by its crc32c.
+  ceph::bufferlist encode_header();
+
+  /// Write the encoded header as one block at the journal start.
+  write_ertr::future<> write_header();
+
+
+  /**
+   * CircularBoundedJournal structure
+   *
+   * +-------------------------------------------------------+
+   * |   header    | record | record | record | record | ... |
+   * +-------------------------------------------------------+
+   *               ^-----------block aligned-----------------^
+   * <----fixed---->
+   */
+
+  struct cbj_header_t {
+    // start offset of CircularBoundedJournal in the device
+    journal_seq_t dirty_tail;
+    journal_seq_t alloc_tail;
+
+    DENC(cbj_header_t, v, p) {
+      DENC_START(1, 1, p);
+      denc(v.dirty_tail, p);
+      denc(v.alloc_tail, p);
+      DENC_FINISH(p);
+    }
+  };
+
+  /**
+   *
+   * Write position for CircularBoundedJournal
+   *
+   * | written to rbm |    written length to CircularBoundedJournal    | new write |
+   * ----------------->------------------------------------------------>
+   *                  ^                                                ^
+   *            applied_to                                        written_to
+   *
+   */
+
+  journal_seq_t get_written_to() const {
+    return written_to;
+  }
+  /// Absolute device address of seq.offset; seq.segment_seq is ignored.
+  rbm_abs_addr get_rbm_addr(journal_seq_t seq) const {
+    return convert_paddr_to_abs_addr(seq.offset);
+  }
+  /// Set the write position; the address must lie in
+  /// [get_records_start(), get_journal_end()).
+  void set_written_to(journal_seq_t seq) {
+    rbm_abs_addr addr = convert_paddr_to_abs_addr(seq.offset);
+    assert(addr >= get_records_start());
+    assert(addr < get_journal_end());
+    written_to = seq;
+  }
+  device_id_t get_device_id() const {
+    return device->get_device_id();
+  }
+
+  journal_seq_t get_dirty_tail() const {
+    return header.dirty_tail;
+  }
+  journal_seq_t get_alloc_tail() const {
+    return header.alloc_tail;
+  }
+
+  /*
+    Size-related interfaces
+     +---------------------------------------------------------+
+     | header      | record | record | record | record | ...   |
+     +---------------------------------------------------------+
+     ^             ^                                           ^
+     |             |                                           |
+   get_journal_start                                     get_journal_end
+              get_records_start
+                   <-- get_records_total_size + block_size -->
+     <--------------- get_journal_size ------------------------>
+  */
+
+  /// Bytes between the dirty tail and the write position, taking the
+  /// wrap-around into account when the write position is below the tail.
+  size_t get_records_used_size() const {
+    auto rbm_written_to = get_rbm_addr(get_written_to());
+    auto rbm_tail = get_rbm_addr(get_dirty_tail());
+    return rbm_written_to >= rbm_tail ?
+      rbm_written_to - rbm_tail :
+      rbm_written_to + get_records_total_size() + get_block_size()
+      - rbm_tail;
+  }
+  size_t get_records_total_size() const {
+    assert(device);
+    // a block is for header and a block is reserved to denote the end
+    return device->get_journal_size() - (2 * get_block_size());
+  }
+  /// First record address: one block past the journal start.
+  rbm_abs_addr get_records_start() const {
+    assert(device);
+    return device->get_journal_start() + get_block_size();
+  }
+  size_t get_records_available_size() const {
+    return get_records_total_size() - get_records_used_size();
+  }
+  /// Whether `size` bytes can be written; const because it only reads state.
+  bool is_available_size(uint64_t size) const {
+    auto rbm_written_to = get_rbm_addr(get_written_to());
+    auto rbm_tail = get_rbm_addr(get_dirty_tail());
+    if (rbm_written_to > rbm_tail &&
+        (get_journal_end() - rbm_written_to) < size &&
+        size > (get_records_used_size() -
+                (get_journal_end() - rbm_written_to))) {
+      return false;
+    }
+    return get_records_available_size() >= size;
+  }
+  /// One past the last journal address: journal start plus journal size.
+  rbm_abs_addr get_journal_end() const {
+    assert(device);
+    return device->get_journal_start() + device->get_journal_size();
+  }
+
+  /// Read from the device at `offset` into `bptr`.
+  read_ertr::future<> read(
+    uint64_t offset,
+    bufferptr &bptr) {
+    assert(device);
+    return device->read(offset, bptr);
+  }
+
+  /// Store both tails in the in-memory header and write the header out;
+  /// any write error asserts.
+  seastar::future<> update_journal_tail(
+    journal_seq_t dirty,
+    journal_seq_t alloc) {
+    header.dirty_tail = dirty;
+    header.alloc_tail = alloc;
+    return write_header(
+    ).handle_error(
+      crimson::ct_error::assert_all{
+        "encountered invalid error in update_journal_tail"
+    });
+  }
+
+  void set_initialized(bool init) {
+    initialized = init;
+  }
+
+  /// Replace the in-memory header; takes a const reference because the
+  /// argument is only copied.
+  void set_cbj_header(const cbj_header_t& head) {
+    header = head;
+  }
+
+  /// Copy of the in-memory header.
+  cbj_header_t get_cbj_header() const {
+    return header;
+  }
+
+ private:
+  std::string print_name;
+  cbj_header_t header;
+  RBMDevice* device;
+  journal_seq_t written_to;
+  bool initialized = false;
+};
+
+std::ostream &operator<<(std::ostream &out, const CircularJournalSpace::cbj_header_t &header);
+
+}
+
+WRITE_CLASS_DENC_BOUNDED(crimson::os::seastore::journal::CircularJournalSpace::cbj_header_t)
+
+#if FMT_VERSION >= 90000
+template <> struct fmt::formatter<crimson::os::seastore::journal::CircularJournalSpace::cbj_header_t> : fmt::ostream_formatter {};
+#endif