std::size_t batch_capacity,
std::size_t batch_flush_size,
double preferred_fullness,
- SegmentAllocator& sa)
+ JournalAllocator& ja)
: io_depth_limit{io_depth},
preferred_fullness{preferred_fullness},
- segment_allocator{sa},
+ journal_allocator{ja},
batches(new RecordBatch[io_depth + 1])
{
LOG_PREFIX(RecordSubmitter);
#ifndef NDEBUG
if (ret) {
// unconditional invariants
- ceph_assert(segment_allocator.can_write());
+ ceph_assert(journal_allocator.can_write());
ceph_assert(p_current_batch != nullptr);
ceph_assert(!p_current_batch->is_submitting());
// the current batch accepts a further write
if (!p_current_batch->is_empty()) {
auto submit_length =
p_current_batch->get_submit_size().get_encoded_length();
- ceph_assert(!segment_allocator.needs_roll(submit_length));
+ ceph_assert(!journal_allocator.needs_roll(submit_length));
}
// I'm not rolling
}
{
assert(is_available());
auto eval = p_current_batch->evaluate_submit(
- rsize, segment_allocator.get_block_size());
- if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
+ rsize, journal_allocator.get_block_size());
+ if (journal_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
return action_t::ROLL;
} else if (eval.is_full) {
return action_t::SUBMIT_FULL;
return roll_segment_ertr::now();
} else {
// start rolling in background
- std::ignore = segment_allocator.roll(
+ std::ignore = journal_allocator.roll(
).safe_then([FNAME, this] {
// good
DEBUG("{} rolling done, available", get_name());
LOG_PREFIX(RecordSubmitter::submit);
ceph_assert(is_available());
assert(check_action(record.size) != action_t::ROLL);
- segment_allocator.get_provider().update_modify_time(
- segment_allocator.get_segment_id(),
- record.modify_time,
- record.extents.size());
+ journal_allocator.update_modify_time(record);
auto eval = p_current_batch->evaluate_submit(
- record.size, segment_allocator.get_block_size());
+ record.size, journal_allocator.get_block_size());
bool needs_flush = (
state == state_t::IDLE ||
eval.submit_size.get_fullness() > preferred_fullness ||
increment_io();
auto [to_write, sizes] = p_current_batch->submit_pending_fast(
std::move(record),
- segment_allocator.get_block_size(),
- committed_to,
- segment_allocator.get_nonce());
+ journal_allocator.get_block_size(),
+ get_committed_to(),
+ journal_allocator.get_nonce());
DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
- get_name(), sizes, committed_to, num_outstanding_io);
+ get_name(), sizes, get_committed_to(), num_outstanding_io);
account_submission(1, sizes);
- return segment_allocator.write(std::move(to_write)
+ return journal_allocator.write(std::move(to_write)
).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
return record_locator_t{
write_result.start_seq.offset.add_offset(mdlength),
auto write_fut = p_current_batch->add_pending(
get_name(),
std::move(record),
- segment_allocator.get_block_size());
+ journal_allocator.get_block_size());
if (needs_flush) {
if (state == state_t::FULL) {
// #2 block concurrent submissions due to lack of resource
RecordSubmitter::open_ret
RecordSubmitter::open(bool is_mkfs)
{
- return segment_allocator.open(is_mkfs
+ return journal_allocator.open(is_mkfs
).safe_then([this](journal_seq_t ret) {
LOG_PREFIX(RecordSubmitter::open);
DEBUG("{} register metrics", get_name());
RecordSubmitter::close_ertr::future<>
RecordSubmitter::close()
{
+ committed_to = JOURNAL_SEQ_NULL;
ceph_assert(state == state_t::IDLE);
ceph_assert(num_outstanding_io == 0);
- committed_to = JOURNAL_SEQ_NULL;
ceph_assert(p_current_batch != nullptr);
ceph_assert(p_current_batch->is_empty());
ceph_assert(!wait_available_promise.has_value());
has_io_error = false;
ceph_assert(!wait_unfull_flush_promise.has_value());
metrics.clear();
- return segment_allocator.close();
+ return journal_allocator.close();
}
void RecordSubmitter::update_state()
increment_io();
auto num = p_batch->get_num_records();
auto [to_write, sizes] = p_batch->encode_batch(
- committed_to, segment_allocator.get_nonce());
+ get_committed_to(), journal_allocator.get_nonce());
DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
- get_name(), num, sizes, committed_to, num_outstanding_io);
+ get_name(), num, sizes, get_committed_to(), num_outstanding_io);
account_submission(num, sizes);
- std::ignore = segment_allocator.write(std::move(to_write)
+ std::ignore = journal_allocator.write(std::move(to_write)
).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
TRACE("{} {} records, {}, write done with {}",
get_name(), num, sizes, write_result);
#include "crimson/common/errorator.h"
#include "crimson/os/seastore/segment_manager_group.h"
#include "crimson/os/seastore/segment_seq_allocator.h"
-#include "crimson/os/seastore/journal/segment_allocator.h"
namespace crimson::os::seastore {
class SegmentProvider;
namespace crimson::os::seastore::journal {
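+/**
+ * JournalAllocator
+ *
+ * Interface for the journal backend used by RecordSubmitter.  It hides how
+ * journal space is opened, written, rolled and closed, so the submitter no
+ * longer depends on SegmentAllocator directly and alternative backends can
+ * implement the same contract.
+ */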
+class JournalAllocator {
+public:
+ using base_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+
+  virtual ~JournalAllocator() {}
+
+ virtual const std::string& get_name() const = 0;
+
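+  // let the backend account the record's modify time
+  // (e.g. per-segment modify-time stats in SegmentAllocator)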
+ virtual void update_modify_time(record_t& record) = 0;
+
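+  // alignment granularity used when encoding records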
+ virtual extent_len_t get_block_size() const = 0;
+
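+  // close the backend; called only once the submitter is idle
+  // with no outstanding IO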
+ using close_ertr = base_ertr;
+ virtual close_ertr::future<> close() = 0;
+
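+  // nonce encoded into the submitted record groups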
+ virtual segment_nonce_t get_nonce() const = 0;
+
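+  // append the encoded buffer and return the write result;
+  // may be called concurrently, and writes may complete in any order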
+ using write_ertr = base_ertr;
+ using write_ret = write_ertr::future<write_result_t>;
+ virtual write_ret write(ceph::bufferlist&& to_write) = 0;
+
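+  // whether journal space is currently open for write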
+ virtual bool can_write() const = 0;
+
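+  // finish the current journal space and prepare the next one,
+  // e.g. close the current segment and open a new one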
+ using roll_ertr = base_ertr;
+ virtual roll_ertr::future<> roll() = 0;
+
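+  // true iff a write of `length` bytes does not fit into the
+  // currently open journal space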
+ virtual bool needs_roll(std::size_t length) const = 0;
+
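+  // open for write (optionally as part of mkfs) and resolve to the
+  // journal sequence writes will start from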
+ using open_ertr = base_ertr;
+ using open_ret = open_ertr::future<journal_seq_t>;
+ virtual open_ret open(bool is_mkfs) = 0;
+
+};
+
/**
* RecordBatch
*
//
// Set write_result_t::write_length to 0 if the record is not the first one
// in the batch.
- using add_pending_ertr = SegmentAllocator::write_ertr;
+ using add_pending_ertr = JournalAllocator::write_ertr;
using add_pending_ret = add_pending_ertr::future<record_locator_t>;
add_pending_ret add_pending(
const std::string& name,
std::size_t batch_capacity,
std::size_t batch_flush_size,
double preferred_fullness,
- SegmentAllocator&);
+ JournalAllocator&);
const std::string& get_name() const {
- return segment_allocator.get_name();
+ return journal_allocator.get_name();
}
journal_seq_t get_committed_to() const {
std::size_t io_depth_limit;
double preferred_fullness;
- SegmentAllocator& segment_allocator;
+ JournalAllocator& journal_allocator;
// committed_to may be in a previous journal segment
journal_seq_t committed_to = JOURNAL_SEQ_NULL;
#include "crimson/common/errorator.h"
#include "crimson/os/seastore/segment_manager_group.h"
#include "crimson/os/seastore/segment_seq_allocator.h"
+#include "crimson/os/seastore/journal/record_submitter.h"
+#include "crimson/os/seastore/async_cleaner.h"
namespace crimson::os::seastore {
class SegmentProvider;
*
* Maintain an available segment for writes.
*/
-class SegmentAllocator {
- using base_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
+class SegmentAllocator : public JournalAllocator {
public:
+ // SegmentAllocator specific methods
SegmentAllocator(JournalTrimmer *trimmer,
data_category_t category,
rewrite_gen_t gen,
SegmentProvider &sp,
SegmentSeqAllocator &ssa);
- const std::string& get_name() const {
- return print_name;
- }
-
- SegmentProvider &get_provider() {
- return segment_provider;
- }
-
- extent_len_t get_block_size() const {
- return sm_group.get_block_size();
+ segment_id_t get_segment_id() const {
+ assert(can_write());
+ return current_segment->get_segment_id();
}
extent_len_t get_max_write_length() const {
sm_group.get_rounded_tail_length();
}
- bool can_write() const {
- return !!current_segment;
+ public:
+ // overriding methods
+ const std::string& get_name() const final {
+ return print_name;
}
- segment_id_t get_segment_id() const {
- assert(can_write());
- return current_segment->get_segment_id();
+ extent_len_t get_block_size() const final {
+ return sm_group.get_block_size();
}
- segment_nonce_t get_nonce() const {
- assert(can_write());
- return current_segment_nonce;
+ bool can_write() const final {
+ return !!current_segment;
}
- segment_off_t get_written_to() const {
+ segment_nonce_t get_nonce() const final {
assert(can_write());
- return written_to;
+ return current_segment_nonce;
}
// returns true iff the current segment has insufficient space
- bool needs_roll(std::size_t length) const {
+ bool needs_roll(std::size_t length) const final {
assert(can_write());
assert(current_segment->get_write_capacity() ==
sm_group.get_segment_size());
}
// open for write and generate the correct print name
- using open_ertr = base_ertr;
- using open_ret = open_ertr::future<journal_seq_t>;
- open_ret open(bool is_mkfs);
+ open_ret open(bool is_mkfs) final;
// close the current segment and initialize next one
- using roll_ertr = base_ertr;
- roll_ertr::future<> roll();
+ roll_ertr::future<> roll() final;
// write the buffer, return the write result
//
// May be called concurrently, but writes may complete in any order.
// If rolling/opening, no write is allowed.
- using write_ertr = base_ertr;
- using write_ret = write_ertr::future<write_result_t>;
- write_ret write(ceph::bufferlist&& to_write);
+ write_ret write(ceph::bufferlist&& to_write) final;
using close_ertr = base_ertr;
- close_ertr::future<> close();
+ close_ertr::future<> close() final;
+
+ void update_modify_time(record_t& record) final {
+ segment_provider.update_modify_time(
+ get_segment_id(),
+ record.modify_time,
+ record.extents.size());
+ }
private:
open_ret do_open(bool is_mkfs);