--- a/src/crimson/os/seastore/CMakeLists.txt
+++ b/src/crimson/os/seastore/CMakeLists.txt
random_block_manager/nvme_manager.cc
random_block_manager/nvmedevice.cc
journal/segmented_journal.cc
+ journal/segment_allocator.cc
journal.cc
../../../test/crimson/seastore/test_block.cc
${PROJECT_SOURCE_DIR}/src/os/Transaction.cc
--- /dev/null
+++ b/src/crimson/os/seastore/journal/segment_allocator.cc
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "segment_allocator.h"
+
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/segment_cleaner.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore::journal {
+
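+// A nonce derived from the segment seq and the store's uuid; it is stored
+// in the segment header and stamped into records so that replay can
+// recognize stale records left over from an earlier use of the segment.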
+static segment_nonce_t generate_nonce(
+ segment_seq_t seq,
+ const seastore_meta_t &meta)
+{
+ return ceph_crc32c(
+ seq,
+ reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
+ sizeof(meta.seastore_id.uuid));
+}
+
+SegmentAllocator::SegmentAllocator(
+ segment_type_t type,
+ SegmentProvider &sp,
+ SegmentManager &sm)
+ : type{type},
+ segment_provider{sp},
+ segment_manager{sm}
+{
+ ceph_assert(type != segment_type_t::NULL_SEG);
+ reset();
+}
+
+void SegmentAllocator::set_next_segment_seq(segment_seq_t seq)
+{
+ LOG_PREFIX(SegmentAllocator::set_next_segment_seq);
+ INFO("{} {} next_segment_seq={}",
+ type, get_device_id(), segment_seq_printer_t{seq});
+ assert(type == segment_seq_to_type(seq));
+ next_segment_seq = seq;
+}
+
+SegmentAllocator::open_ertr::future<journal_seq_t>
+SegmentAllocator::open()
+{
+ LOG_PREFIX(SegmentAllocator::open);
+ ceph_assert(!current_segment);
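+  // JOURNAL segments consume one seq each; OOL segments all share the
+  // fixed OOL_SEG_SEQ, so the counter only advances for JOURNAL.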
+ segment_seq_t new_segment_seq;
+ if (type == segment_type_t::JOURNAL) {
+ new_segment_seq = next_segment_seq++;
+ } else { // OOL
+ new_segment_seq = next_segment_seq;
+ }
+ assert(new_segment_seq == get_current_segment_seq());
+ ceph_assert(segment_seq_to_type(new_segment_seq) == type);
+ auto new_segment_id = segment_provider.get_segment(
+ get_device_id(), new_segment_seq);
+ return segment_manager.open(new_segment_id
+ ).handle_error(
+ open_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in SegmentAllocator::open open"
+ }
+ ).safe_then([this, FNAME, new_segment_seq](auto sref) {
+ // initialize new segment
+ journal_seq_t new_journal_tail;
+ if (type == segment_type_t::JOURNAL) {
+ new_journal_tail = segment_provider.get_journal_tail_target();
+ current_segment_nonce = generate_nonce(
+ new_segment_seq, segment_manager.get_meta());
+ } else { // OOL
+ new_journal_tail = NO_DELTAS;
+ assert(current_segment_nonce == 0);
+ }
+ segment_id_t segment_id = sref->get_segment_id();
+ auto header = segment_header_t{
+ new_segment_seq,
+ segment_id,
+ new_journal_tail,
+ current_segment_nonce};
+ INFO("{} {} writing header to new segment ... -- {}",
+ type, get_device_id(), header);
+
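+    // encode the header and pad it to a full block so that the first
+    // record starts block-aligned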
+ auto header_length = segment_manager.get_block_size();
+ bufferlist bl;
+ encode(header, bl);
+ bufferptr bp(ceph::buffer::create_page_aligned(header_length));
+ bp.zero();
+ auto iter = bl.cbegin();
+ iter.copy(bl.length(), bp.c_str());
+ bl.clear();
+ bl.append(bp);
+
+ ceph_assert(sref->get_write_ptr() == 0);
+ assert((unsigned)header_length == bl.length());
+ written_to = header_length;
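+    // the returned seq points just past the header, where the first
+    // record will be written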
+ auto new_journal_seq = journal_seq_t{
+ new_segment_seq,
+ paddr_t::make_seg_paddr(segment_id, written_to)};
+ return sref->write(0, bl
+ ).handle_error(
+ open_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in SegmentAllocator::open write"
+ }
+ ).safe_then([this,
+ FNAME,
+ new_journal_seq,
+ new_journal_tail,
+ sref=std::move(sref)]() mutable {
+ ceph_assert(!current_segment);
+ current_segment = std::move(sref);
+ if (type == segment_type_t::JOURNAL) {
+ segment_provider.update_journal_tail_committed(new_journal_tail);
+ }
+ DEBUG("{} {} rolled new segment id={}",
+ type, get_device_id(), current_segment->get_segment_id());
+ ceph_assert(new_journal_seq.segment_seq == get_current_segment_seq());
+ return new_journal_seq;
+ });
+ });
+}
+
+SegmentAllocator::roll_ertr::future<>
+SegmentAllocator::roll()
+{
+ ceph_assert(can_write());
+ return close_segment(true).safe_then([this] {
+ return open().discard_result();
+ });
+}
+
+SegmentAllocator::write_ret
+SegmentAllocator::write(ceph::bufferlist to_write)
+{
+ LOG_PREFIX(SegmentAllocator::write);
+ assert(can_write());
+ auto write_length = to_write.length();
+ auto write_start_offset = written_to;
+ auto write_start_seq = journal_seq_t{
+ get_current_segment_seq(),
+ paddr_t::make_seg_paddr(
+ current_segment->get_segment_id(), write_start_offset)
+ };
+ TRACE("{} {} {}~{}", type, get_device_id(), write_start_seq, write_length);
+ assert(write_length > 0);
+ assert((write_length % segment_manager.get_block_size()) == 0);
+ assert(!needs_roll(write_length));
+
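+  // reserve the range up front: written_to advances before the write is
+  // issued, so concurrent writes target disjoint extents of the segment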
+ auto write_result = write_result_t{
+ write_start_seq,
+ static_cast<seastore_off_t>(write_length)
+ };
+ written_to += write_length;
+ return current_segment->write(
+ write_start_offset, to_write
+ ).handle_error(
+ write_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in SegmentAllocator::write"
+ }
+ ).safe_then([write_result] {
+ return write_result;
+ });
+}
+
+SegmentAllocator::close_ertr::future<>
+SegmentAllocator::close()
+{
+ return [this] {
+ LOG_PREFIX(SegmentAllocator::close);
+ if (current_segment) {
+ return close_segment(false);
+ } else {
+ INFO("{} {} no current segment", type, get_device_id());
+ return close_segment_ertr::now();
+ }
+ }().finally([this] {
+ reset();
+ });
+}
+
+SegmentAllocator::close_segment_ertr::future<>
+SegmentAllocator::close_segment(bool is_rolling)
+{
+ LOG_PREFIX(SegmentAllocator::close_segment);
+ assert(can_write());
+ auto seg_to_close = std::move(current_segment);
+ auto close_segment_id = seg_to_close->get_segment_id();
+ INFO("{} {} close segment id={}, seq={}, written_to={}, nonce={}",
+ type, get_device_id(),
+ close_segment_id,
+ segment_seq_printer_t{get_current_segment_seq()},
+ written_to,
+ current_segment_nonce);
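+  // only a roll reports the segment as closed to the provider; see the
+  // FIXME in the header about removing the is_rolling distinction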
+ if (is_rolling) {
+ segment_provider.close_segment(close_segment_id);
+ }
+ return seg_to_close->close(
+ ).safe_then([seg_to_close=std::move(seg_to_close)] {
+ }).handle_error(
+ close_segment_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error in SegmentAllocator::close_segment"
+ }
+ );
+}
+
+} // namespace crimson::os::seastore::journal
--- /dev/null
+++ b/src/crimson/os/seastore/journal/segment_allocator.h
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "include/buffer.h"
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+ class SegmentProvider;
+}
+
+namespace crimson::os::seastore::journal {
+
+/**
+ * SegmentAllocator
+ *
+ * Maintains the open segment of a given type (JOURNAL or OOL) for writes.
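+ *
+ * Lifecycle: open() writes the segment header, write() appends
+ * block-aligned buffers, roll() closes the current segment and opens the
+ * next one, and close() tears the allocator down.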
+ */
+class SegmentAllocator {
+ using base_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+
+ public:
+ SegmentAllocator(segment_type_t type,
+ SegmentProvider &sp,
+ SegmentManager &sm);
+
+ device_id_t get_device_id() const {
+ return segment_manager.get_device_id();
+ }
+
+ seastore_off_t get_block_size() const {
+ return segment_manager.get_block_size();
+ }
+
+ extent_len_t get_max_write_length() const {
+ return segment_manager.get_segment_size() -
+ p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
+ size_t(segment_manager.get_block_size()));
+ }
+
+ device_segment_id_t get_num_segments() const {
+ return segment_manager.get_num_segments();
+ }
+
+ bool can_write() const {
+ return !!current_segment;
+ }
+
+ segment_nonce_t get_nonce() const {
+ assert(can_write());
+ return current_segment_nonce;
+ }
+
+ void set_next_segment_seq(segment_seq_t);
+
+ // returns true iff the current segment has insufficient space
+ bool needs_roll(std::size_t length) const {
+ assert(can_write());
+ auto write_capacity = current_segment->get_write_capacity();
+ return length + written_to > std::size_t(write_capacity);
+ }
+
+ // open for write
+ using open_ertr = base_ertr;
+ using open_ret = open_ertr::future<journal_seq_t>;
+ open_ret open();
+
+  // close the current segment and initialize the next one
+ using roll_ertr = base_ertr;
+ roll_ertr::future<> roll();
+
+  // write the buffer, return the write result
+  //
+  // May be called concurrently; writes may complete in any order.
+  // No write is allowed while a roll/open is in progress.
+ using write_ertr = base_ertr;
+ using write_ret = write_ertr::future<write_result_t>;
+ write_ret write(ceph::bufferlist to_write);
+
+ using close_ertr = base_ertr;
+ close_ertr::future<> close();
+
+ private:
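+  // return to the initial state: no open segment, and the seq counter back
+  // at its type-specific starting value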
+ void reset() {
+ current_segment.reset();
+ if (type == segment_type_t::JOURNAL) {
+ next_segment_seq = 0;
+ } else { // OOL
+ next_segment_seq = OOL_SEG_SEQ;
+ }
+ current_segment_nonce = 0;
+ written_to = 0;
+ }
+
+ // FIXME: remove the unnecessary is_rolling
+ using close_segment_ertr = base_ertr;
+ close_segment_ertr::future<> close_segment(bool is_rolling);
+
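+  // mirrors the allocation in open(): for JOURNAL the open segment's seq is
+  // next_segment_seq - 1, while OOL always reports the fixed seq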
+ segment_seq_t get_current_segment_seq() const {
+ segment_seq_t ret;
+ if (type == segment_type_t::JOURNAL) {
+ assert(next_segment_seq != 0);
+ ret = next_segment_seq - 1;
+ } else { // OOL
+ ret = next_segment_seq;
+ }
+ assert(segment_seq_to_type(ret) == type);
+ return ret;
+ }
+
+ const segment_type_t type; // JOURNAL or OOL
+ SegmentProvider &segment_provider;
+ SegmentManager &segment_manager;
+ SegmentRef current_segment;
+ segment_seq_t next_segment_seq;
+ segment_nonce_t current_segment_nonce;
+ seastore_off_t written_to;
+};
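+
+// A minimal usage sketch (illustrative only, not part of this patch;
+// `provider`, `manager`, and `bl` are hypothetical):
+//
+//   SegmentAllocator alloc{segment_type_t::JOURNAL, provider, manager};
+//   return alloc.open().safe_then([&alloc, bl=std::move(bl)](auto) mutable {
+//     if (alloc.needs_roll(bl.length())) {
+//       return alloc.roll().safe_then([&alloc, bl=std::move(bl)]() mutable {
+//         return alloc.write(std::move(bl));
+//       });
+//     }
+//     return alloc.write(std::move(bl));
+//   });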
+
+} // namespace crimson::os::seastore::journal
--- a/src/crimson/os/seastore/journal/segmented_journal.cc
+++ b/src/crimson/os/seastore/journal/segmented_journal.cc
namespace crimson::os::seastore::journal {
-segment_nonce_t generate_nonce(
- segment_seq_t seq,
- const seastore_meta_t &meta)
-{
- return ceph_crc32c(
- seq,
- reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
- sizeof(meta.seastore_id.uuid));
-}
-
SegmentedJournal::SegmentedJournal(
SegmentManager &segment_manager,
ExtentReader &scanner,
SegmentProvider &segment_provider)
: segment_provider(segment_provider),
- journal_segment_manager(segment_manager, segment_provider),
+ journal_segment_allocator(segment_type_t::JOURNAL,
+ segment_provider,
+ segment_manager),
record_submitter(crimson::common::get_conf<uint64_t>(
"seastore_journal_iodepth_limit"),
crimson::common::get_conf<uint64_t>(
"seastore_journal_batch_flush_size"),
crimson::common::get_conf<double>(
"seastore_journal_batch_preferred_fullness"),
- journal_segment_manager),
+ journal_segment_allocator),
scanner(scanner)
{
register_metrics();
SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write()
{
LOG_PREFIX(Journal::open_for_write);
- INFO("device_id={}", journal_segment_manager.get_device_id());
- return journal_segment_manager.open();
+ INFO("device_id={}", journal_segment_allocator.get_device_id());
+ return journal_segment_allocator.open();
}
SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
INFO("closing, committed_to={}",
record_submitter.get_committed_to());
metrics.clear();
- return journal_segment_manager.close();
+ return journal_segment_allocator.close();
}
SegmentedJournal::prep_replay_segments_fut
rt.second.journal_segment_seq;
});
- journal_segment_manager.set_segment_seq(
- segments.rbegin()->second.journal_segment_seq);
+ journal_segment_allocator.set_next_segment_seq(
+ segments.rbegin()->second.journal_segment_seq + 1);
std::for_each(
segments.begin(),
segments.end(),
[this, FNAME](auto &seg)
{
if (seg.first != seg.second.physical_segment_id ||
- seg.first.device_id() != journal_segment_manager.get_device_id() ||
+ seg.first.device_id() != journal_segment_allocator.get_device_id() ||
seg.second.get_type() != segment_type_t::JOURNAL) {
ERROR("illegal journal segment for replay -- {}", seg.second);
ceph_abort();
} else {
replay_from = paddr_t::make_seg_paddr(
from->first,
- journal_segment_manager.get_block_size());
+ journal_segment_allocator.get_block_size());
}
auto num_segments = segments.end() - from;
p.second.journal_segment_seq,
paddr_t::make_seg_paddr(
p.first,
- journal_segment_manager.get_block_size())
+ journal_segment_allocator.get_block_size())
};
return std::make_pair(ret, p.second);
});
return crimson::do_for_each(
boost::counting_iterator<device_segment_id_t>(0),
boost::counting_iterator<device_segment_id_t>(
- journal_segment_manager.get_num_segments()),
+ journal_segment_allocator.get_num_segments()),
[this, &ret](device_segment_id_t d_segment_id) {
segment_id_t segment_id{
- journal_segment_manager.get_device_id(),
+ journal_segment_allocator.get_device_id(),
d_segment_id};
return scanner.read_segment_header(
segment_id
);
}
-SegmentedJournal::JournalSegmentManager::JournalSegmentManager(
- SegmentManager& segment_manager,
- SegmentProvider& segment_provider)
- : segment_provider{segment_provider}, segment_manager{segment_manager}
-{
- reset();
-}
-
-SegmentedJournal::JournalSegmentManager::open_ret
-SegmentedJournal::JournalSegmentManager::open()
-{
- return roll().safe_then([this] {
- return get_current_write_seq();
- });
-}
-
-SegmentedJournal::JournalSegmentManager::close_ertr::future<>
-SegmentedJournal::JournalSegmentManager::close()
-{
- LOG_PREFIX(JournalSegmentManager::close);
- if (current_journal_segment) {
- INFO("segment_id={}, seq={}, written_to={}, nonce={}",
- current_journal_segment->get_segment_id(),
- get_segment_seq(),
- written_to,
- current_segment_nonce);
- } else {
- INFO("no current journal segment");
- }
-
- return (
- current_journal_segment ?
- current_journal_segment->close() :
- Segment::close_ertr::now()
- ).handle_error(
- close_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in JournalSegmentManager::close()"
- }
- ).finally([this] {
- reset();
- });
-}
-
-SegmentedJournal::JournalSegmentManager::roll_ertr::future<>
-SegmentedJournal::JournalSegmentManager::roll()
-{
- LOG_PREFIX(JournalSegmentManager::roll);
- auto old_segment_id = current_journal_segment ?
- current_journal_segment->get_segment_id() :
- NULL_SEG_ID;
- if (current_journal_segment) {
- INFO("closing segment {}, seq={}, written_to={}, nonce={}",
- old_segment_id,
- get_segment_seq(),
- written_to,
- current_segment_nonce);
- }
-
- return (
- current_journal_segment ?
- current_journal_segment->close() :
- Segment::close_ertr::now()
- ).safe_then([this] {
- auto new_segment_id = segment_provider->get_segment(
- get_device_id(), next_journal_segment_seq);
- return segment_manager.open(new_segment_id);
- }).safe_then([this](auto sref) {
- current_journal_segment = sref;
- return initialize_segment(*current_journal_segment);
- }).safe_then([this, old_segment_id] {
- if (old_segment_id != NULL_SEG_ID) {
- segment_provider.close_segment(old_segment_id);
- }
- }).handle_error(
- roll_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in JournalSegmentManager::roll"
- }
- );
-}
-
-SegmentedJournal::JournalSegmentManager::write_ret
-SegmentedJournal::JournalSegmentManager::write(ceph::bufferlist to_write)
-{
- LOG_PREFIX(JournalSegmentManager::write);
- auto write_length = to_write.length();
- auto write_start_seq = get_current_write_seq();
- TRACE("{}~{}", write_start_seq, write_length);
- assert(write_length > 0);
- assert((write_length % segment_manager.get_block_size()) == 0);
- assert(!needs_roll(write_length));
-
- auto write_start_offset = written_to;
- written_to += write_length;
- auto write_result = write_result_t{
- write_start_seq,
- static_cast<seastore_off_t>(write_length)
- };
- return current_journal_segment->write(
- write_start_offset, to_write
- ).handle_error(
- write_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in JournalSegmentManager::write"
- }
- ).safe_then([write_result] {
- return write_result;
- });
-}
-
-SegmentedJournal::JournalSegmentManager::initialize_segment_ertr::future<>
-SegmentedJournal::JournalSegmentManager::initialize_segment(Segment& segment)
-{
- LOG_PREFIX(JournalSegmentManager::initialize_segment);
- auto new_tail = segment_provider.get_journal_tail_target();
- // write out header
- ceph_assert(segment.get_write_ptr() == 0);
- bufferlist bl;
-
- segment_seq_t seq = next_journal_segment_seq++;
- current_segment_nonce = generate_nonce(
- seq, segment_manager.get_meta());
- auto header = segment_header_t{
- seq,
- segment.get_segment_id(),
- new_tail,
- current_segment_nonce};
- INFO("writing {} ...", header);
- ceph_assert(header.get_type() == segment_type_t::JOURNAL);
- encode(header, bl);
-
- bufferptr bp(
- ceph::buffer::create_page_aligned(
- segment_manager.get_block_size()));
- bp.zero();
- auto iter = bl.cbegin();
- iter.copy(bl.length(), bp.c_str());
- bl.clear();
- bl.append(bp);
-
- written_to = 0;
- return write(bl
- ).safe_then([this, new_tail](auto) {
- segment_provider.update_journal_tail_committed(new_tail);
- });
-}
-
SegmentedJournal::RecordBatch::add_pending_ret
SegmentedJournal::RecordBatch::add_pending(
record_t&& record,
std::size_t batch_capacity,
std::size_t batch_flush_size,
double preferred_fullness,
- JournalSegmentManager& jsm)
+ SegmentAllocator& jsa)
: io_depth_limit{io_depth},
preferred_fullness{preferred_fullness},
- journal_segment_manager{jsm},
+ journal_segment_allocator{jsa},
batches(new RecordBatch[io_depth + 1])
{
LOG_PREFIX(RecordSubmitter);
assert(write_pipeline);
auto expected_size = record_group_size_t(
record.size,
- journal_segment_manager.get_block_size()
+ journal_segment_allocator.get_block_size()
).get_encoded_length();
- auto max_record_length = journal_segment_manager.get_max_write_length();
+ auto max_record_length = journal_segment_allocator.get_max_write_length();
if (expected_size > max_record_length) {
ERROR("H{} {} exceeds max record size {}",
(void*)&handle, record, max_record_length);
increment_io();
auto num = p_batch->get_num_records();
auto [to_write, sizes] = p_batch->encode_batch(
- journal_committed_to, journal_segment_manager.get_nonce());
+ journal_committed_to, journal_segment_allocator.get_nonce());
DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
num, sizes, journal_committed_to, num_outstanding_io);
account_submission(num, sizes);
- std::ignore = journal_segment_manager.write(to_write
+ std::ignore = journal_segment_allocator.write(to_write
).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
TRACE("{} records, {}, write done with {}", num, sizes, write_result);
finish_submit_batch(p_batch, write_result);
increment_io();
auto [to_write, sizes] = p_current_batch->submit_pending_fast(
std::move(record),
- journal_segment_manager.get_block_size(),
+ journal_segment_allocator.get_block_size(),
journal_committed_to,
- journal_segment_manager.get_nonce());
+ journal_segment_allocator.get_nonce());
DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
(void*)&handle, sizes, journal_committed_to, num_outstanding_io);
account_submission(1, sizes);
- return journal_segment_manager.write(to_write
+ return journal_segment_allocator.write(to_write
).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
return record_locator_t{
write_result.start_seq.offset.add_offset(mdlength),
auto write_fut = p_current_batch->add_pending(
std::move(record),
handle,
- journal_segment_manager.get_block_size());
+ journal_segment_allocator.get_block_size());
if (do_flush) {
DEBUG("H{} added pending and flush", (void*)&handle);
flush_current_batch();
// can increment io depth
assert(!wait_submit_promise.has_value());
auto maybe_new_size = p_current_batch->can_batch(
- record, journal_segment_manager.get_block_size());
+ record, journal_segment_allocator.get_block_size());
if (!maybe_new_size.has_value() ||
(maybe_new_size->get_encoded_length() >
- journal_segment_manager.get_max_write_length())) {
+ journal_segment_allocator.get_max_write_length())) {
TRACE("H{} flush", (void*)&handle);
assert(p_current_batch->is_pending());
flush_current_batch();
return do_submit(std::move(record), handle);
- } else if (journal_segment_manager.needs_roll(
+ } else if (journal_segment_allocator.needs_roll(
maybe_new_size->get_encoded_length())) {
if (p_current_batch->is_pending()) {
TRACE("H{} flush and roll", (void*)&handle);
} else {
TRACE("H{} roll", (void*)&handle);
}
- return journal_segment_manager.roll(
+ return journal_segment_allocator.roll(
).safe_then([this, record=std::move(record), &handle]() mutable {
return do_submit(std::move(record), handle);
});
assert(state == state_t::FULL);
// cannot increment io depth
auto maybe_new_size = p_current_batch->can_batch(
- record, journal_segment_manager.get_block_size());
+ record, journal_segment_allocator.get_block_size());
if (!maybe_new_size.has_value() ||
(maybe_new_size->get_encoded_length() >
- journal_segment_manager.get_max_write_length()) ||
- journal_segment_manager.needs_roll(
+ journal_segment_allocator.get_max_write_length()) ||
+ journal_segment_allocator.needs_roll(
maybe_new_size->get_encoded_length())) {
if (!wait_submit_promise.has_value()) {
wait_submit_promise = seastar::promise<>();
#include "crimson/os/seastore/segment_cleaner.h"
#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/extent_reader.h"
-#include "crimson/os/seastore/segment_manager.h"
#include "crimson/os/seastore/ordering_handle.h"
#include "crimson/os/seastore/seastore_types.h"
#include "crimson/osd/exceptions.h"
+#include "segment_allocator.h"
namespace crimson::os::seastore::journal {
}
private:
- class JournalSegmentManager {
- public:
- JournalSegmentManager(SegmentManager&, SegmentProvider&);
-
- using base_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
- extent_len_t get_max_write_length() const {
- return segment_manager.get_segment_size() -
- p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
- size_t(segment_manager.get_block_size()));
- }
-
- device_id_t get_device_id() const {
- return segment_manager.get_device_id();
- }
-
- device_segment_id_t get_num_segments() const {
- return segment_manager.get_num_segments();
- }
-
- seastore_off_t get_block_size() const {
- return segment_manager.get_block_size();
- }
-
- segment_nonce_t get_nonce() const {
- return current_segment_nonce;
- }
-
- segment_seq_t get_segment_seq() const {
- return next_journal_segment_seq - 1;
- }
-
- void set_segment_seq(segment_seq_t current_seq) {
- next_journal_segment_seq = (current_seq + 1);
- }
-
- using open_ertr = base_ertr;
- using open_ret = open_ertr::future<journal_seq_t>;
- open_ret open();
-
- using close_ertr = base_ertr;
- close_ertr::future<> close();
-
- // returns true iff the current segment has insufficient space
- bool needs_roll(std::size_t length) const {
- auto write_capacity = current_journal_segment->get_write_capacity();
- return length + written_to > std::size_t(write_capacity);
- }
-
- // close the current segment and initialize next one
- using roll_ertr = base_ertr;
- roll_ertr::future<> roll();
-
- // write the buffer, return the write result
- // May be called concurrently, writes may complete in any order.
- using write_ertr = base_ertr;
- using write_ret = write_ertr::future<write_result_t>;
- write_ret write(ceph::bufferlist to_write);
-
- private:
- journal_seq_t get_current_write_seq() const {
- assert(current_journal_segment);
- return journal_seq_t{
- get_segment_seq(),
- paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
- written_to)
- };
- }
-
- void reset() {
- next_journal_segment_seq = 0;
- current_segment_nonce = 0;
- current_journal_segment.reset();
- written_to = 0;
- }
-
- // prepare segment for writes, writes out segment header
- using initialize_segment_ertr = base_ertr;
- initialize_segment_ertr::future<> initialize_segment(Segment&);
-
- SegmentProvider& segment_provider;
- SegmentManager& segment_manager;
-
- segment_seq_t next_journal_segment_seq;
- segment_nonce_t current_segment_nonce;
-
- SegmentRef current_journal_segment;
- seastore_off_t written_to;
- };
-
class RecordBatch {
enum class state_t {
EMPTY = 0,
//
// Set write_result_t::write_length to 0 if the record is not the first one
// in the batch.
- using add_pending_ertr = JournalSegmentManager::write_ertr;
+ using add_pending_ertr = SegmentAllocator::write_ertr;
using add_pending_ret = add_pending_ertr::future<record_locator_t>;
add_pending_ret add_pending(
record_t&&,
std::size_t batch_capacity,
std::size_t batch_flush_size,
double preferred_fullness,
- JournalSegmentManager&);
+ SegmentAllocator&);
grouped_io_stats get_record_batch_stats() const {
return stats.record_batch_stats;
void flush_current_batch();
- using submit_pending_ertr = JournalSegmentManager::write_ertr;
+ using submit_pending_ertr = SegmentAllocator::write_ertr;
using submit_pending_ret = submit_pending_ertr::future<
record_locator_t>;
submit_pending_ret submit_pending(
double preferred_fullness;
WritePipeline* write_pipeline = nullptr;
- JournalSegmentManager& journal_segment_manager;
+ SegmentAllocator& journal_segment_allocator;
// committed_to may be in a previous journal segment
journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL;
};
SegmentProvider& segment_provider;
- JournalSegmentManager journal_segment_manager;
+ SegmentAllocator journal_segment_allocator;
RecordSubmitter record_submitter;
ExtentReader& scanner;
seastar::metrics::metric_group metrics;