});
}
-std::pair<ceph::bufferlist, record_group_size_t>
-RecordBatch::encode_batch(
+ceph::bufferlist RecordBatch::encode_batch(
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
state = state_t::SUBMITTING;
submitting_size = pending.get_size();
- auto gsize = pending.size;
- submitting_length = gsize.get_encoded_length();
- submitting_mdlength = gsize.get_mdlength();
+ submitting_length = pending.size.get_encoded_length();
+ submitting_mdlength = pending.size.get_mdlength();
auto bl = encode_records(pending, committed_to, segment_nonce);
// Note: pending is cleared here
assert(bl.length() == submitting_length);
- return std::make_pair(bl, gsize);
+ return bl;
}
void RecordBatch::set_result(
io_promise.reset();
}
-std::pair<ceph::bufferlist, record_group_size_t>
+ceph::bufferlist
RecordBatch::submit_pending_fast(
- record_t&& record,
+ record_group_t&& group,
extent_len_t block_size,
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
+ assert(group.get_size() == 1);
+ auto& record = group.records[0];
auto new_size = get_encoded_length_after(record, block_size);
std::ignore = new_size;
assert(state == state_t::EMPTY);
assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
- auto group = record_group_t(std::move(record), block_size);
- auto size = group.size;
- assert(size == new_size);
+ assert(group.size == new_size);
auto bl = encode_records(group, committed_to, segment_nonce);
- assert(bl.length() == size.get_encoded_length());
- return std::make_pair(std::move(bl), size);
+ // Note: group is cleared here
+ assert(bl.length() == new_size.get_encoded_length());
+ return bl;
}
RecordSubmitter::RecordSubmitter(
state != state_t::FULL) {
// fast path with direct write
increment_io();
- auto [to_write, sizes] = p_current_batch->submit_pending_fast(
- std::move(record),
- journal_allocator.get_block_size(),
+ auto block_size = journal_allocator.get_block_size();
+ auto rg = record_group_t(std::move(record), block_size);
+ account_submission(rg);
+ record_group_size_t sizes = rg.size;
+ auto to_write = p_current_batch->submit_pending_fast(
+ std::move(rg),
+ block_size,
get_committed_to(),
journal_allocator.get_nonce());
DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
get_name(), sizes, get_committed_to(), num_outstanding_io);
- account_submission(1, sizes);
return journal_allocator.write(std::move(to_write)
).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
return record_locator_t{
}
void RecordSubmitter::account_submission(
- std::size_t num,
- const record_group_size_t& size)
+ const record_group_t& rg)
{
stats.record_group_padding_bytes +=
- (size.get_mdlength() - size.get_raw_mdlength());
- stats.record_group_metadata_bytes += size.get_raw_mdlength();
- stats.record_group_data_bytes += size.dlength;
- stats.record_batch_stats.increment(num);
+ (rg.size.get_mdlength() - rg.size.get_raw_mdlength());
+ stats.record_group_metadata_bytes += rg.size.get_raw_mdlength();
+ stats.record_group_data_bytes += rg.size.dlength;
+ stats.record_batch_stats.increment(rg.get_size());
}
void RecordSubmitter::finish_submit_batch(
increment_io();
auto num = p_batch->get_num_records();
- auto [to_write, sizes] = p_batch->encode_batch(
+ const auto& rg = p_batch->get_record_group();
+ assert(rg.get_size() == num);
+ record_group_size_t sizes = rg.size;
+ account_submission(rg);
+ auto to_write = p_batch->encode_batch(
get_committed_to(), journal_allocator.get_nonce());
+ // Note: rg is cleared
DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
get_name(), num, sizes, get_committed_to(), num_outstanding_io);
- account_submission(num, sizes);
std::ignore = journal_allocator.write(std::move(to_write)
- ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
+ ).safe_then([this, p_batch, FNAME, num, sizes](auto write_result) {
TRACE("{} {} records, {}, write done with {}",
get_name(), num, sizes, write_result);
finish_submit_batch(p_batch, write_result);
}).handle_error(
- crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes](auto e) {
ERROR("{} {} records, {}, got error {}",
get_name(), num, sizes, e);
finish_submit_batch(p_batch, std::nullopt);
return seastar::now();
})
- ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+ ).handle_exception([this, p_batch, FNAME, num, sizes](auto e) {
ERROR("{} {} records, {}, got exception {}",
get_name(), num, sizes, e);
finish_submit_batch(p_batch, std::nullopt);