record_t&& record,
extent_len_t block_size)
{
- auto new_encoded_length = get_encoded_length_after(record, block_size);
+ auto new_size = get_encoded_length_after(record, block_size);
logger().debug(
"Journal::RecordBatch::add_pending: batches={}, write_size={}",
pending.get_size() + 1,
- new_encoded_length);
+ new_size.get_encoded_length());
assert(state != state_t::SUBMITTING);
- assert(can_batch(record, block_size) == new_encoded_length);
+ assert(can_batch(record, block_size).value() == new_size);
auto dlength_offset = pending.current_dlength;
pending.push_back(
std::move(record), block_size);
- assert(pending.size.get_encoded_length() == new_encoded_length);
+ assert(pending.size == new_size);
if (state == state_t::EMPTY) {
assert(!io_promise.has_value());
io_promise = seastar::shared_promise<maybe_promise_result_t>();
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
- auto encoded_length = get_encoded_length_after(record, block_size);
+ auto new_size = get_encoded_length_after(record, block_size);
logger().debug(
"Journal::RecordBatch::submit_pending_fast: write_size={}",
- encoded_length);
+ new_size.get_encoded_length());
assert(state == state_t::EMPTY);
- assert(can_batch(record, block_size) == encoded_length);
+ assert(can_batch(record, block_size).value() == new_size);
auto group = record_group_t(std::move(record), block_size);
auto size = group.size;
+ assert(size == new_size);
auto bl = encode_records(group, committed_to, segment_nonce);
- assert(bl.length() == encoded_length);
assert(bl.length() == size.get_encoded_length());
return std::make_pair(bl, size);
}
if (state <= state_t::PENDING) {
// can increment io depth
assert(!wait_submit_promise.has_value());
- auto batched_size = p_current_batch->can_batch(
+ auto maybe_new_size = p_current_batch->can_batch(
record, journal_segment_manager.get_block_size());
- if (batched_size == 0 ||
- batched_size > journal_segment_manager.get_max_write_length()) {
+ if (!maybe_new_size.has_value() ||
+ (maybe_new_size->get_encoded_length() >
+ journal_segment_manager.get_max_write_length())) {
assert(p_current_batch->is_pending());
flush_current_batch();
return do_submit(std::move(record), handle);
- } else if (journal_segment_manager.needs_roll(batched_size)) {
+ } else if (journal_segment_manager.needs_roll(
+ maybe_new_size->get_encoded_length())) {
if (p_current_batch->is_pending()) {
flush_current_batch();
}
assert(state == state_t::FULL);
// cannot increment io depth
- auto batched_size = p_current_batch->can_batch(
+ auto maybe_new_size = p_current_batch->can_batch(
record, journal_segment_manager.get_block_size());
- if (batched_size == 0 ||
- batched_size > journal_segment_manager.get_max_write_length() ||
- journal_segment_manager.needs_roll(batched_size)) {
+ if (!maybe_new_size.has_value() ||
+ (maybe_new_size->get_encoded_length() >
+ journal_segment_manager.get_max_write_length()) ||
+ journal_segment_manager.needs_roll(
+ maybe_new_size->get_encoded_length())) {
if (!wait_submit_promise.has_value()) {
wait_submit_promise = seastar::promise<>();
}
return pending.get_size();
}
- // return the expected write size if allows to batch,
- // otherwise, return 0
- std::size_t can_batch(
+ // return the expected write sizes if allows to batch,
+ // otherwise, return nullopt
+ std::optional<record_group_size_t> can_batch(
const record_t& record,
extent_len_t block_size) const {
assert(state != state_t::SUBMITTING);
(pending.get_size() > 0 &&
pending.size.get_encoded_length() > batch_flush_size)) {
assert(state == state_t::PENDING);
- return 0;
+ return std::nullopt;
}
return get_encoded_length_after(record, block_size);
}
segment_nonce_t segment_nonce);
private:
- std::size_t get_encoded_length_after(
+ record_group_size_t get_encoded_length_after(
const record_t& record,
extent_len_t block_size) const {
return pending.size.get_encoded_length_after(
void account(const delta_info_t& delta);
};
+WRITE_EQ_OPERATORS_2(record_size_t, plain_mdlength, dlength);
struct record_t {
std::vector<extent_t> extents;
return get_mdlength() + dlength;
}
- extent_len_t get_encoded_length_after(
+ record_group_size_t get_encoded_length_after(
const record_size_t& rsize,
extent_len_t block_size) const {
record_group_size_t tmp = *this;
tmp.account(rsize, block_size);
- return tmp.get_encoded_length();
+ return tmp;
}
void account(const record_size_t& rsize,
extent_len_t block_size);
};
+WRITE_EQ_OPERATORS_3(record_group_size_t, plain_mdlength, dlength, block_size);
struct record_group_t {
std::vector<record_t> records;