SegmentSeqAllocator &ssa);
open_ertr::future<> open() final {
- return record_submitter.open().discard_result();
+ return record_submitter.open(false).discard_result();
}
alloc_write_iertr::future<> alloc_write_ool_extents(
class Journal {
public:
+ /**
+ * initializes journal for mkfs writes -- must run prior to calls
+ * to submit_record.
+ */
+ using open_for_mkfs_ertr = crimson::errorator<
+ crimson::ct_error::input_output_error
+ >;
+ using open_for_mkfs_ret = open_for_mkfs_ertr::future<journal_seq_t>;
+ virtual open_for_mkfs_ret open_for_mkfs() = 0;
+
/**
- * initializes journal for new writes -- must run prior to calls
- * to submit_record. Should be called after replay if not a new
- * Journal.
+ * initializes journal for mount writes -- must run prior to calls
+ * to submit_record. Should be called after replay.
*/
- using open_for_write_ertr = crimson::errorator<
- crimson::ct_error::input_output_error
- >;
- using open_for_write_ret = open_for_write_ertr::future<journal_seq_t>;
- virtual open_for_write_ret open_for_write() = 0;
+ using open_for_mount_ertr = open_for_mkfs_ertr;
+ using open_for_mount_ret = open_for_mkfs_ret;
+ virtual open_for_mount_ret open_for_mount() = 0;
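+
+ // Intended call order (a sketch based on the comments above, not enforced
+ // by this interface):
+ //   mkfs:  open_for_mkfs(), then submit_record()
+ //   mount: replay(), then open_for_mount(), then submit_record()
+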
/// close journal
using close_ertr = crimson::errorator<
});
}
-CircularBoundedJournal::open_for_write_ertr::future<>
+CircularBoundedJournal::open_for_mount_ertr::future<>
CircularBoundedJournal::_open_device(const std::string &path)
{
ceph_assert(device);
return device->open(path, seastar::open_flags::rw
).handle_error(
- open_for_write_ertr::pass_further{},
+ open_for_mount_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error device->open"
}
return bl;
}
-CircularBoundedJournal::open_for_write_ret CircularBoundedJournal::open_for_write()
+CircularBoundedJournal::open_for_mkfs_ret
+CircularBoundedJournal::open_for_mkfs()
+{
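+ // the circular journal needs no mkfs-specific setup; reuse the mount path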
+ return open_for_mount();
+}
+
+CircularBoundedJournal::open_for_mount_ret
+CircularBoundedJournal::open_for_mount()
{
ceph_assert(initialized);
paddr_t paddr = convert_abs_addr_to_paddr(
if (circulation_seq == NULL_SEG_SEQ) {
circulation_seq = 0;
}
- return open_for_write_ret(
- open_for_write_ertr::ready_future_marker{},
+ return open_for_mount_ret(
+ open_for_mount_ertr::ready_future_marker{},
journal_seq_t{
circulation_seq,
paddr
initialized = false;
return device->close();
}).handle_error(
- open_for_write_ertr::pass_further{},
+ open_for_mount_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error write_header"
}
);
}
-CircularBoundedJournal::open_for_write_ret
+CircularBoundedJournal::open_for_mount_ret
CircularBoundedJournal::open_device_read_header()
{
LOG_PREFIX(CircularBoundedJournal::open_device_read_header);
).safe_then([this, FNAME]() {
return read_header(
).handle_error(
- open_for_write_ertr::pass_further{},
+ open_for_mount_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error read_header"
}).safe_then([this, FNAME](auto p) mutable {
get_written_to(),
header.device_id);
initialized = true;
- return open_for_write_ret(
- open_for_write_ertr::ready_future_marker{},
+ return open_for_mount_ret(
+ open_for_mount_ertr::ready_future_marker{},
journal_seq_t{
circulation_seq,
paddr
});
});
}).handle_error(
- open_for_write_ertr::pass_further{},
+ open_for_mount_ertr::pass_further{},
crimson::ct_error::assert_all{
"Invalid error _open_device"
});
CircularBoundedJournal(NVMeBlockDevice* device, const std::string &path);
~CircularBoundedJournal() {}
- open_for_write_ret open_for_write() final;
- open_for_write_ret open_device_read_header();
+ open_for_mkfs_ret open_for_mkfs() final;
+
+ open_for_mount_ret open_for_mount() final;
+
+ open_for_mount_ret open_device_read_header();
close_ertr::future<> close() final;
journal_type_t get_type() final {
replay_ret replay(delta_handler_t &&delta_handler) final;
- open_for_write_ertr::future<> _open_device(const std::string &path);
+ open_for_mount_ertr::future<> _open_device(const std::string &path);
struct cbj_header_t;
using write_ertr = submit_record_ertr;
}
SegmentAllocator::open_ret
-SegmentAllocator::do_open()
+SegmentAllocator::do_open(bool is_mkfs)
{
LOG_PREFIX(SegmentAllocator::do_open);
ceph_assert(!current_segment);
crimson::ct_error::assert_all{
"Invalid error in SegmentAllocator::do_open open"
}
- ).safe_then([this, FNAME, new_segment_seq](auto sref) {
+ ).safe_then([this, is_mkfs, FNAME, new_segment_seq](auto sref) {
// initialize new segment
+ segment_id_t segment_id = sref->get_segment_id();
journal_seq_t new_journal_tail;
journal_seq_t new_alloc_replay_from;
if (type == segment_type_t::JOURNAL) {
new_journal_tail = segment_provider.get_journal_tail_target();
new_alloc_replay_from = segment_provider.get_alloc_info_replay_from();
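+ // mkfs starts from a fresh journal with no valid tails yet, so seed both
+ // with the start of this newly opened segment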
+ if (is_mkfs) {
+ ceph_assert(new_journal_tail == JOURNAL_SEQ_NULL);
+ ceph_assert(new_alloc_replay_from == JOURNAL_SEQ_NULL);
+ auto mkfs_seq = journal_seq_t{
+ new_segment_seq,
+ paddr_t::make_seg_paddr(segment_id, 0)
+ };
+ new_journal_tail = mkfs_seq;
+ new_alloc_replay_from = mkfs_seq;
+ } else {
+ ceph_assert(new_journal_tail != JOURNAL_SEQ_NULL);
+ ceph_assert(new_alloc_replay_from != JOURNAL_SEQ_NULL);
+ }
} else { // OOL
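+ // OOL segments record NO_DELTAS and are never opened during mkfs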
+ ceph_assert(!is_mkfs);
new_journal_tail = NO_DELTAS;
new_alloc_replay_from = NO_DELTAS;
}
- segment_id_t segment_id = sref->get_segment_id();
auto header = segment_header_t{
new_segment_seq,
segment_id,
}
SegmentAllocator::open_ret
-SegmentAllocator::open()
+SegmentAllocator::open(bool is_mkfs)
{
LOG_PREFIX(SegmentAllocator::open);
auto& device_ids = sm_group.get_device_ids();
print_name = oss.str();
INFO("{}", print_name);
- return do_open();
+ return do_open(is_mkfs);
}
SegmentAllocator::roll_ertr::future<>
{
ceph_assert(can_write());
return close_segment().safe_then([this] {
- return do_open().discard_result();
+ return do_open(false).discard_result();
});
}
}
RecordSubmitter::open_ret
-RecordSubmitter::open()
+RecordSubmitter::open(bool is_mkfs)
{
- return segment_allocator.open(
+ return segment_allocator.open(is_mkfs
).safe_then([this](journal_seq_t ret) {
LOG_PREFIX(RecordSubmitter::open);
DEBUG("{} register metrics", get_name());
// open for write and generate the correct print name
using open_ertr = base_ertr;
using open_ret = open_ertr::future<journal_seq_t>;
- open_ret open();
+ open_ret open(bool is_mkfs);
// close the current segment and initialize next one
using roll_ertr = base_ertr;
close_ertr::future<> close();
private:
- open_ret do_open();
+ open_ret do_open(bool is_mkfs);
void reset() {
current_segment.reset();
// open for write, generate the correct print name, and register metrics
using open_ertr = base_ertr;
using open_ret = open_ertr::future<journal_seq_t>;
- open_ret open();
+ open_ret open(bool is_mkfs);
using close_ertr = base_ertr;
close_ertr::future<> close();
{
}
-SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write()
+SegmentedJournal::open_for_mkfs_ret
+SegmentedJournal::open_for_mkfs()
{
- return record_submitter.open();
+ return record_submitter.open(true);
+}
+
+SegmentedJournal::open_for_mount_ret
+SegmentedJournal::open_for_mount()
+{
+ return record_submitter.open(false);
}
SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
auto journal_tail = segments.rbegin()->second.journal_tail;
segment_provider.update_journal_tail_committed(journal_tail);
- auto replay_from = journal_tail.offset;
- auto from = segments.begin();
- if (replay_from != P_ADDR_NULL) {
- from = std::find_if(
- segments.begin(),
- segments.end(),
- [&replay_from](const auto &seg) -> bool {
- auto& seg_addr = replay_from.as_seg_paddr();
- return seg.first == seg_addr.get_segment_id();
- });
- if (from->second.segment_seq != journal_tail.segment_seq) {
- ERROR("journal_tail {} does not match {}",
- journal_tail, from->second);
- ceph_abort();
- }
- } else {
- replay_from = paddr_t::make_seg_paddr(
- from->first,
- journal_segment_allocator.get_block_size());
+ auto journal_tail_paddr = journal_tail.offset;
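+ // the tail is seeded at mkfs time (see SegmentAllocator::do_open), so a
+ // null tail here indicates a bug rather than a fresh journal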
+ ceph_assert(journal_tail != JOURNAL_SEQ_NULL);
+ ceph_assert(journal_tail_paddr != P_ADDR_NULL);
+ auto from = std::find_if(
+ segments.begin(),
+ segments.end(),
+ [&journal_tail_paddr](const auto &seg) -> bool {
+ auto& seg_addr = journal_tail_paddr.as_seg_paddr();
+ return seg.first == seg_addr.get_segment_id();
+ });
+ if (from->second.segment_seq != journal_tail.segment_seq) {
+ ERROR("journal_tail {} does not match {}",
+ journal_tail, from->second);
+ ceph_abort();
}
auto num_segments = segments.end() - from;
- INFO("{} segments to replay, from {}",
- num_segments, replay_from);
+ INFO("{} segments to replay from {}",
+ num_segments, journal_tail);
auto ret = replay_segments_t(num_segments);
std::transform(
from, segments.end(), ret.begin(),
p.second.segment_seq,
paddr_t::make_seg_paddr(
p.first,
- journal_segment_allocator.get_block_size())
+ sm_group.get_block_size())
};
return std::make_pair(ret, p.second);
});
- ret[0].first.offset = replay_from;
+ ret[0].first.offset = journal_tail_paddr;
return prep_replay_segments_fut(
prep_replay_segments_ertr::ready_future_marker{},
std::move(ret));
SegmentedJournal(SegmentProvider &segment_provider);
~SegmentedJournal() {}
- open_for_write_ret open_for_write() final;
+ open_for_mkfs_ret open_for_mkfs() final;
+
+ open_for_mount_ret open_for_mount() final;
close_ertr::future<> close() final;
INFO("enter");
return async_cleaner->mount(
).safe_then([this] {
- return journal->open_for_write();
+ return journal->open_for_mkfs();
}).safe_then([this](auto) {
async_cleaner->init_mkfs();
return epm->open();
modify_time);
});
}).safe_then([this] {
- return journal->open_for_write();
+ return journal->open_for_mount();
}).safe_then([this, FNAME](auto) {
return seastar::do_with(
create_weak_transaction(
std::map<segment_id_t, segment_seq_t> segment_seqs;
std::map<segment_id_t, segment_type_t> segment_types;
+ journal_seq_t dummy_tail;
+
mutable segment_info_t tmp_info;
btree_test_base() = default;
*/
void set_journal_head(journal_seq_t) final {}
- journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; }
+ journal_seq_t get_journal_tail_target() const final { return dummy_tail; }
const segment_info_t& get_seg_info(segment_id_t id) const final {
tmp_info = {};
SegmentManagerGroup* get_segment_manager_group() final { return sms.get(); }
journal_seq_t get_dirty_extents_replay_from() const final {
- return JOURNAL_SEQ_NULL;
+ return dummy_tail;
}
journal_seq_t get_alloc_info_replay_from() const final {
- return JOURNAL_SEQ_NULL;
+ return dummy_tail;
}
virtual void complete_commit(Transaction &t) {}
epm->add_device(segment_manager.get(), true);
journal->set_write_pipeline(&pipeline);
- return journal->open_for_write().discard_result();
+ return journal->open_for_mkfs().discard_result();
}).safe_then([this] {
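+ // non-mkfs segment opens assert a valid tail target (see
+ // SegmentAllocator::do_open), so seed one at the start of the first segment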
+ dummy_tail = journal_seq_t{0,
+ paddr_t::make_seg_paddr(segment_id_t(segment_manager->get_device_id(), 0), 0)};
return epm->open();
}).safe_then([this] {
return seastar::do_with(
}
void open() {
cbj->open_device_read_header().unsafe_get0();
- cbj->open_for_write().unsafe_get0();
+ cbj->open_for_mkfs().unsafe_get0();
}
auto get_available_size() {
return cbj->get_available_size();
std::map<segment_id_t, segment_seq_t> segment_seqs;
std::map<segment_id_t, segment_type_t> segment_types;
+ journal_seq_t dummy_tail;
+
mutable segment_info_t tmp_info;
journal_test_t() = default;
*/
void set_journal_head(journal_seq_t) final {}
- journal_seq_t get_journal_tail_target() const final { return journal_seq_t{}; }
+ journal_seq_t get_journal_tail_target() const final { return dummy_tail; }
const segment_info_t& get_seg_info(segment_id_t id) const final {
tmp_info = {};
}
journal_seq_t get_dirty_extents_replay_from() const final {
- return JOURNAL_SEQ_NULL;
+ return dummy_tail;
}
journal_seq_t get_alloc_info_replay_from() const final {
- return JOURNAL_SEQ_NULL;
+ return dummy_tail;
}
segment_id_t allocate_segment(
journal = journal::make_segmented(*this);
journal->set_write_pipeline(&pipeline);
sms->add_segment_manager(segment_manager.get());
- return journal->open_for_write();
- }).safe_then(
- [](auto){},
- crimson::ct_error::all_same_way([] {
- ASSERT_FALSE("Unable to mount");
- }));
+ return journal->open_for_mkfs();
+ }).safe_then([this](auto) {
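+ // seed a valid tail so subsequent non-mkfs segment opens pass their asserts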
+ dummy_tail = journal_seq_t{0,
+ paddr_t::make_seg_paddr(segment_id_t(segment_manager->get_device_id(), 0), 0)};
+ }, crimson::ct_error::all_same_way([] {
+ ASSERT_FALSE("Unable to mount");
+ }));
}
seastar::future<> tear_down_fut() final {
journal->set_write_pipeline(&pipeline);
return journal->replay(std::forward<T>(std::move(f)));
}).safe_then([this] {
- return journal->open_for_write();
+ return journal->open_for_mount();
});
}