assert(device);
auto bptr = bufferptr(ceph::buffer::create_page_aligned(
device->get_block_size()));
- DEBUG("reading {}", device->get_journal_start());
- return device->read(device->get_journal_start(), bptr
+ DEBUG("reading {}", device->get_shard_journal_start());
+ return device->read(device->get_shard_journal_start(), bptr
).safe_then([bptr, FNAME]() mutable
-> read_header_ret {
bufferlist bl;
assert(bl.length() < get_block_size());
bufferptr bp = bufferptr(ceph::buffer::create_page_aligned(get_block_size()));
iter.copy(bl.length(), bp.c_str());
- return device->write(device->get_journal_start(), std::move(bp)
+ return device->write(device->get_shard_journal_start(), std::move(bp)
).handle_error(
write_ertr::pass_further{},
crimson::ct_error::assert_all{ "Invalid error device->write" }
}
// Absolute address where the journal's record area begins for this
// shard: one block past the shard's journal start (the first block is
// written with the journal header — see the write at get_shard_journal_start()).
rbm_abs_addr get_records_start() const {
assert(device);
- return device->get_journal_start() + get_block_size();
+ return device->get_shard_journal_start() + get_block_size();
}
// Free space remaining in the record area: total capacity minus bytes
// currently in use.
size_t get_records_available_size() const {
return get_records_total_size() - get_records_used_size();
}
// One past the last journal byte for this shard: the shard's journal
// start plus the configured journal size.
rbm_abs_addr get_journal_end() const {
assert(device);
- return device->get_journal_start() + device->get_journal_size();
+ return device->get_shard_journal_start() + device->get_journal_size();
}
read_ertr::future<> read(
namespace crimson::os::seastore {
+// Per-shard slice of the RBM device: each seastar shard owns the byte
+// range [start_offset, start_offset + size).
+struct rbm_shard_info_t {
+ // byte length of this shard's region (block-aligned; checked in
+ // rbm_metadata_header_t::validate())
+ std::size_t size = 0;
+ // absolute device offset where this shard's region begins
+ uint64_t start_offset = 0;
+
+ // bounded encode/decode so the struct can live in the on-disk superblock
+ DENC(rbm_shard_info_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.size, p);
+ denc(v.start_offset, p);
+ DENC_FINISH(p);
+ }
+};
+
// On-disk RBM superblock, extended with per-seastar-shard layout
// (shard_num + shard_infos) so every shard can locate its own region.
struct rbm_metadata_header_t {
size_t size = 0;
size_t block_size = 0;
uint64_t journal_size = 0;
checksum_t crc = 0;
device_config_t config;
+ // number of seastar shards the device was formatted for
+ unsigned int shard_num = 0;
+ // one entry per shard; indexed by seastar::this_shard_id()
+ std::vector<rbm_shard_info_t> shard_infos;
DENC(rbm_metadata_header_t, v, p) {
DENC_START(1, 1, p);
denc(v.journal_size, p);
denc(v.crc, p);
denc(v.config, p);
+ denc(v.shard_num, p);
+ denc(v.shard_infos, p);
DENC_FINISH(p);
}
+ // Sanity-check a decoded header; ceph_assert aborts the process on
+ // violation, so this is for catching corruption/misconfiguration early.
+ void validate() const {
+ ceph_assert(shard_num == seastar::smp::count);
+ ceph_assert(block_size > 0);
+ // NOTE(review): shard_infos[i] is indexed for i < smp::count without
+ // first asserting shard_infos.size() == shard_num — confirm decode
+ // guarantees the vector length matches the recorded shard_num.
+ for (unsigned int i = 0; i < seastar::smp::count; i ++) {
+ ceph_assert(shard_infos[i].size > block_size &&
+ shard_infos[i].size % block_size == 0);
+ ceph_assert_always(shard_infos[i].size <= DEVICE_OFF_MAX);
+ // journal size must be positive and block-aligned
+ ceph_assert(journal_size > 0 &&
+ journal_size % block_size == 0);
+ ceph_assert(shard_infos[i].start_offset < size &&
+ shard_infos[i].start_offset % block_size == 0);
+ }
+ ceph_assert(config.spec.magic != 0);
+ ceph_assert(get_default_backend_of_device(config.spec.dtype) ==
+ backend_type_t::RANDOM_BLOCK);
+ ceph_assert(config.spec.id <= DEVICE_ID_MAX_VALID);
+ }
};
enum class rbm_extent_state_t {
get_rb_device(const std::string &device);
std::ostream &operator<<(std::ostream &out, const rbm_metadata_header_t &header);
+std::ostream &operator<<(std::ostream &out, const rbm_shard_info_t &shard);
}
+WRITE_CLASS_DENC_BOUNDED(
+ crimson::os::seastore::rbm_shard_info_t
+)
WRITE_CLASS_DENC_BOUNDED(
crimson::os::seastore::rbm_metadata_header_t
)
#if FMT_VERSION >= 90000
template<> struct fmt::formatter<crimson::os::seastore::rbm_metadata_header_t> : fmt::ostream_formatter {};
+template<> struct fmt::formatter<crimson::os::seastore::rbm_shard_info_t> : fmt::ostream_formatter {};
#endif
auto ool_start = get_start_rbm_addr();
allocator->init(
ool_start,
- device->get_available_size() -
+ device->get_shard_end() -
ool_start,
device->get_block_size());
return open_ertr::now();
LOG_PREFIX(BlockRBManager::write);
ceph_assert(device);
rbm_abs_addr addr = convert_paddr_to_abs_addr(paddr);
- rbm_abs_addr start = 0;
- rbm_abs_addr end = device->get_available_size();
+ rbm_abs_addr start = device->get_shard_start();
+ rbm_abs_addr end = device->get_shard_end();
if (addr < start || addr + bptr.length() > end) {
ERROR("out of range: start {}, end {}, addr {}, length {}",
start, end, addr, bptr.length());
LOG_PREFIX(BlockRBManager::read);
ceph_assert(device);
rbm_abs_addr addr = convert_paddr_to_abs_addr(paddr);
- rbm_abs_addr start = 0;
- rbm_abs_addr end = device->get_available_size();
+ rbm_abs_addr start = device->get_shard_start();
+ rbm_abs_addr end = device->get_shard_end();
if (addr < start || addr + bptr.length() > end) {
ERROR("out of range: start {}, end {}, addr {}, length {}",
start, end, addr, bptr.length());
<< ", feature=" << header.feature
<< ", journal_size=" << header.journal_size
<< ", crc=" << header.crc
- << ", config=" << header.config;
+ << ", config=" << header.config
+ << ", shard_num=" << header.shard_num;
+ for (auto p : header.shard_infos) {
+ out << p;
+ }
+ return out << ")";
+}
+
+// Log/debug formatter for one shard entry; emitted per element by the
+// rbm_metadata_header_t stream operator.
+std::ostream &operator<<(std::ostream &out, const rbm_shard_info_t &shard)
+{
+ out << " rbm_shard_info_t(size=" << shard.size
+ << ", start_offset=" << shard.start_offset;
return out << ")";
}
void complete_allocation(paddr_t addr, size_t size) final;
// First address usable by the allocator: everything up to the end of
// this shard's journal region is reserved.
size_t get_start_rbm_addr() const {
- return device->get_journal_start() + device->get_journal_size();
+ return device->get_shard_journal_start() + device->get_journal_size();
}
// Allocatable capacity: from the first usable address to the end of
// this shard's region (previously the whole device).
size_t get_size() const final {
- return device->get_available_size() - get_start_rbm_addr();
+ return device->get_shard_end() - get_start_rbm_addr();
};
extent_len_t get_block_size() const final { return device->get_block_size(); }
assert(allocator);
rbm_abs_addr addr = convert_paddr_to_abs_addr(paddr);
assert(addr >= get_start_rbm_addr() &&
- addr + len <= device->get_available_size());
+ addr + len <= device->get_shard_end());
allocator->mark_extent_used(addr, len);
}
assert(allocator);
rbm_abs_addr addr = convert_paddr_to_abs_addr(paddr);
assert(addr >= get_start_rbm_addr() &&
- addr + len <= device->get_available_size());
+ addr + len <= device->get_shard_end());
allocator->free_extent(addr, len);
}
assert(allocator);
rbm_abs_addr addr = convert_paddr_to_abs_addr(paddr);
assert(addr >= get_start_rbm_addr() &&
- addr + size <= device->get_available_size());
+ addr + size <= device->get_shard_end());
return allocator->get_extent_state(addr, size);
}
namespace crimson::os::seastore::random_block_device::nvme {
+NVMeBlockDevice::mkfs_ret NVMeBlockDevice::mkfs(device_config_t config) {
+ using crimson::common::get_conf;
+ return shard_devices.local().do_primary_mkfs(config,
+ seastar::smp::count,
+ get_conf<Option::size_t>("seastore_cbjournal_size")
+ );
+}
+
open_ertr::future<> NVMeBlockDevice::open(
const std::string &in_path,
seastar::open_flags mode) {
return seastar::do_with(in_path, [this, mode](auto& in_path) {
return seastar::file_stat(in_path).then([this, mode, in_path](auto stat) {
return seastar::open_file_dma(in_path, mode).then([=, this](auto file) {
- device = file;
+ device = std::move(file);
logger().debug("open");
// Get SSD's features from identify_controller and namespace command.
// Do identify_controller first, and then identify_namespace.
return seastar::open_file_dma(in_path, mode).then([this](
auto file) {
assert(io_device.size() > stream_index_to_open);
- io_device[stream_index_to_open] = file;
+ io_device[stream_index_to_open] = std::move(file);
return io_device[stream_index_to_open].fcntl(
F_SET_FILE_RW_HINT,
(uintptr_t)&stream_index_to_open).then([this](auto ret) {
// Mount runs on every shard: each sharded replica opens the device and
// reads its own shard_info from the superblock via do_shard_mount().
NVMeBlockDevice::mount_ret NVMeBlockDevice::mount()
{
logger().debug(" mount ");
- return do_mount();
+ return shard_devices.invoke_on_all([](auto &local_device) {
+ return local_device.do_shard_mount(
+ ).handle_error(
+ crimson::ct_error::assert_all{
+ // message names the function actually called (was "do_mount")
+ "Invalid error in RBMDevice::do_shard_mount"
+ });
+ });
}
write_ertr::future<> NVMeBlockDevice::write(
mount_ret mount() final;
- mkfs_ret mkfs(device_config_t config) final {
- using crimson::common::get_conf;
- super.journal_size = get_conf<Option::size_t>("seastore_cbjournal_size");
- return do_mkfs(config);
- }
+ mkfs_ret mkfs(device_config_t config) final;
write_ertr::future<> writev(
uint64_t offset,
return device_path;
}
+ // Start the sharded service: instantiates one NVMeBlockDevice replica
+ // per seastar shard, each constructed with device_path.
+ seastar::future<> start() final {
+ return shard_devices.start(device_path);
+ }
+
+ // Stop all per-shard replicas; seastar::sharded requires stop() to
+ // complete before the sharded object is destroyed.
+ seastar::future<> stop() final {
+ return shard_devices.stop();
+ }
+
+ // Accessor for the current shard's device replica.
+ Device& get_sharded_device() final {
+ return shard_devices.local();
+ }
+
uint64_t get_preffered_write_granularity() const { return write_granularity; }
uint64_t get_preffered_write_alignment() const { return write_alignment; }
uint64_t get_atomic_write_unit() const { return atomic_write_unit; }
bool data_protection_enabled = false;
std::string device_path;
+ seastar::sharded<NVMeBlockDevice> shard_devices;
};
}
#include "crimson/os/seastore/logging.h"
SET_SUBSYS(seastore_device);
-RBMDevice::mkfs_ret RBMDevice::do_mkfs(device_config_t config) {
- LOG_PREFIX(RBMDevice::mkfs);
+// Primary-shard format: stat the device, carve it into shard_num
+// block-aligned slices, then persist the superblock via write_rbm_header().
+RBMDevice::mkfs_ret RBMDevice::do_primary_mkfs(device_config_t config,
+ int shard_num, size_t journal_size) {
+ LOG_PREFIX(RBMDevice::do_primary_mkfs);
return stat_device(
).handle_error(
mkfs_ertr::pass_further{},
crimson::ct_error::assert_all{
- "Invalid error stat_device in RBMDevice::mkfs"}
- ).safe_then([this, FNAME, config=std::move(config)](auto st) {
+ "Invalid error stat_device in RBMDevice::do_primary_mkfs"}
+ ).safe_then(
+ [this, FNAME, config=std::move(config), shard_num, journal_size](auto st) {
super.block_size = st.block_size;
super.size = st.size;
super.feature |= RBM_BITMAP_BLOCK_CRC;
super.config = std::move(config);
- assert(super.journal_size);
- assert(super.size >= super.journal_size);
+ super.journal_size = journal_size;
+ ceph_assert_always(super.journal_size > 0);
+ ceph_assert_always(super.size >= super.journal_size);
+ ceph_assert_always(shard_num > 0);
+
+ // Split the device into shard_num equal block-aligned slices.
+ // NOTE(review): aligned_size is loop-invariant and could be hoisted
+ // out of the loop; any tail remainder of the device is left unused.
+ std::vector<rbm_shard_info_t> shard_infos(shard_num);
+ for (int i = 0; i < shard_num; i++) {
+ uint64_t aligned_size =
+ (super.size / shard_num) -
+ ((super.size / shard_num) % super.block_size);
+ shard_infos[i].size = aligned_size;
+ shard_infos[i].start_offset = i * aligned_size;
+ assert(shard_infos[i].size > super.journal_size);
+ }
+ super.shard_infos = shard_infos;
+ super.shard_num = shard_num;
+ // primary mkfs runs on a single shard; cache its own slice locally
+ shard_info = shard_infos[seastar::this_shard_id()];
DEBUG("super {} ", super);
+
// write super block
return open(get_device_path(),
seastar::open_flags::rw | seastar::open_flags::dsync
).handle_error(
mkfs_ertr::pass_further{},
crimson::ct_error::assert_all{
- "Invalid error open in RBMDevice::mkfs"}
+ "Invalid error open in RBMDevice::do_primary_mkfs"}
).safe_then([this] {
return write_rbm_header(
).safe_then([this] {
}).handle_error(
mkfs_ertr::pass_further{},
crimson::ct_error::assert_all{
- "Invalid error write_rbm_header in RBMDevice::mkfs"
+ "Invalid error write_rbm_header in RBMDevice::do_primary_mkfs"
});
});
});
});
}
+// Per-shard mount: open the device, read the shared superblock, and
+// cache this shard's slice descriptor in shard_info.
-RBMDevice::mount_ret RBMDevice::do_mount()
+RBMDevice::mount_ret RBMDevice::do_shard_mount()
{
return open(get_device_path(),
seastar::open_flags::rw | seastar::open_flags::dsync
).handle_error(
mount_ertr::pass_further{},
crimson::ct_error::assert_all{
- "Invalid error stat_device in RBMDevice::mount"}
+ // NOTE(review): message says stat_device but this guards open() —
+ // confirm the wording is intended.
+ "Invalid error stat_device in RBMDevice::do_shard_mount"}
).safe_then([this](auto st) {
+ assert(st.block_size > 0);
super.block_size = st.block_size;
return read_rbm_header(RBM_START_ADDRESS
- ).safe_then([](auto s) {
+ ).safe_then([this](auto s) {
+ LOG_PREFIX(RBMDevice::do_shard_mount);
+ // NOTE(review): shard_infos is indexed before s.validate() runs; a
+ // corrupt header with too few entries would fault on the lookup
+ // first — consider validating before indexing.
+ shard_info = s.shard_infos[seastar::this_shard_id()];
+ INFO("{} read {}", device_id_printer_t{get_device_id()}, shard_info);
+ s.validate();
return seastar::now();
});
});
}).handle_error(
mount_ertr::pass_further{},
crimson::ct_error::assert_all{
- "Invalid error mount in NVMeBlockDevice::mount"}
+ "Invalid error mount in RBMDevice::do_shard_mount"}
);
}
// Test helper: sizes the in-memory device to hold the journal, the data
// area, and the per-shard reserved (superblock) region.
EphemeralRBMDeviceRef create_test_ephemeral(uint64_t journal_size, uint64_t data_size) {
return EphemeralRBMDeviceRef(
new EphemeralRBMDevice(journal_size + data_size +
- random_block_device::RBMDevice::get_journal_start(),
+ random_block_device::RBMDevice::get_shard_reserved_size(),
EphemeralRBMDevice::TEST_BLOCK_SIZE));
}
return write_ertr::now();
}
+// The ephemeral (test) device is single-sharded, so mount maps straight
+// onto the per-shard mount path.
+EphemeralRBMDevice::mount_ret EphemeralRBMDevice::mount() {
+ return do_shard_mount();
+}
+
+// Format as a single shard with the default test journal size.
+EphemeralRBMDevice::mkfs_ret EphemeralRBMDevice::mkfs(device_config_t config) {
+ return do_primary_mkfs(config, 1, DEFAULT_TEST_CBJOURNAL_SIZE);
+}
+
}
}
protected:
rbm_metadata_header_t super;
+ rbm_shard_info_t shard_info;
public:
RBMDevice() {}
virtual ~RBMDevice() = default;
mkfs_ret do_mkfs(device_config_t);
+ // shard 0 mkfs
+ mkfs_ret do_primary_mkfs(device_config_t, int shard_num, size_t journal_size);
+
mount_ret do_mount();
+ mount_ret do_shard_mount();
+
write_ertr::future<> write_rbm_header();
read_ertr::future<rbm_metadata_header_t> read_rbm_header(rbm_abs_addr addr);
return super.journal_size;
}
- static rbm_abs_addr get_journal_start() {
+ // Space reserved at the front of each shard's region (superblock area).
+ static rbm_abs_addr get_shard_reserved_size() {
return RBM_SUPERBLOCK_SIZE;
}
+
+ // Journal begins right after this shard's reserved superblock area.
+ // NOTE(review): could be marked const — it only reads shard_info.
+ rbm_abs_addr get_shard_journal_start() {
+ return shard_info.start_offset + get_shard_reserved_size();
+ }
+
+ // Absolute device offset where this shard's region begins.
+ uint64_t get_shard_start() const {
+ return shard_info.start_offset;
+ }
+
+ // One past the last byte of this shard's region.
+ uint64_t get_shard_end() const {
+ return shard_info.start_offset + shard_info.size;
+ }
};
using RBMDeviceRef = std::unique_ptr<RBMDevice>;
std::size_t get_available_size() const final { return size; }
extent_len_t get_block_size() const final { return block_size; }
- mount_ret mount() final {
- return do_mount();
- }
-
- mkfs_ret mkfs(device_config_t config) final {
- super.journal_size = DEFAULT_TEST_CBJOURNAL_SIZE;
- return do_mkfs(config);
- }
+ mount_ret mount() final;
+ mkfs_ret mkfs(device_config_t config) final;
open_ertr::future<> open(
const std::string &in_path,
->get_journal_size() - primary_device->get_block_size();
// see CircularBoundedJournal::get_records_start()
roll_start = static_cast<random_block_device::RBMDevice*>(primary_device)
- ->get_journal_start() + primary_device->get_block_size();
+ ->get_shard_journal_start() + primary_device->get_block_size();
ceph_assert_always(roll_size <= DEVICE_OFF_MAX);
ceph_assert_always((std::size_t)roll_size + roll_start <=
primary_device->get_available_size());