SegmentedOolWriter::write_record(
Transaction& t,
record_t&& record,
- std::list<LogicalCachedExtentRef>&& extents)
+ std::list<LogicalCachedExtentRef>&& extents,
+ bool with_atomic_roll_segment)
{
LOG_PREFIX(SegmentedOolWriter::write_record);
assert(extents.size());
stats.md_bytes += record.size.get_raw_mdlength();
stats.num_records += 1;
- return record_submitter.submit(std::move(record)
+ return record_submitter.submit(
+ std::move(record),
+ with_atomic_roll_segment
).safe_then([this, FNAME, &t, extents=std::move(extents)
](record_locator_t ret) mutable {
DEBUGT("{} finish with {} and {} extents",
assert(record_submitter.check_action(record.size) !=
action_t::ROLL);
fut_write = write_record(
- t, std::move(record), std::move(pending_extents));
+ t, std::move(record), std::move(pending_extents),
+ true/* with_atomic_roll_segment */);
}
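+ // if a record was submitted above with_atomic_roll_segment, the availability
+ // promises are set up by this roll_segment() instead (see RecordSubmitter::submit)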
return trans_intr::make_interruptible(
record_submitter.roll_segment(
!has_io_error;
#ifndef NDEBUG
if (ret) {
- // invariants when available
+ // unconditional invariants
ceph_assert(segment_allocator.can_write());
ceph_assert(p_current_batch != nullptr);
ceph_assert(!p_current_batch->is_submitting());
+ // the current batch accepts a further write
ceph_assert(!p_current_batch->needs_flush());
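+ // anything already batched must still fit into the current segment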
if (!p_current_batch->is_empty()) {
auto submit_length =
p_current_batch->get_submit_size().get_encoded_length();
ceph_assert(!segment_allocator.needs_roll(submit_length));
}
+ // not in the middle of rolling a segment
}
#endif
return ret;
RecordSubmitter::roll_segment()
{
LOG_PREFIX(RecordSubmitter::roll_segment);
- assert(is_available());
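+ // when called atomically right after submit(), the current batch may
+ // still need a flush, so is_available() alone cannot be asserted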
+ assert(p_current_batch->needs_flush() ||
+ is_available());
// #1 block concurrent submissions due to rolling
wait_available_promise = seastar::shared_promise<>();
assert(!wait_unfull_flush_promise.has_value());
}
RecordSubmitter::submit_ret
-RecordSubmitter::submit(record_t&& record)
+RecordSubmitter::submit(
+ record_t&& record,
+ bool with_atomic_roll_segment)
{
LOG_PREFIX(RecordSubmitter::submit);
assert(is_available());
get_name(),
p_current_batch->get_num_records(),
num_outstanding_io);
- wait_available_promise = seastar::shared_promise<>();
- assert(!wait_unfull_flush_promise.has_value());
- wait_unfull_flush_promise = seastar::promise<>();
- // flush and mark available in background
- std::ignore = wait_unfull_flush_promise->get_future(
- ).finally([FNAME, this] {
- DEBUG("{} flush done, available", get_name());
- wait_available_promise->set_value();
- wait_available_promise.reset();
- });
+ if (with_atomic_roll_segment) {
+ // wait_available_promise and wait_unfull_flush_promise are
+ // delegated to the follow-up atomic roll_segment()
+ assert(p_current_batch->is_pending());
+ } else {
+ wait_available_promise = seastar::shared_promise<>();
+ assert(!wait_unfull_flush_promise.has_value());
+ wait_unfull_flush_promise = seastar::promise<>();
+ // flush and mark available in background
+ std::ignore = wait_unfull_flush_promise->get_future(
+ ).finally([FNAME, this] {
+ DEBUG("{} flush done, available", get_name());
+ wait_available_promise->set_value();
+ wait_available_promise.reset();
+ });
+ }
} else {
DEBUG("{} added pending, flush", get_name());
flush_current_batch();