CircularBoundedJournal::mkfs(const mkfs_config_t& config)
{
LOG_PREFIX(CircularBoundedJournal::mkfs);
- return _open_device(path
- ).safe_then([this, config, FNAME]() mutable -> mkfs_ret {
- assert(static_cast<seastore_off_t>(config.block_size) ==
- device->get_block_size());
- ceph::bufferlist bl;
- CircularBoundedJournal::cbj_header_t head;
- head.block_size = config.block_size;
- head.size = config.total_size - device->get_block_size();
- head.journal_tail = device->get_block_size();
- head.device_id = config.device_id;
- encode(head, bl);
- header = head;
- set_written_to(head.journal_tail);
- DEBUG(
- "initialize header block in CircularBoundedJournal, length {}",
- bl.length());
- return write_header(
- ).handle_error(
- mkfs_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error in CircularBoundedJournal::mkfs"
- }
- );
- }).safe_then([this]() -> mkfs_ret {
- if (device) {
- return device->close();
- }
- return mkfs_ertr::now();
- });
-}
-
-CircularBoundedJournal::open_for_mount_ertr::future<>
-CircularBoundedJournal::_open_device(const std::string &path)
-{
- ceph_assert(device);
- return device->open(path, seastar::open_flags::rw
+ assert(device);
+ assert(static_cast<seastore_off_t>(config.block_size) ==
+ device->get_block_size());
+ ceph::bufferlist bl;
+ CircularBoundedJournal::cbj_header_t head;
+ head.block_size = config.block_size;
+ head.size = config.total_size - device->get_block_size();
+ head.journal_tail = device->get_block_size();
+ head.device_id = config.device_id;
+ encode(head, bl);
+ header = head;
+ set_written_to(head.journal_tail);
+ initialized = true;
+ DEBUG(
+ "initialize header block in CircularBoundedJournal, length {}",
+ bl.length());
+ return write_header(
).handle_error(
- open_for_mount_ertr::pass_further{},
+ mkfs_ertr::pass_further{},
crimson::ct_error::assert_all{
- "Invalid error device->open"
+ "Invalid error in CircularBoundedJournal::mkfs"
}
);
}
CircularBoundedJournal::open_for_mount_ret
CircularBoundedJournal::open_for_mount()
{
- ceph_assert(initialized);
paddr_t paddr = convert_abs_addr_to_paddr(
get_written_to(),
header.device_id);
+ ceph_assert(initialized);
if (circulation_seq == NULL_SEG_SEQ) {
circulation_seq = 0;
}
open_for_mount_ertr::ready_future_marker{},
journal_seq_t{
circulation_seq,
- paddr
- });
+ paddr});
}
CircularBoundedJournal::close_ertr::future<> CircularBoundedJournal::close()
return write_header(
).safe_then([this]() -> close_ertr::future<> {
initialized = false;
- return device->close();
+ return close_ertr::now();
}).handle_error(
open_for_mount_ertr::pass_further{},
crimson::ct_error::assert_all{
);
}
-CircularBoundedJournal::open_for_mount_ret
-CircularBoundedJournal::open_device_read_header()
-{
- LOG_PREFIX(CircularBoundedJournal::open_device_read_header);
- ceph_assert(!initialized);
- return _open_device(path
- ).safe_then([this, FNAME]() {
- return read_header(
- ).handle_error(
- open_for_mount_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error read_header"
- }).safe_then([this, FNAME](auto p) mutable {
- auto &[head, bl] = *p;
- header = head;
- DEBUG("header : {}", header);
- paddr_t paddr = convert_abs_addr_to_paddr(
- get_written_to(),
- header.device_id);
- initialized = true;
- return open_for_mount_ret(
- open_for_mount_ertr::ready_future_marker{},
- journal_seq_t{
- circulation_seq,
- paddr
- });
- });
- }).handle_error(
- open_for_mount_ertr::pass_further{},
- crimson::ct_error::assert_all{
- "Invalid error _open_device"
- });
-}
-
CircularBoundedJournal::submit_record_ret CircularBoundedJournal::submit_record(
record_t &&record,
OrderingHandle &handle)
* read records from last applied record prior to written_to, and replay
*/
LOG_PREFIX(CircularBoundedJournal::replay);
- return open_device_read_header(
- ).safe_then([this, FNAME, delta_handler=std::move(delta_handler)] (auto) {
+ return read_header(
+ ).handle_error(
+ open_for_mount_ertr::pass_further{},
+ crimson::ct_error::assert_all{
+ "Invalid error read_header"
+ }).safe_then([this, FNAME, delta_handler=std::move(delta_handler)](auto p) mutable {
+ auto &[head, bl] = *p;
+ header = head;
+ DEBUG("header : {}", header);
+ initialized = true;
circulation_seq = NULL_SEG_SEQ;
set_written_to(get_journal_tail());
return seastar::do_with(
if (expected_seq == NULL_SEG_SEQ || is_rolled) {
DEBUG("no more records, stop replaying");
return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::yes);
+ seastar::stop_iteration>(seastar::stop_iteration::yes);
} else {
cursor_addr = get_start_addr();
++expected_seq;
is_rolled = true;
return replay_ertr::make_ready_future<
- seastar::stop_iteration>(seastar::stop_iteration::no);
+ seastar::stop_iteration>(seastar::stop_iteration::no);
}
}
auto [r_header, bl] = *ret;
if (!maybe_record_deltas_list) {
// This should be impossible, we did check the crc on the mdbuf
ERROR("unable to decode deltas for record {} at {}",
- r_header, record_block_base);
+ r_header, record_block_base);
return crimson::ct_error::input_output_error::make();
}
DEBUG("{} at {}", r_header, cursor_addr);
_mount().handle_error(crimson::ct_error::assert_all{}).get0();
SUBINFO(test, "finish");
}
-
- seastar::future<> tm_setup(
- journal_type_t type = journal_type_t::SEGMENT_JOURNAL) {
- LOG_PREFIX(EphemeralTestState::tm_setup);
- SUBINFO(test, "begin with {} devices ...", get_num_devices());
- journal_type = type;
- // FIXME: should not initialize segment_manager with circularbounded-journal
+ seastar::future<> segment_setup()
+ {
+ LOG_PREFIX(EphemeralTestState::segment_setup);
segment_manager = segment_manager::create_test_ephemeral();
for (auto &sec_sm : secondary_segment_managers) {
sec_sm = segment_manager::create_test_ephemeral();
}
- if (journal_type == journal_type_t::CIRCULARBOUNDED_JOURNAL) {
- auto config =
- journal::CircularBoundedJournal::mkfs_config_t::get_default();
- rb_device.reset(new random_block_device::TestMemory(config.total_size));
- rb_device->set_device_id(
- 1 << (std::numeric_limits<device_id_t>::digits - 1));
- }
return segment_manager->init(
).safe_then([this] {
return crimson::do_for_each(
segment_manager::get_ephemeral_device_config(cnt, get_num_devices()));
});
});
- }).safe_then([this] {
+ }).safe_then([this, FNAME] {
init();
- return _mkfs();
- }).safe_then([this] {
- return _teardown();
- }).safe_then([this] {
- destroy();
- segment_manager->remount();
- for (auto &sec_sm : secondary_segment_managers) {
- sec_sm->remount();
- }
- init();
- return _mount();
+ return _mkfs(
+ ).safe_then([this] {
+ return _teardown();
+ }).safe_then([this] {
+ destroy();
+ segment_manager->remount();
+ for (auto &sec_sm : secondary_segment_managers) {
+ sec_sm->remount();
+ }
+ init();
+ return _mount();
+ }).handle_error(
+ crimson::ct_error::assert_all{}
+ ).then([FNAME] {
+ SUBINFO(test, "finish");
+ });
}).handle_error(
crimson::ct_error::assert_all{}
- ).then([FNAME] {
- SUBINFO(test, "finish");
+ );
+ }
+ seastar::future<> randomblock_setup()
+ {
+ auto config =
+ journal::CircularBoundedJournal::mkfs_config_t::get_default();
+ rb_device.reset(new random_block_device::TestMemory(config.total_size));
+ rb_device->set_device_id(
+ 1 << (std::numeric_limits<device_id_t>::digits - 1));
+ return rb_device->mount().handle_error(crimson::ct_error::assert_all{}
+ ).then([this]() {
+ return segment_setup();
});
}
+ seastar::future<> tm_setup(
+ journal_type_t type = journal_type_t::SEGMENT_JOURNAL) {
+ LOG_PREFIX(EphemeralTestState::tm_setup);
+ SUBINFO(test, "begin with {} devices ...", get_num_devices());
+ journal_type = type;
+ // FIXME: should not initialize segment_manager with circularbounded-journal
+ if (journal_type == journal_type_t::SEGMENT_JOURNAL) {
+ return segment_setup();
+ } else {
+ assert(journal_type == journal_type_t::CIRCULARBOUNDED_JOURNAL);
+ return randomblock_setup();
+ }
+ }
+
seastar::future<> tm_teardown() {
LOG_PREFIX(EphemeralTestState::tm_teardown);
SUBINFO(test, "begin");
return static_cast<journal::CircularBoundedJournal*>(tm->get_journal())->mkfs(
config
).safe_then([this]() {
- return static_cast<journal::CircularBoundedJournal*>(tm->get_journal())->
- open_device_read_header(
- ).safe_then([this](auto) {
- return tm->mkfs(
- ).handle_error(
- crimson::ct_error::assert_all{"Error in mkfs"}
- );
- });
+ return tm->mkfs(
+ ).handle_error(
+ crimson::ct_error::assert_all{"Error in mkfs"}
+ );
}).handle_error(
crimson::ct_error::assert_all{"Error in mkfs"}
);