segment_manager(segment_manager) {}
-Journal::initialize_segment_ertr::future<> Journal::initialize_segment(
- Segment &segment)
+Journal::initialize_segment_ertr::future<segment_seq_t>
+Journal::initialize_segment(Segment &segment)
{
auto new_tail = segment_provider->get_journal_tail_target();
logger().debug(
// write out header
ceph_assert(segment.get_write_ptr() == 0);
bufferlist bl;
+ segment_seq_t seq = current_journal_segment_seq++;
auto header = segment_header_t{
- current_journal_segment_seq++,
+ seq,
segment.get_segment_id(),
segment_provider->get_journal_tail_target()};
::encode(header, bl);
return segment.write(0, bl).safe_then(
[=] {
segment_provider->update_journal_tail_committed(new_tail);
+ return seq;
},
- init_ertr::pass_further{},
+ initialize_segment_ertr::pass_further{},
crimson::ct_error::assert_all{ "TODO" });
}
current_journal_segment->get_write_capacity();
}
-Journal::roll_journal_segment_ertr::future<>
+Journal::roll_journal_segment_ertr::future<segment_seq_t>
Journal::roll_journal_segment()
{
auto old_segment_id = current_journal_segment ?
);
}
-Journal::init_ertr::future<> Journal::open_for_write()
+Journal::open_for_write_ret Journal::open_for_write()
{
- return roll_journal_segment();
+ return roll_journal_segment().safe_then([this](auto seq) {
+ return open_for_write_ret(
+ open_for_write_ertr::ready_future_marker{},
+ journal_seq_t{
+ seq,
+ paddr_t{
+ current_journal_segment->get_segment_id(),
+ static_cast<segment_off_t>(block_size)}
+ });
+ });
}
Journal::find_replay_segments_fut Journal::find_replay_segments()
* to submit_record. Should be called after replay if not a new
* Journal.
*/
- using init_ertr = crimson::errorator<
+ using open_for_write_ertr = crimson::errorator<
crimson::ct_error::input_output_error
>;
- init_ertr::future<> open_for_write();
+ using open_for_write_ret = open_for_write_ertr::future<journal_seq_t>;
+ open_for_write_ret open_for_write();
/**
* close journal
return crimson::ct_error::erange::make();
}
auto roll = needs_roll(total)
- ? roll_journal_segment()
+ ? roll_journal_segment().safe_then([](auto){})
: roll_journal_segment_ertr::now();
return roll.safe_then(
[this, rsize, record=std::move(record)]() mutable {
/// prepare segment for writes, writes out segment header
using initialize_segment_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
- initialize_segment_ertr::future<> initialize_segment(
+ initialize_segment_ertr::future<segment_seq_t> initialize_segment(
Segment &segment);
struct record_size_t {
/// close current segment and initialize next one
using roll_journal_segment_ertr = crimson::errorator<
crimson::ct_error::input_output_error>;
- roll_journal_segment_ertr::future<> roll_journal_segment();
+ roll_journal_segment_ertr::future<segment_seq_t> roll_journal_segment();
/// returns true iff current segment has insufficient space
bool needs_roll(segment_off_t length) const;
TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
{
- return journal.open_for_write().safe_then([this] {
+ return journal.open_for_write().safe_then([this](auto addr) {
logger().debug("TransactionManager::mkfs: about to do_with");
return seastar::do_with(
create_transaction(),
return segment_manager->init(
).safe_then([this] {
return journal.open_for_write();
- }).safe_then([this] {
+ }).safe_then([this](auto addr) {
return seastar::do_with(
make_transaction(),
[this](auto &transaction) {
return segment_manager->init(
).safe_then([this] {
return journal->open_for_write();
- }).handle_error(
+ }).safe_then(
+ [](auto){},
crimson::ct_error::all_same_way([] {
ASSERT_FALSE("Unable to mount");
}));
replay(
[&advance,
&delta_checker]
- (auto base, const auto &di) mutable {
+ (auto seq, auto base, const auto &di) mutable {
if (!delta_checker) {
EXPECT_FALSE("No Deltas Left");
}