#include "crimson/os/seastore/extent_placement_manager.h"
-SET_SUBSYS(seastore_tm);
+#include "crimson/common/config_proxy.h"
+
+SET_SUBSYS(seastore_journal);
namespace crimson::os::seastore {
);
}
// Construct an out-of-line (OOL) writer.
//
// The segment allocator is created with the name "OOL" and the type
// segment_type_t::OOL on top of the given provider and segment manager.
// The record submitter is then built over that allocator, with its four
// tuning values read from the crimson config:
//   seastore_journal_iodepth_limit, seastore_journal_batch_capacity,
//   seastore_journal_batch_flush_size, seastore_journal_batch_preferred_fullness.
// NOTE(review): these are the seastore_journal_* options, so the OOL writer
// appears to share its tuning with the journal -- confirm this is intended.
// NOTE(review): record_submitter is handed segment_allocator, so it relies on
// segment_allocator being declared (and thus initialized) first in the class.
+SegmentedAllocator::Writer::Writer(
+ SegmentProvider& sp,
+ SegmentManager& sm)
+ : segment_allocator("OOL", segment_type_t::OOL, sp, sm),
+ record_submitter(crimson::common::get_conf<uint64_t>(
+ "seastore_journal_iodepth_limit"),
+ crimson::common::get_conf<uint64_t>(
+ "seastore_journal_batch_capacity"),
+ crimson::common::get_conf<Option::size_t>(
+ "seastore_journal_batch_flush_size"),
+ crimson::common::get_conf<double>(
+ "seastore_journal_batch_preferred_fullness"),
+ segment_allocator)
+{
+}
+
// Open the writer by opening the underlying segment allocator.
// Whatever value the allocator's open() yields is discarded, so the caller
// only observes completion (or an error carried by open_ertr).
SegmentedAllocator::Writer::open_ertr::future<>
SegmentedAllocator::Writer::open()
{
return segment_allocator.open().discard_result();
}
// Submit one OOL record on behalf of transaction `t` and, once the submitter
// reports where the record landed, assign an address to every extent that the
// record carried.
//
// Preconditions (asserted below): `extents` is non-empty, has exactly one
// entry per extent in `record`, and `record` carries no deltas.
// The transaction's OOL write statistics are updated before the record is
// handed to the submitter.
// Returns a write_ertr future (no longer interruptible); callers that need an
// interruptible future wrap it themselves.
-SegmentedAllocator::Writer::write_iertr::future<>
-SegmentedAllocator::Writer::_write(
+SegmentedAllocator::Writer::write_ertr::future<>
+SegmentedAllocator::Writer::write_record(
Transaction& t,
record_t&& record,
std::list<LogicalCachedExtentRef>&& extents)
{
- LOG_PREFIX(SegmentedAllocator::Writer::_write);
+ LOG_PREFIX(SegmentedAllocator::Writer::write_record);
assert(extents.size());
assert(extents.size() == record.extents.size());
assert(!record.deltas.size());
- auto record_group = record_group_t(
- std::move(record), segment_allocator.get_block_size());
- auto record_size = record_group.size;
- ceph::bufferlist bl = encode_records(
- record_group,
- JOURNAL_SEQ_NULL,
- segment_allocator.get_nonce()); // 0
- assert(bl.length() == record_size.get_encoded_length());
-
- DEBUGT("writing {} bytes to segment {}",
- t, bl.length(), segment_allocator.get_segment_id());
// account transactional ool writes before write()
+ // TODO: drop the incorrect size and fix the metrics
+ auto record_size = record_group_size_t(
+ record.size, segment_allocator.get_block_size());
auto& stats = t.get_ool_write_stats();
stats.extents.num += extents.size();
stats.extents.bytes += record_size.dlength;
stats.data_bytes += record_size.dlength;
stats.num_records += 1;
- return trans_intr::make_interruptible(
- segment_allocator.write(bl)
- ).si_then([FNAME, record_size, &t,
- extents=std::move(extents)](write_result_t wr) mutable {
- assert(wr.start_seq.segment_seq == OOL_SEG_SEQ);
- paddr_t extent_addr = wr.start_seq.offset;
- extent_addr = extent_addr.as_seg_paddr().add_offset(
- record_size.get_mdlength());
// NOTE(review): the continuation captures `t` by reference and `this`, so the
// transaction and the writer must both outlive the returned future.
+ return record_submitter.submit(std::move(record)
+ ).safe_then([this, FNAME, &t, extents=std::move(extents)
+ ](record_locator_t ret) mutable {
+ assert(ret.write_result.start_seq.segment_seq == OOL_SEG_SEQ);
+ DEBUGT("{} finish with {} and {} extents",
+ t, segment_allocator.get_name(),
+ ret, extents.size());
// Extents are addressed back to back: the first one starts at the record's
// block base and each following one starts where the previous one ended.
+ paddr_t extent_addr = ret.record_block_base;
for (auto& extent : extents) {
- TRACET("ool extent written at {} -- {}", t, *extent, extent_addr);
+ TRACET("{} ool extent written at {} -- {}",
+ t, segment_allocator.get_name(),
+ extent_addr, *extent);
extent->hint = placement_hint_t::NUM_HINTS; // invalidate hint
t.mark_delayed_extent_ool(extent, extent_addr);
extent_addr = extent_addr.as_seg_paddr().add_offset(
extent->get_length());
}
- assert(extent_addr == wr.get_end_seq().offset);
});
}
{
LOG_PREFIX(SegmentedAllocator::Writer::do_write);
assert(!extents.empty());
- if (roll_promise.has_value()) {
+ if (!record_submitter.is_available()) {
+ DEBUGT("{} extents={} wait ...",
+ t, segment_allocator.get_name(),
+ extents.size());
return trans_intr::make_interruptible(
- roll_promise->get_shared_future()
- ).then_interruptible([this, &t, &extents] {
+ record_submitter.wait_available()
+ ).si_then([this, &t, &extents] {
return do_write(t, extents);
});
}
- assert(segment_allocator.can_write());
-
record_t record;
std::list<LogicalCachedExtentRef> pending_extents;
auto& extent = *it;
record_size_t wouldbe_rsize = record.size;
wouldbe_rsize.account_extent(extent->get_bptr().length());
- auto wouldbe_length = record_group_size_t(
- wouldbe_rsize, segment_allocator.get_block_size()
- ).get_encoded_length();
- if (segment_allocator.needs_roll(wouldbe_length)) {
- // reached the segment end, write and roll
- assert(!roll_promise.has_value());
- roll_promise = seastar::shared_promise<>();
+ using action_t = journal::RecordSubmitter::action_t;
+ action_t action = record_submitter.check_action(wouldbe_rsize);
+ if (action == action_t::ROLL) {
auto num_extents = pending_extents.size();
- DEBUGT("end of segment, writing {} extents", t, num_extents);
- return (num_extents ?
- _write(t, std::move(record), std::move(pending_extents)) :
- write_iertr::now()
- ).si_then([this] {
- return segment_allocator.roll();
- }).finally([this] {
- roll_promise->set_value();
- roll_promise.reset();
- }).si_then([this, &t, &extents] {
- if (!extents.empty()) {
- return do_write(t, extents);
- }
- return write_iertr::now();
+ DEBUGT("{} extents={} submit {} extents and roll, unavailable ...",
+ t, segment_allocator.get_name(),
+ extents.size(), num_extents);
+ auto fut_write = write_ertr::now();
+ if (num_extents > 0) {
+ assert(record_submitter.check_action(record.size) !=
+ action_t::ROLL);
+ fut_write = write_record(
+ t, std::move(record), std::move(pending_extents));
+ }
+ return trans_intr::make_interruptible(
+ record_submitter.roll_segment(
+ ).safe_then([fut_write=std::move(fut_write)]() mutable {
+ return std::move(fut_write);
+ })
+ ).si_then([this, &t, &extents] {
+ return do_write(t, extents);
});
}
- DEBUGT("add extent to record -- {}", t, *extent);
+ TRACET("{} extents={} add extent to record -- {}",
+ t, segment_allocator.get_name(),
+ extents.size(), *extent);
if (commit_type == record_commit_type_t::MODIFY) {
extent->set_last_modified(commit_time);
} else {
extent->get_last_modified().time_since_epoch().count()});
pending_extents.push_back(extent);
it = extents.erase(it);
+
+ assert(record_submitter.check_action(record.size) == action);
+ if (action == action_t::SUBMIT_FULL) {
+ DEBUGT("{} extents={} submit {} extents ...",
+ t, segment_allocator.get_name(),
+ extents.size(), pending_extents.size());
+ return trans_intr::make_interruptible(
+ write_record(t, std::move(record), std::move(pending_extents))
+ ).si_then([this, &t, &extents] {
+ if (!extents.empty()) {
+ return do_write(t, extents);
+ } else {
+ return write_iertr::now();
+ }
+ });
+ }
+ // SUBMIT_NOT_FULL: evaluate the next extent
}
- DEBUGT("writing {} extents", t, pending_extents.size());
- return _write(t, std::move(record), std::move(pending_extents));
+ auto num_extents = pending_extents.size();
+ DEBUGT("{} submit the rest {} extents ...",
+ t, segment_allocator.get_name(),
+ num_extents);
+ assert(num_extents > 0);
+ return trans_intr::make_interruptible(
+ write_record(t, std::move(record), std::move(pending_extents)));
}
SegmentedAllocator::Writer::write_iertr::future<>
#pragma once
#include "seastar/core/gate.hh"
-#include "seastar/core/shared_future.hh"
#include "crimson/os/seastore/cached_extent.h"
#include "crimson/os/seastore/journal/segment_allocator.h"
using open_ertr = base_ertr;
virtual open_ertr::future<> open() = 0;
- using write_iertr = trans_iertr<base_ertr>;
+ using write_ertr = base_ertr;
+ using write_iertr = trans_iertr<write_ertr>;
// Pure-virtual entry point for writing a list of logical extents on behalf of
// transaction `t`; the result is an interruptible future over write_ertr.
// NOTE(review): the list is taken by non-const reference -- implementations
// presumably consume entries from it as they are written; confirm against the
// overrides.
virtual write_iertr::future<> write(
Transaction& t,
std::list<LogicalCachedExtentRef>& extent) = 0;
class SegmentedAllocator : public ExtentAllocator {
class Writer : public ExtentOolWriter {
public:
- Writer(SegmentProvider& sp, SegmentManager& sm)
- : segment_allocator("OOL", segment_type_t::OOL, sp, sm) {}
-
+ Writer(SegmentProvider& sp, SegmentManager& sm);
Writer(Writer &&) = default;
open_ertr::future<> open() final;
Transaction& t,
std::list<LogicalCachedExtentRef>& extent);
- write_iertr::future<> _write(
+ write_ertr::future<> write_record(
Transaction& t,
record_t&& record,
std::list<LogicalCachedExtentRef>&& extents);
journal::SegmentAllocator segment_allocator;
- std::optional<seastar::shared_promise<>> roll_promise;
+ journal::RecordSubmitter record_submitter;
seastar::gate write_guard;
};
public:
Transaction& t,
std::list<LogicalCachedExtentRef>& extents) final {
LOG_PREFIX(SegmentedAllocator::alloc_ool_extents_paddr);
- SUBDEBUGT(seastore_tm, "start", t);
+ SUBDEBUGT(seastore_journal, "start", t);
return seastar::do_with(
std::map<Writer*, std::list<LogicalCachedExtentRef>>(),
[this, extents=std::move(extents), &t](auto& alloc_map) {
// Register `allocator` for device `type` by appending it to that type's entry
// in `allocators`, then log (at debug level) how many allocators the type now
// has. Ownership of the allocator is moved into the container.
void add_allocator(device_type_t type, ExtentAllocatorRef&& allocator) {
allocators[type].emplace_back(std::move(allocator));
LOG_PREFIX(ExtentPlacementManager::add_allocator);
- SUBDEBUG(seastore_tm, "allocators for {}: {}",
+ SUBDEBUG(seastore_journal, "allocators for {}: {}",
type,
allocators[type].size());
}
using open_ertr = ExtentOolWriter::open_ertr;
open_ertr::future<> open() {
LOG_PREFIX(ExtentPlacementManager::open);
- SUBINFO(seastore_tm, "started");
+ SUBINFO(seastore_journal, "started");
return crimson::do_for_each(allocators, [](auto& allocators_item) {
return crimson::do_for_each(allocators_item.second, [](auto& allocator) {
return allocator->open();
Transaction& t,
const std::list<LogicalCachedExtentRef>& delayed_extents) {
LOG_PREFIX(ExtentPlacementManager::delayed_alloc_or_ool_write);
- SUBDEBUGT(seastore_tm, "start with {} delayed extents",
+ SUBDEBUGT(seastore_journal, "start with {} delayed extents",
t, delayed_extents.size());
return seastar::do_with(
std::map<ExtentAllocator*, std::list<LogicalCachedExtentRef>>(),
using close_ertr = ExtentOolWriter::stop_ertr;
close_ertr::future<> close() {
LOG_PREFIX(ExtentPlacementManager::close);
- SUBINFO(seastore_tm, "started");
+ SUBINFO(seastore_journal, "started");
return crimson::do_for_each(allocators, [](auto& allocators_item) {
return crimson::do_for_each(allocators_item.second, [](auto& allocator) {
return allocator->stop();