Transaction& t,
ool_record_t& record)
{
+ LOG_PREFIX(SegmentedAllocator::Writer::_write);
auto record_size = record.get_encoded_record_length();
allocated_to += record_size.get_encoded_length();
segment_provider.update_segment_avail_bytes(
paddr_t::make_seg_paddr(
- current_segment->segment->get_segment_id(),
+ current_segment->get_segment_id(),
allocated_to));
bufferlist bl = record.encode(
- current_segment->segment->get_segment_id(),
+ current_segment->get_segment_id(),
0);
- seastar::promise<> pr;
- current_segment->inflight_writes.emplace_back(pr.get_future());
- LOG_PREFIX(SegmentedAllocator::Writer::_write);
DEBUGT(
"written {} extents, {} bytes to segment {} at {}",
t,
record.get_num_extents(),
bl.length(),
- current_segment->segment->get_segment_id(),
+ current_segment->get_segment_id(),
record.get_base());
// account transactional ool writes before write()
stats.num_records += 1;
return trans_intr::make_interruptible(
- current_segment->segment->write(record.get_base(), bl).safe_then(
- [this, FNAME, pr=std::move(pr), &t,
- it=(--current_segment->inflight_writes.end()),
- cs=current_segment]() mutable {
- if (cs->outdated) {
- DEBUGT("segment rolled", t);
- pr.set_value();
- } else{
- DEBUGT("segment not rolled", t);
- current_segment->inflight_writes.erase(it);
- }
- return seastar::now();
+ current_segment->write(record.get_base(), bl
+ ).safe_then([FNAME, &t, &record, cs=current_segment] {
+ DEBUGT("written {} {}",
+ t, cs->get_segment_id(), record.get_base());
})
).si_then([FNAME, &record, &t] {
for (auto& ool_extent : record.get_extents()) {
"end of segment, writing {} extents to segment {} at {}",
t,
num_extents,
- current_segment->segment->get_segment_id(),
+ current_segment->get_segment_id(),
allocated_to);
return (num_extents ?
_write(t, record) :
"writing {} extents to segment {} at {}",
t,
record.get_num_extents(),
- current_segment->segment->get_segment_id(),
+ current_segment->get_segment_id(),
allocated_to);
return _write(t, record);
}
});
};
- if (rolling_segment) {
- return segment_rotation_guard.wait([this] {
- return !rolling_segment;
- }, std::move(write_func));
+ return seastar::with_gate(write_guard,
+ [this, write_func=std::move(write_func)]() mutable
+ {
+ if (rolling_segment) {
+ return segment_rotation_guard.wait([this] {
+ return !rolling_segment;
+ }, std::move(write_func));
- } else if (!current_segment) {
- return trans_intr::make_interruptible(roll_segment(true)).si_then(
- [write_func=std::move(write_func)] {
- return write_func();
- });
- }
- return write_func();
+ } else if (!current_segment) {
+ return trans_intr::make_interruptible(roll_segment(true)).si_then(
+ [write_func=std::move(write_func)] {
+ return write_func();
+ });
+ }
+ return write_func();
+ });
}
bool SegmentedAllocator::Writer::_needs_roll(seastore_off_t length) const {
- return allocated_to + length > current_segment->segment->get_write_capacity();
+ return allocated_to + length > current_segment->get_write_capacity();
}
SegmentedAllocator::Writer::init_segment_ertr::future<>
rolling_segment = true;
}
assert(rolling_segment);
- if (current_segment) {
- (void) seastar::with_gate(writer_guard, [this] {
- auto fut = seastar::now();
- if (!current_segment->inflight_writes.empty()) {
- fut = seastar::when_all_succeed(
- current_segment->inflight_writes.begin(),
- current_segment->inflight_writes.end());
- }
- current_segment->outdated = true;
- return fut.then(
- [cs=std::move(current_segment), this, it=(--open_segments.end())] {
- return cs->segment->close().safe_then([this, cs, it] {
- LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
- assert((*it).get() == cs.get());
- segment_provider.close_segment(cs->segment->get_segment_id());
- open_segments.erase(it);
- DEBUG("closed segment: {}", cs->segment->get_segment_id());
+ return [this, FNAME] {
+ if (current_segment) {
+ auto seg_to_close = std::move(current_segment);
+ if (write_guard.is_closed()) {
+ DEBUG("write_guard is closed, should be stopping");
+ return seg_to_close->close(
+ ).safe_then([seg_to_close=std::move(seg_to_close)] {});
+ } else {
+ DEBUG("rolling OOL segment, close {} ...", seg_to_close->get_segment_id());
+ (void) seastar::with_gate(write_guard,
+ [this, seg_to_close=std::move(seg_to_close)]() mutable
+ {
+ return seg_to_close->close(
+ ).safe_then([this, seg_to_close=std::move(seg_to_close)] {
+ segment_provider.close_segment(seg_to_close->get_segment_id());
+ });
});
- });
- }).handle_exception_type([](seastar::gate_closed_exception e) {
- LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
- DEBUG(" writer_guard closed, should be stopping");
- return seastar::now();
- });
- }
-
- auto new_segment_id = segment_provider.get_segment(
- segment_manager.get_device_id(), OOL_SEG_SEQ);
- return segment_manager.open(new_segment_id
- ).safe_then([this](auto segref) {
- LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
+ return Segment::close_ertr::now();
+ }
+ } else {
+ DEBUG("rolling OOL segment, no current ...");
+ return Segment::close_ertr::now();
+ }
+ }().safe_then([this] {
+ auto new_segment_id = segment_provider.get_segment(
+ segment_manager.get_device_id(), OOL_SEG_SEQ);
+ return segment_manager.open(new_segment_id);
+ }).safe_then([this, FNAME](auto segref) {
DEBUG("opened new segment: {}", segref->get_segment_id());
- return init_segment(*segref).safe_then([segref=std::move(segref), this] {
- LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
- assert(!current_segment.get());
- current_segment.reset(new open_segment_wrapper_t());
- current_segment->segment = segref;
- open_segments.emplace_back(current_segment);
+ return init_segment(*segref
+ ).safe_then([segref=std::move(segref), this, FNAME] {
+ assert(!current_segment);
+ current_segment = segref;
rolling_segment = false;
segment_rotation_guard.broadcast();
DEBUG("inited new segment: {}", segref->get_segment_id());