sizeof(meta.seastore_id.uuid));
}
-Journal::Journal(SegmentManager &segment_manager, ExtentReader& scanner)
- : segment_manager(segment_manager), scanner(scanner) {}
-
-
-Journal::initialize_segment_ertr::future<segment_seq_t>
-Journal::initialize_segment(Segment &segment)
-{
- auto new_tail = segment_provider->get_journal_tail_target();
- // write out header
- ceph_assert(segment.get_write_ptr() == 0);
- bufferlist bl;
-
- segment_seq_t seq = next_journal_segment_seq++;
- current_segment_nonce = generate_nonce(
- seq, segment_manager.get_meta());
- auto header = segment_header_t{
- seq,
- segment.get_segment_id(),
- segment_provider->get_journal_tail_target(),
- current_segment_nonce,
- false};
- logger().debug(
- "initialize_segment {} journal_tail_target {}, header {}",
- segment.get_segment_id(),
- new_tail,
- header);
- encode(header, bl);
-
- bufferptr bp(
- ceph::buffer::create_page_aligned(
- segment_manager.get_block_size()));
- bp.zero();
- auto iter = bl.cbegin();
- iter.copy(bl.length(), bp.c_str());
- bl.clear();
- bl.append(bp);
-
- written_to = segment_manager.get_block_size();
- committed_to = 0;
- return segment.write(0, bl).safe_then(
- [=] {
- segment_provider->update_journal_tail_committed(new_tail);
- return seq;
- },
- initialize_segment_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in Journal::initialize_segment"
- });
-}
+Journal::Journal(
+ SegmentManager& segment_manager,
+ ExtentReader& scanner)
+ : journal_segment_manager(segment_manager),
+ scanner(scanner)
+{}
Journal::write_record_ret Journal::write_record(
record_size_t rsize,
OrderingHandle &handle)
{
ceph::bufferlist to_write = encode_record(
- rsize, std::move(record), segment_manager.get_block_size(),
- committed_to, current_segment_nonce);
- auto target = written_to;
- assert((to_write.length() % segment_manager.get_block_size()) == 0);
- written_to += to_write.length();
- logger().debug(
- "write_record, mdlength {}, dlength {}, target {}",
- rsize.mdlength,
- rsize.dlength,
- target);
-
- auto segment_id = current_journal_segment->get_segment_id();
-
+ rsize,
+ std::move(record),
+ journal_segment_manager.get_block_size(),
+ journal_segment_manager.get_committed_to(),
+ journal_segment_manager.get_nonce());
// Start write under the current exclusive stage, but wait for it
// in the device_submission concurrent stage to permit multiple
// overlapping writes.
- auto write_fut = current_journal_segment->write(target, to_write);
+ auto write_fut = journal_segment_manager.write(to_write);
return handle.enter(write_pipeline->device_submission
).then([write_fut = std::move(write_fut)]() mutable {
- return std::move(write_fut
- ).handle_error(
- write_record_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in Journal::write_record"
- }
- );
- }).safe_then([this, &handle] {
- return handle.enter(write_pipeline->finalize);
- }).safe_then([this, target, segment_id] {
- logger().debug(
- "write_record: commit target {}",
- target);
- if (segment_id == current_journal_segment->get_segment_id()) {
- assert(committed_to < target);
- committed_to = target;
- }
- return write_record_ret(
- write_record_ertr::ready_future_marker{},
- paddr_t{
- segment_id,
- target});
- });
-}
-
-bool Journal::needs_roll(segment_off_t length) const
-{
- return length + written_to >
- current_journal_segment->get_write_capacity();
-}
-
-Journal::roll_journal_segment_ertr::future<segment_seq_t>
-Journal::roll_journal_segment()
-{
- auto old_segment_id = current_journal_segment ?
- current_journal_segment->get_segment_id() :
- NULL_SEG_ID;
-
- return (current_journal_segment ?
- current_journal_segment->close() :
- Segment::close_ertr::now()).safe_then([this] {
- return segment_provider->get_segment(segment_manager.get_device_id());
- }).safe_then([this](auto segment) {
- return segment_manager.open(segment);
- }).safe_then([this](auto sref) {
- current_journal_segment = sref;
- written_to = 0;
- return initialize_segment(*current_journal_segment);
- }).safe_then([=](auto seq) {
- if (old_segment_id != NULL_SEG_ID) {
- segment_provider->close_segment(old_segment_id);
- }
- segment_provider->set_journal_segment(
- current_journal_segment->get_segment_id(),
- seq);
- return seq;
- }).handle_error(
- roll_journal_segment_ertr::pass_further{},
- crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
- );
-}
-
-Journal::open_for_write_ret Journal::open_for_write()
-{
- return roll_journal_segment().safe_then([this](auto seq) {
- return open_for_write_ret(
- open_for_write_ertr::ready_future_marker{},
- journal_seq_t{
- seq,
- paddr_t{
- current_journal_segment->get_segment_id(),
- static_cast<segment_off_t>(segment_manager.get_block_size())}
- });
+ return std::move(write_fut);
+ }).safe_then([this, &handle, rsize](journal_seq_t write_start) {
+ return handle.enter(write_pipeline->finalize
+ ).then([this, write_start, rsize] {
+ auto committed_to = write_start;
+ committed_to.offset.offset += (rsize.mdlength + rsize.dlength);
+ journal_segment_manager.mark_committed(committed_to);
+ return write_start.offset;
+ });
});
}
rt.second.journal_segment_seq;
});
- next_journal_segment_seq =
- segments.rbegin()->second.journal_segment_seq + 1;
+ journal_segment_manager.set_segment_seq(
+ segments.rbegin()->second.journal_segment_seq);
std::for_each(
segments.begin(),
segments.end(),
} else {
replay_from = paddr_t{
from->first,
- (segment_off_t)segment_manager.get_block_size()};
+ (segment_off_t)journal_segment_manager.get_block_size()};
}
auto ret = replay_segments_t(segments.end() - from);
std::transform(
p.second.journal_segment_seq,
paddr_t{
p.first,
- (segment_off_t)segment_manager.get_block_size()}};
+ (segment_off_t)journal_segment_manager.get_block_size()}};
logger().debug(
"Journal::prep_replay_segments: replaying from {}",
ret);
});
}
+// Bind the underlying SegmentManager, then put every piece of per-segment
+// state into the initial condition established by reset().
+Journal::JournalSegmentManager::JournalSegmentManager(SegmentManager& segment_manager)
+  : segment_manager(segment_manager)
+{
+  reset();
+}
+
+// Close the currently open journal segment, if there is one, and reset()
+// the manager once the close future has resolved.
+// Errors other than those in close_ertr are treated as fatal.
+Journal::JournalSegmentManager::close_ertr::future<>
+Journal::JournalSegmentManager::close()
+{
+  // With no open segment there is nothing to close on the device.
+  auto close_fut = current_journal_segment
+    ? current_journal_segment->close()
+    : Segment::close_ertr::now();
+  return std::move(close_fut).handle_error(
+    close_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in JournalSegmentManager::close()"
+    }
+  ).finally([this] {
+    // Drops the segment reference and clears seq/nonce/offsets.
+    reset();
+  });
+}
+
+
+// Switch the journal over to a new segment:
+//   1. close the current segment (if any),
+//   2. obtain a segment from segment_provider and open it,
+//   3. write its header via initialize_segment(),
+//   4. tell segment_provider about the closed and the new journal segment.
+// Errors other than those in roll_ertr are treated as fatal.
+Journal::JournalSegmentManager::roll_ertr::future<>
+Journal::JournalSegmentManager::roll()
+{
+  // Captured up front: current_journal_segment is replaced further down the
+  // chain, but the provider must still be told which segment was closed.
+  auto prev_segment_id = current_journal_segment
+    ? current_journal_segment->get_segment_id()
+    : NULL_SEG_ID;
+
+  auto close_prev = current_journal_segment
+    ? current_journal_segment->close()
+    : Segment::close_ertr::now();
+
+  return std::move(close_prev).safe_then([this] {
+    return segment_provider->get_segment(segment_manager.get_device_id());
+  }).safe_then([this](auto new_segment_id) {
+    return segment_manager.open(new_segment_id);
+  }).safe_then([this](auto opened_segment) {
+    current_journal_segment = opened_segment;
+    return initialize_segment(*current_journal_segment);
+  }).safe_then([this, prev_segment_id] {
+    // NULL_SEG_ID means there was no previous segment to report.
+    if (prev_segment_id != NULL_SEG_ID) {
+      segment_provider->close_segment(prev_segment_id);
+    }
+    segment_provider->set_journal_segment(
+      current_journal_segment->get_segment_id(),
+      get_segment_seq());
+  }).handle_error(
+    roll_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in JournalSegmentManager::roll"
+    }
+  );
+}
+
+
+// Write to_write into the current journal segment at offset written_to.
+// written_to is advanced immediately, before the device write resolves, so
+// a following call starts right after this buffer. The returned future
+// yields the journal_seq_t at which this buffer starts.
+// Preconditions (asserted): the length is a multiple of the block size and
+// the buffer fits into the current segment.
+Journal::JournalSegmentManager::write_ret
+Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
+{
+  const auto length = to_write.length();
+  const auto start_seq = get_current_write_seq();
+  logger().debug(
+    "JournalSegmentManager::write: write_start {} => {}, length={}",
+    start_seq,
+    start_seq.offset.offset + length,
+    length);
+  assert((length % segment_manager.get_block_size()) == 0);
+  assert(!needs_roll(length));
+
+  // Reserve the range first, then issue the write against the old offset.
+  const auto start_offset = written_to;
+  written_to += length;
+  return current_journal_segment->write(
+    start_offset, to_write
+  ).handle_error(
+    write_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in JournalSegmentManager::write"
+    }
+  ).safe_then([start_seq] {
+    return start_seq;
+  });
+}
+
+
+// Record that everything up to new_committed_to has been committed.
+// Only positions in the current segment sequence advance committed_to;
+// a position with an older segment sequence is logged and then ignored.
+void Journal::JournalSegmentManager::mark_committed(
+  const journal_seq_t& new_committed_to)
+{
+  logger().debug(
+    "JournalSegmentManager::mark_committed: committed_to {} => {}",
+    committed_to, new_committed_to);
+
+  const auto current_seq = get_segment_seq();
+  assert(new_committed_to.segment_seq <= current_seq);
+  if (new_committed_to.segment_seq != current_seq) {
+    // Commit belongs to a segment sequence other than the current one.
+    return;
+  }
+
+  // Within one segment, commits must move strictly forward.
+  assert(committed_to.offset.offset < new_committed_to.offset.offset);
+  committed_to = new_committed_to;
+}
+
+
+// Prepare a freshly opened segment for journal writes.
+//
+// Allocates the next segment sequence number and nonce, encodes a
+// segment_header_t into one zero-padded, page-aligned block and writes it at
+// offset 0 through write(). Once the header write resolves, the header block
+// is marked committed and segment_provider is told that the journal tail
+// captured at entry (new_tail) is committed.
+//
+// The segment must be empty on entry (write pointer == 0, asserted).
+Journal::JournalSegmentManager::initialize_segment_ertr::future<>
+Journal::JournalSegmentManager::initialize_segment(Segment& segment)
+{
+  // Snapshot the tail target exactly once so that the header, the log line
+  // and update_journal_tail_committed() all use the same value.
+  auto new_tail = segment_provider->get_journal_tail_target();
+  // write out header
+  ceph_assert(segment.get_write_ptr() == 0);
+  bufferlist bl;
+
+  segment_seq_t seq = next_journal_segment_seq++;
+  current_segment_nonce = generate_nonce(
+    seq, segment_manager.get_meta());
+  auto header = segment_header_t{
+    seq,
+    segment.get_segment_id(),
+    new_tail,
+    current_segment_nonce,
+    false};
+  logger().debug(
+    "JournalSegmentManager::initialize_segment: segment_id {} journal_tail_target {}, header {}",
+    segment.get_segment_id(),
+    new_tail,
+    header);
+  encode(header, bl);
+
+  // Pad the encoded header out to one full, zeroed block so that the write
+  // satisfies write()'s block-size multiple requirement.
+  bufferptr bp(
+    ceph::buffer::create_page_aligned(
+      segment_manager.get_block_size()));
+  bp.zero();
+  auto iter = bl.cbegin();
+  iter.copy(bl.length(), bp.c_str());
+  bl.clear();
+  bl.append(bp);
+
+  written_to = 0;
+  // FIXME: improve committed_to to point to another segment
+  committed_to = get_current_write_seq();
+  // Read the length before handing the buffer to write(); this removes the
+  // dependency on evaluation order between write()'s argument and the
+  // lambda capture list, and lets the buffer be moved instead of copied.
+  const auto write_size = bl.length();
+  return write(std::move(bl)
+  ).safe_then([this, new_tail, write_size](journal_seq_t write_start_seq) {
+    // Named differently from the member committed_to to avoid shadowing it.
+    auto new_committed_to = write_start_seq;
+    new_committed_to.offset.offset += write_size;
+    mark_committed(new_committed_to);
+    segment_provider->update_journal_tail_committed(new_tail);
+  });
+}
+
+
}
#pragma once
#include <boost/intrusive_ptr.hpp>
+#include <optional>
#include <seastar/core/future.hh>
* Gets the current journal segment sequence.
*/
segment_seq_t get_segment_seq() const {
- return next_journal_segment_seq - 1;
+ return journal_segment_manager.get_segment_seq(); // delegated: the sequence counter now lives in JournalSegmentManager
}
/**
*/
void set_segment_provider(SegmentProvider *provider) {
segment_provider = provider;
+ journal_segment_manager.set_segment_provider(provider); // the nested manager keeps its own provider pointer; update both together
}
/**
crimson::ct_error::input_output_error
>;
using open_for_write_ret = open_for_write_ertr::future<journal_seq_t>;
- open_for_write_ret open_for_write();
+ open_for_write_ret open_for_write() { // now defined inline as a thin wrapper
+ return journal_segment_manager.open(); // open() rolls onto a new segment and yields the current write position
+ }
/**
* close journal
using close_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
close_ertr::future<> close() {
- return (
- current_journal_segment ?
- current_journal_segment->close() :
- Segment::close_ertr::now()
- ).handle_error(
- close_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Error during Journal::close()"
- }
- ).finally([this] {
- current_journal_segment.reset();
- reset_soft_state();
- });
+ return journal_segment_manager.close(); // closing the segment and resetting state is now done by JournalSegmentManager::close()
}
/**
) {
assert(write_pipeline);
auto rsize = get_encoded_record_length(
- record, segment_manager.get_block_size());
+ record, journal_segment_manager.get_block_size());
auto total = rsize.mdlength + rsize.dlength;
- if (total > max_record_length()) {
+ auto max_record_length = journal_segment_manager.get_max_write_length();
+ if (total > max_record_length) {
auto &logger = crimson::get_logger(ceph_subsys_seastore);
logger.error(
"Journal::submit_record: record size {} exceeds max {}",
total,
- max_record_length()
+ max_record_length
);
return crimson::ct_error::erange::make();
}
- auto roll = needs_roll(total)
- ? roll_journal_segment().safe_then([](auto){})
- : roll_journal_segment_ertr::now();
+ auto roll = journal_segment_manager.needs_roll(total)
+ ? journal_segment_manager.roll()
+ : JournalSegmentManager::roll_ertr::now();
return roll.safe_then(
[this, rsize, record=std::move(record), &handle]() mutable {
- auto seq = next_journal_segment_seq - 1;
+ auto seq = journal_segment_manager.get_segment_seq();
return write_record(
rsize, std::move(record),
handle
}
private:
- SegmentProvider *segment_provider = nullptr;
- SegmentManager &segment_manager;
+ // Owns the state of the journal segment currently being written: the open
+ // segment, its sequence number and nonce, and the written_to/committed_to
+ // positions. Journal delegates open/close/roll/write to this class.
+ class JournalSegmentManager {
+ public:
+ JournalSegmentManager(SegmentManager&);
+
+ using base_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error>;
+ // Largest buffer that can be written into a single segment: the segment
+ // size minus the space reserved for the segment header. The header is
+ // written as whole blocks, so its size is rounded UP to the block size
+ // (p2align would round down and reserve nothing for a sub-block header).
+ extent_len_t get_max_write_length() const {
+ return segment_manager.get_segment_size() -
+ p2roundup(ceph::encoded_sizeof_bounded<segment_header_t>(),
+ size_t(segment_manager.get_block_size()));
+ }
- segment_seq_t next_journal_segment_seq = 0;
- segment_nonce_t current_segment_nonce = 0;
+ segment_off_t get_block_size() const {
+ return segment_manager.get_block_size();
+ }
- SegmentRef current_journal_segment;
- segment_off_t written_to = 0;
- segment_off_t committed_to = 0;
+ segment_nonce_t get_nonce() const {
+ return current_segment_nonce;
+ }
- ExtentReader& scanner;
- WritePipeline *write_pipeline = nullptr;
+ // Offset of the committed position; only valid while committed_to refers
+ // to the current segment sequence (asserted).
+ segment_off_t get_committed_to() const {
+ assert(committed_to.segment_seq ==
+ get_segment_seq());
+ return committed_to.offset.offset;
+ }
- void reset_soft_state() {
- next_journal_segment_seq = 0;
- current_segment_nonce = 0;
- written_to = 0;
- committed_to = 0;
- }
+ // Sequence number of the current segment, i.e. the last one handed out.
+ segment_seq_t get_segment_seq() const {
+ return next_journal_segment_seq - 1;
+ }
- /// prepare segment for writes, writes out segment header
- using initialize_segment_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
- initialize_segment_ertr::future<segment_seq_t> initialize_segment(
- Segment &segment);
+ void set_segment_provider(SegmentProvider* provider) {
+ segment_provider = provider;
+ }
+
+ // Set the current segment sequence; the next segment gets current_seq + 1.
+ void set_segment_seq(segment_seq_t current_seq) {
+ next_journal_segment_seq = (current_seq + 1);
+ }
+
+ // roll() onto a new segment and return the position of the next write.
+ using open_ertr = base_ertr;
+ using open_ret = open_ertr::future<journal_seq_t>;
+ open_ret open() {
+ return roll().safe_then([this] {
+ return get_current_write_seq();
+ });
+ }
+
+ using close_ertr = base_ertr;
+ close_ertr::future<> close();
+ // returns true iff the current segment has insufficient space
+ bool needs_roll(std::size_t length) const {
+ // Same precondition as get_current_write_seq(): a segment must be open.
+ assert(current_journal_segment);
+ auto write_capacity = current_journal_segment->get_write_capacity();
+ return length + written_to > std::size_t(write_capacity);
+ }
+
+ // close the current segment and initialize next one
+ using roll_ertr = base_ertr;
+ roll_ertr::future<> roll();
+
+ // write the buffer, return the write start
+ // May be called concurrently, writes may complete in any order.
+ using write_ertr = base_ertr;
+ using write_ret = write_ertr::future<journal_seq_t>;
+ write_ret write(ceph::bufferlist to_write);
+
+ // mark write committed in order
+ void mark_committed(const journal_seq_t& new_committed_to);
+
+ private:
+ // Position at which the next write() will start.
+ journal_seq_t get_current_write_seq() const {
+ assert(current_journal_segment);
+ return journal_seq_t{
+ get_segment_seq(),
+ {current_journal_segment->get_segment_id(), written_to}
+ };
+ }
+
+ // Clear all per-segment state. segment_provider is deliberately left
+ // untouched: it is configured separately via set_segment_provider().
+ void reset() {
+ next_journal_segment_seq = 0;
+ current_segment_nonce = 0;
+ current_journal_segment.reset();
+ written_to = 0;
+ committed_to = {};
+ }
+
+ // prepare segment for writes, writes out segment header
+ using initialize_segment_ertr = base_ertr;
+ initialize_segment_ertr::future<> initialize_segment(Segment&);
+
+ // Neither the constructor nor reset() assigns this pointer, so it needs a
+ // default initializer to avoid holding an indeterminate value until
+ // set_segment_provider() is called.
+ SegmentProvider* segment_provider = nullptr;
+ SegmentManager& segment_manager;
+
+ segment_seq_t next_journal_segment_seq;
+ segment_nonce_t current_segment_nonce;
+
+ SegmentRef current_journal_segment;
+ segment_off_t written_to;
+ // committed_to may be in a previous journal segment
+ journal_seq_t committed_to;
+ };
+
+ SegmentProvider* segment_provider = nullptr;
+ JournalSegmentManager journal_segment_manager;
+ ExtentReader& scanner;
+ WritePipeline *write_pipeline = nullptr;
/// do record write
using write_record_ertr = crimson::errorator<
record_t &&record,
OrderingHandle &handle);
- /// close current segment and initialize next one
- using roll_journal_segment_ertr = crimson::errorator<
- crimson::ct_error::input_output_error>;
- roll_journal_segment_ertr::future<segment_seq_t> roll_journal_segment();
-
- /// returns true iff current segment has insufficient space
- bool needs_roll(segment_off_t length) const;
-
/// return ordered vector of segments to replay
using replay_segments_t = std::vector<
std::pair<journal_seq_t, segment_header_t>>;
segment_header_t header, ///< [in] segment header
delta_handler_t &delta_handler ///< [in] processes deltas in order
);
-
- extent_len_t max_record_length() const;
};
using JournalRef = std::unique_ptr<Journal>;
}
-
-namespace crimson::os::seastore {
-
-inline extent_len_t Journal::max_record_length() const {
- return segment_manager.get_segment_size() -
- p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
- size_t(segment_manager.get_block_size()));
-}
-
-}