RecordBatch::add_pending(
const std::string& name,
record_t&& record,
- extent_len_t block_size)
+ extent_len_t block_size,
+ std::optional<journal_seq_t> maybe_write_base)
{
LOG_PREFIX(RecordBatch::add_pending);
auto new_size = get_encoded_length_after(record, block_size);
if (state == state_t::EMPTY) {
assert(!io_promise.has_value());
io_promise = seastar::shared_promise<maybe_promise_result_t>();
+ assert(maybe_write_base.has_value());
+ assert(!write_base.has_value());
+ write_base = maybe_write_base;
} else {
assert(io_promise.has_value());
}
state = state_t::PENDING;
+ assert(write_base.has_value());
return io_promise->get_shared_future(
).then([dlength_offset, FNAME, &name
});
}
-ceph::bufferlist RecordBatch::encode_batch(
+RecordBatch::encode_ret_t RecordBatch::encode_batch(
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
assert(state == state_t::PENDING);
assert(pending.get_size() > 0);
assert(io_promise.has_value());
+ assert(write_base.has_value());
state = state_t::SUBMITTING;
+ auto _write_base = *write_base;
+ write_base.reset();
submitting_size = pending.get_size();
submitting_length = pending.size.get_encoded_length();
submitting_mdlength = pending.size.get_mdlength();
auto bl = encode_records(pending, committed_to, segment_nonce);
// Note: pending is cleared here
assert(bl.length() == submitting_length);
- return bl;
+ return {_write_base, std::move(bl)};
}
void RecordBatch::set_result(
}
assert(state == state_t::SUBMITTING);
assert(io_promise.has_value());
+ assert(!write_base.has_value());
state = state_t::EMPTY;
submitting_size = 0;
});
}
// indirect batched write
+ std::optional<journal_seq_t> maybe_write_base;
+ if (p_current_batch->is_empty()) {
+ maybe_write_base = journal_allocator.get_written_to();
+ } else {
+ assert(p_current_batch->get_write_base().has_value());
+ assert(*p_current_batch->get_write_base() ==
+ journal_allocator.get_written_to());
+ }
auto write_fut = p_current_batch->add_pending(
get_name(),
std::move(record),
- journal_allocator.get_block_size());
+ journal_allocator.get_block_size(),
+ maybe_write_base);
if (needs_flush) {
if (state == state_t::FULL) {
// #2 block concurrent submissions due to lack of resource
account_submission(rg);
assert(stats.record_batch_stats.num_io ==
stats.io_depth_stats.num_io);
- auto to_write = p_batch->encode_batch(
+ auto encode_ret = p_batch->encode_batch(
get_committed_to(), journal_allocator.get_nonce());
// Note: rg is cleared
DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
get_name(), num, sizes, get_committed_to(), num_outstanding_io);
+ assert(encode_ret.write_base == journal_allocator.get_written_to());
write_result_t result{
- journal_allocator.get_written_to(),
- to_write.length()};
- std::ignore = journal_allocator.write(std::move(to_write)
+ encode_ret.write_base,
+ encode_ret.bl.length()};
+ std::ignore = journal_allocator.write(std::move(encode_ret.bl)
).safe_then([this, p_batch, FNAME, num, sizes, result] {
TRACE("{} {} records, {}, write done with {}",
get_name(), num, sizes, result);
return pending.size;
}
+ std::optional<journal_seq_t> get_write_base() const {
+ return write_base;
+ }
+
bool needs_flush() const {
assert(state != state_t::SUBMITTING);
assert(pending.get_size() <= batch_capacity);
//
// Set write_result_t::write_length to 0 if the record is not the first one
// in the batch.
+ //
+ // write_base must be assigned when the state is empty
using add_pending_ertr = JournalAllocator::write_ertr;
using add_pending_ret = add_pending_ertr::future<record_locator_t>;
add_pending_ret add_pending(
const std::string& name,
record_t&&,
- extent_len_t block_size);
+ extent_len_t block_size,
+ std::optional<journal_seq_t> maybe_write_base);
// Encode the batched records for write.
- ceph::bufferlist encode_batch(
+ struct encode_ret_t {
+ journal_seq_t write_base;
+ ceph::bufferlist bl;
+ };
+ encode_ret_t encode_batch(
const journal_seq_t& committed_to,
segment_nonce_t segment_nonce);
std::size_t index = 0;
std::size_t batch_capacity = 0;
std::size_t batch_flush_size = 0;
+ // Valid at state_t::PENDING
+ std::optional<journal_seq_t> write_base;
record_group_t pending;
std::size_t submitting_size = 0;