ool_record_t& record)
{
LOG_PREFIX(SegmentedAllocator::Writer::_write);
+ record.set_base(allocated_to);
auto record_size = record.get_encoded_record_length();
allocated_to += record_size.get_encoded_length();
segment_provider.update_segment_avail_bytes(
stats.data_bytes += record_size.dlength;
stats.num_records += 1;
+ for (auto& ool_extent : record.get_extents()) {
+ auto& lextent = ool_extent.get_lextent();
+ auto paddr = ool_extent.get_ool_paddr();
+ TRACET("ool extent written at {} -- {}", t, *lextent, paddr);
+ lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint
+ t.mark_delayed_extent_ool(lextent, paddr);
+ }
+
return trans_intr::make_interruptible(
current_segment->write(record.get_base(), bl
- ).safe_then([FNAME, &t, &record, cs=current_segment] {
- DEBUGT("written {} {}",
- t, cs->get_segment_id(), record.get_base());
+ ).safe_then([FNAME, &t, base=record.get_base(), cs=current_segment] {
+ DEBUGT("written {} {}", t, cs->get_segment_id(), base);
})
- ).si_then([FNAME, &record, &t] {
- for (auto& ool_extent : record.get_extents()) {
- auto& lextent = ool_extent.get_lextent();
- auto paddr = ool_extent.get_ool_paddr();
- TRACET("ool extent written at {} -- {}", t, *lextent, paddr);
- lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint
- t.mark_delayed_extent_ool(lextent, paddr);
- }
- record.clear();
- });
-}
-
-void SegmentedAllocator::Writer::add_extent_to_write(
- ool_record_t& record,
- LogicalCachedExtentRef& extent) {
- logger().debug(
- "SegmentedAllocator::Writer::add_extent_to_write: "
- "add extent {} to record",
- extent);
- extent->prepare_write();
- record.add_extent(extent);
+ );
}
SegmentedAllocator::Writer::write_iertr::future<>
-SegmentedAllocator::Writer::write(
+SegmentedAllocator::Writer::do_write(
Transaction& t,
std::list<LogicalCachedExtentRef>& extents)
{
- auto write_func = [this, &extents, &t] {
- return seastar::do_with(ool_record_t(segment_manager.get_block_size()),
- [this, &extents, &t](auto& record) {
- return trans_intr::repeat([this, &record, &t, &extents]()
- -> write_iertr::future<seastar::stop_iteration> {
- if (extents.empty()) {
- return seastar::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
+ LOG_PREFIX(SegmentedAllocator::Writer::do_write);
+ assert(!extents.empty());
+ if (roll_promise.has_value()) {
+ return roll_promise->get_shared_future(
+ ).then([this, &t, &extents] {
+ return do_write(t, extents);
+ });
+ }
+ assert(current_segment);
+
+ ool_record_t record(segment_manager.get_block_size());
+ for (auto it = extents.begin(); it != extents.end();) {
+ auto& extent = *it;
+ auto wouldbe_length = record.get_wouldbe_encoded_record_length(extent);
+ if (_needs_roll(wouldbe_length)) {
+ // reached the segment end, write and roll
+ assert(!roll_promise.has_value());
+ roll_promise = seastar::shared_promise<>();
+ auto num_extents = record.get_num_extents();
+ DEBUGT(
+ "end of segment, writing {} extents to segment {} at {}",
+ t,
+ num_extents,
+ current_segment->get_segment_id(),
+ allocated_to);
+ return (num_extents ?
+ _write(t, record) :
+ write_iertr::now()
+ ).si_then([this] {
+ return roll_segment();
+ }).finally([this] {
+ roll_promise->set_value();
+ roll_promise.reset();
+ }).si_then([this, &t, &extents] {
+ if (!extents.empty()) {
+ return do_write(t, extents);
}
-
- return segment_rotation_guard.wait(
- [this] {
- return !rolling_segment;
- },
- [this, &record, &extents, &t]() -> write_iertr::future<> {
- LOG_PREFIX(SegmentedAllocator::Writer::write);
- record.set_base(allocated_to);
- for (auto it = extents.begin();
- it != extents.end();) {
- auto& extent = *it;
- auto wouldbe_length =
- record.get_wouldbe_encoded_record_length(extent);
- if (_needs_roll(wouldbe_length)) {
- // reached the segment end, write and roll
- assert(!rolling_segment);
- rolling_segment = true;
- auto num_extents = record.get_num_extents();
- DEBUGT(
- "end of segment, writing {} extents to segment {} at {}",
- t,
- num_extents,
- current_segment->get_segment_id(),
- allocated_to);
- return (num_extents ?
- _write(t, record) :
- write_iertr::now()
- ).si_then([this]() mutable {
- return roll_segment(false);
- }).finally([this] {
- rolling_segment = false;
- segment_rotation_guard.broadcast();
- });
- }
- add_extent_to_write(record, extent);
- it = extents.erase(it);
- }
-
- DEBUGT(
- "writing {} extents to segment {} at {}",
- t,
- record.get_num_extents(),
- current_segment->get_segment_id(),
- allocated_to);
- return _write(t, record);
- }
- ).si_then([]()
- -> write_iertr::future<seastar::stop_iteration> {
- return seastar::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::no);
- });
+ return write_iertr::now();
});
- });
- };
+ }
+ DEBUGT("add extent to record -- {}", t, *extent);
+ extent->prepare_write();
+ record.add_extent(extent);
+ it = extents.erase(it);
+ }
- return seastar::with_gate(write_guard,
- [this, write_func=std::move(write_func)]() mutable
- {
- if (rolling_segment) {
- return segment_rotation_guard.wait([this] {
- return !rolling_segment;
- }, std::move(write_func));
+ DEBUGT(
+ "writing {} extents to segment {} at {}",
+ t,
+ record.get_num_extents(),
+ current_segment->get_segment_id(),
+ allocated_to);
+ return _write(t, record);
+}
- } else if (!current_segment) {
- return trans_intr::make_interruptible(roll_segment(true)).si_then(
- [write_func=std::move(write_func)] {
- return write_func();
+SegmentedAllocator::Writer::write_iertr::future<>
+SegmentedAllocator::Writer::write(
+ Transaction& t,
+ std::list<LogicalCachedExtentRef>& extents)
+{
+ if (extents.empty()) {
+ return write_iertr::now();
+ }
+ return seastar::with_gate(write_guard, [this, &t, &extents] {
+ if (!roll_promise.has_value() && !current_segment) {
+ roll_promise = seastar::shared_promise<>();
+ return trans_intr::make_interruptible(
+ roll_segment().finally([this] {
+ roll_promise->set_value();
+ roll_promise.reset();
+ })
+ ).si_then([this, &t, &extents] {
+ return do_write(t, extents);
});
}
- return write_func();
+ return do_write(t, extents);
});
}
}
SegmentedAllocator::Writer::roll_segment_ertr::future<>
-SegmentedAllocator::Writer::roll_segment(bool set_rolling) {
+SegmentedAllocator::Writer::roll_segment() {
LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
- DEBUG("set_rolling {}", set_rolling);
- if (set_rolling) {
- rolling_segment = true;
- }
- assert(rolling_segment);
+ DEBUG("start");
+ assert(roll_promise.has_value());
return [this, FNAME] {
if (current_segment) {
auto seg_to_close = std::move(current_segment);
).safe_then([segref=std::move(segref), this, FNAME] {
assert(!current_segment);
current_segment = segref;
- rolling_segment = false;
- segment_rotation_guard.broadcast();
DEBUG("inited new segment: {}", segref->get_segment_id());
});
}).handle_error(