SeaStore::Shard::Shard(
std::string root,
Device* dev,
- bool is_test)
+ bool is_test,
+ unsigned int store_shard_nums,
+ unsigned int store_index)
:root(root),
max_object_size(
get_conf<uint64_t>("seastore_default_max_object_size")),
is_test(is_test),
throttler(
- get_conf<uint64_t>("seastore_max_concurrent_transactions"))
+ get_conf<uint64_t>("seastore_max_concurrent_transactions")),
+ store_index(store_index)
{
- device = &(dev->get_sharded_device());
- register_metrics();
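+ // a logical store shard is addressed by
+ // seastar::this_shard_id() + seastar::smp::count * store_index;
+ // indices beyond store_shard_nums mark this Shard as inactive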
+ if (seastar::this_shard_id() + seastar::smp::count * store_index >= store_shard_nums) {
+ store_active = false;
+ }
+ device = &(dev->get_sharded_device(store_index));
+
+ register_metrics(store_index);
}
SeaStore::SeaStore(
: root(root),
mdstore(std::move(mdstore))
{
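+ // default until the shard count persisted at mkfs is known, see get_shard_nums()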
+ store_shard_nums = seastar::smp::count;
}
SeaStore::~SeaStore() = default;
-void SeaStore::Shard::register_metrics()
+void SeaStore::Shard::register_metrics(unsigned int store_index)
{
+ if (!store_active) {
+ return;
+ }
namespace sm = seastar::metrics;
using op_type_t = crimson::os::seastore::op_type_t;
std::pair<op_type_t, sm::label_instance> labels_by_op_type[] = {
return get_latency(op_type);
},
sm::description(desc),
- {label}
+ {label, sm::label_instance("shard_store_index", std::to_string(store_index))}
),
}
);
[this] {
return throttler.get_current();
},
- sm::description("transactions that are running inside seastore")
+ sm::description("transactions that are running inside seastore"),
+ {sm::label_instance("shard_store_index", std::to_string(store_index))}
),
sm::make_gauge(
"pending_transactions",
[this] {
return throttler.get_pending();
},
- sm::description("transactions waiting to get "
- "through seastore's throttler")
+ sm::description("transactions waiting to get "
+ "through seastore's throttler"),
+ {sm::label_instance("shard_store_index", std::to_string(store_index))}
)
}
);
}
-seastar::future<> SeaStore::start()
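+// Determine how many logical store shards exist: before mkfs fall back to
+// seastar::smp::count, otherwise read the count persisted by the device.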
+seastar::future<> SeaStore::get_shard_nums()
+{
+ LOG_PREFIX(SeaStore::get_shard_nums);
+ auto tuple = co_await read_meta("mkfs_done");
+ auto [done, value] = tuple;
+ if (done == -1) {
+ INFO("seastore not mkfs yet");
+ store_shard_nums = seastar::smp::count;
+ co_return;
+ } else {
+ INFO("seastore mkfs done");
+ auto shard_nums = co_await device->get_shard_nums(
+ ).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in device->get_shard_nums"
+ });
+ INFO("seastore shard nums {}", shard_nums);
+ store_shard_nums = shard_nums;
+ co_return;
+ }
+}
+
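+// Each reactor shard hosts ceil(store_shard_nums / smp::count) Shard
+// instances (mshard_stores); the extras beyond store_shard_nums stay inactive.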
+seastar::future<> SeaStore::shard_stores_start(bool is_test)
+{
+ LOG_PREFIX(SeaStore::shard_stores_start);
+ auto num_shard_services = (store_shard_nums + seastar::smp::count - 1) / seastar::smp::count;
+ INFO("store_shard_nums={} seastar::smp={}, num_shard_services={}", store_shard_nums, seastar::smp::count, num_shard_services);
+ return shard_stores.start(num_shard_services, root, device.get(), is_test, store_shard_nums);
+}
+
+seastar::future<> SeaStore::shard_stores_stop()
+{
+ LOG_PREFIX(SeaStore::shard_stores_stop);
+ INFO("stopping shard stores");
+ return shard_stores.stop();
+}
+
+seastar::future<unsigned int> SeaStore::start()
{
LOG_PREFIX(SeaStore::start);
INFO("...");
ceph_assert(root != "");
DeviceRef device_obj = co_await Device::make_device(root, d_type);
device = std::move(device_obj);
- co_await device->start();
+ co_await get_shard_nums();
+ co_await device->start(store_shard_nums);
ceph_assert(device);
- co_await shard_stores.start(root, device.get(), is_test);
+ co_await shard_stores_start(is_test);
INFO("done");
+ co_return store_shard_nums;
}
seastar::future<> SeaStore::test_start(DeviceRef device_obj)
ceph_assert(device_obj);
ceph_assert(root == "");
device = std::move(device_obj);
- co_await shard_stores.start_single(root, device.get(), true);
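+ // tests keep a single wrapper instance holding one Shard (store_index 0)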
+ co_await shard_stores.start_single(1, root, device.get(), true, seastar::smp::count);
INFO("done");
}
if (device) {
co_await device->stop();
}
- co_await shard_stores.stop();
+ co_await shard_stores_stop();
INFO("done");
}
INFO("...");
ceph_assert(seastar::this_shard_id() == primary_core);
- co_await shard_stores.local().mount_managers();
+ co_await seastar::do_for_each(shard_stores.local().mshard_stores, [this](auto& mshard_store) {
+ return mshard_store->mount_managers();
+ });
INFO("done");
}
ceph_assert(seastar::this_shard_id() == primary_core);
co_await device->mount();
- ceph_assert(device->get_sharded_device().get_block_size() >= laddr_t::UNIT_SIZE);
+ ceph_assert(device->get_sharded_device(0).get_block_size() >= laddr_t::UNIT_SIZE);
- auto &sec_devices = device->get_sharded_device().get_secondary_devices();
+ auto &sec_devices = device->get_sharded_device(0).get_secondary_devices();
for (auto& device_entry : sec_devices) {
device_id_t id = device_entry.first;
[[maybe_unused]] magic_t magic = device_entry.second.magic;
device_type_t dtype = device_entry.second.dtype;
std::string path = fmt::format("{}/block.{}.{}", root, dtype, std::to_string(id));
DeviceRef sec_dev = co_await Device::make_device(path, dtype);
- co_await sec_dev->start();
+ co_await sec_dev->start(store_shard_nums);
co_await sec_dev->mount();
- ceph_assert(sec_dev->get_sharded_device().get_block_size() >= laddr_t::UNIT_SIZE);
- assert(sec_dev->get_sharded_device().get_magic() == magic);
+ ceph_assert(sec_dev->get_sharded_device(0).get_block_size() >= laddr_t::UNIT_SIZE);
+ assert(sec_dev->get_sharded_device(0).get_magic() == magic);
secondaries.emplace_back(std::move(sec_dev));
co_await set_secondaries();
}
co_await shard_stores.invoke_on_all([](auto &local_store) {
- return local_store.mount_managers();
+ return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+ return mshard_store->mount_managers();
+ });
});
INFO("done");
}
seastar::future<> SeaStore::Shard::mount_managers()
{
+ if (!store_active) {
+ return seastar::now();
+ }
LOG_PREFIX(SeaStore::mount_managers);
INFO("start");
init_managers();
ceph_assert(seastar::this_shard_id() == primary_core);
co_await shard_stores.invoke_on_all([](auto &local_store) {
- return local_store.umount().handle_error(
- crimson::ct_error::assert_all{"Invalid error in SeaStoreS::umount"}
- );
+ return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+ return mshard_store->umount().handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in shard_store->umount"
+ });
+ });
});
INFO("done");
}
base_ertr::future<> SeaStore::Shard::umount()
{
+ if (!store_active) {
+ co_return;
+ }
if (transaction_manager) {
co_await transaction_manager->close();
}
{
LOG_PREFIX(SeaStoreS::mkfs_managers);
INFO("...");
+ if (!store_active) {
+ co_return;
+ }
init_managers();
co_await transaction_manager->mkfs();
init_managers();
{
auto sec_dev_ite = secondaries.rbegin();
Device* sec_dev = sec_dev_ite->get();
+
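+ // each Shard attaches the sharded secondary device matching its store_index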
return shard_stores.invoke_on_all([sec_dev](auto &local_store) {
- local_store.set_secondaries(sec_dev->get_sharded_device());
+ return seastar::do_for_each(local_store.mshard_stores, [sec_dev](auto& mshard_store) {
+ unsigned int index = mshard_store->get_store_index();
+ mshard_store->set_secondaries(sec_dev->get_sharded_device(index));
+ });
});
}
ERROR("failed");
co_return;
}
- co_await shard_stores.local().mkfs_managers().handle_error(
+ co_await shard_stores.local().mshard_stores[0]->mkfs_managers().handle_error(
crimson::ct_error::assert_all{"Invalid error in SeaStore::mkfs"});
co_await prepare_meta(new_osd_fsid);
INFO("done");
DeviceRef sec_dev = co_await Device::make_device(path, dtype);
auto p_sec_dev = sec_dev.get();
secondaries.emplace_back(std::move(sec_dev));
- co_await p_sec_dev->start();
+ co_await p_sec_dev->start(store_shard_nums);
magic_t magic = (magic_t)std::rand();
sds.emplace((device_id_t)id, device_spec_t{magic, dtype, (device_id_t)id});
co_await p_sec_dev->mkfs(
co_await device->mount();
DEBUG("mkfs managers");
co_await shard_stores.invoke_on_all([] (auto &local_store) {
- return local_store.mkfs_managers().handle_error(
- crimson::ct_error::assert_all{"Invalid error in SeaStoreS::mkfs_managers"});
+ return seastar::do_for_each(local_store.mshard_stores, [](auto& mshard_store) {
+ return mshard_store->mkfs_managers().handle_error(
+ crimson::ct_error::assert_all{"Invalid error in SeaStoreS::mkfs_managers"});
+ });
});
co_await prepare_meta(new_osd_fsid);
co_await umount();
DEBUG("...");
ceph_assert(seastar::this_shard_id() == primary_core);
- return shard_stores.map([](auto &local_store) {
- return local_store.list_collections();
- }).then([FNAME](std::vector<std::vector<coll_core_t>> results) {
- std::vector<coll_core_t> collections;
- for (auto& colls : results) {
- collections.insert(collections.end(), colls.begin(), colls.end());
+ return shard_stores.map_reduce0(
+ [this](auto& local_store) {
+ // For each local store, collect all collections from its mshard_stores
+ return seastar::map_reduce(
+ local_store.mshard_stores.begin(),
+ local_store.mshard_stores.end(),
+ [](auto& mshard_store) {
+ return mshard_store->list_collections();
+ },
+ std::vector<coll_core_t>(),
+ [](auto&& merged, auto&& result) {
+ merged.insert(merged.end(), result.begin(), result.end());
+ return std::move(merged);
+ }
+ );
+ },
+ std::vector<coll_core_t>(),
+ [](auto&& total, auto&& shard_result) {
+ total.insert(total.end(), shard_result.begin(), shard_result.end());
+ return std::move(total);
}
- DEBUG("got {} collections", collections.size());
- return seastar::make_ready_future<std::vector<coll_core_t>>(
- std::move(collections));
+ ).then([FNAME](auto all_collections) {
+ DEBUG("got {} collections", all_collections.size());
+ return seastar::make_ready_future<std::vector<coll_core_t>>(std::move(all_collections));
});
}
-store_statfs_t SeaStore::Shard::stat() const
+seastar::future<store_statfs_t> SeaStore::Shard::stat() const
{
+ if (!store_active) {
+ return seastar::make_ready_future<store_statfs_t>(store_statfs_t());
+ }
LOG_PREFIX(SeaStoreS::stat);
auto ss = transaction_manager->store_stat();
DEBUG("stat={}", ss);
- return ss;
+ return seastar::make_ready_future<store_statfs_t>(ss);
}
seastar::future<store_statfs_t> SeaStore::stat() const
ceph_assert(seastar::this_shard_id() == primary_core);
return shard_stores.map_reduce0(
- [](const SeaStore::Shard &local_store) {
- return local_store.stat();
+ [](auto& local_store) {
+ return seastar::map_reduce(
+ local_store.mshard_stores.begin(),
+ local_store.mshard_stores.end(),
+ [](auto& mshard_store) { return mshard_store->stat(); },
+ store_statfs_t(),
+ [](auto&& ss, auto&& ret) {
+ ss.add(ret);
+ return std::move(ss);
+ }
+ );
},
store_statfs_t(),
- [](auto &&ss, auto &&ret) {
- ss.add(ret);
- return std::move(ss);
+ [](auto&& total_stats, auto&& shard_stats) {
+ total_stats.add(shard_stats);
+ return std::move(total_stats);
}
- ).then([FNAME](store_statfs_t ss) {
- DEBUG("done, stat={}", ss);
- return seastar::make_ready_future<store_statfs_t>(std::move(ss));
+ ).then([FNAME](auto final_stats) {
+ DEBUG("done, stat={}", final_stats);
+ return seastar::make_ready_future<store_statfs_t>(std::move(final_stats));
});
}
DEBUG("...");
ceph_assert(seastar::this_shard_id() == primary_core);
- shard_device_stats.resize(seastar::smp::count);
- shard_io_stats.resize(seastar::smp::count);
- shard_cache_stats.resize(seastar::smp::count);
- return shard_stores.invoke_on_all([this](const Shard &local_store) {
- bool report_detail = false;
- double seconds = 0;
- if (seastar::this_shard_id() == 0) {
- // avoid too verbose logs, only report detail in a particular shard
- report_detail = true;
- seconds = local_store.reset_report_interval();
- }
- shard_device_stats[seastar::this_shard_id()] =
- local_store.get_device_stats(report_detail, seconds);
- shard_io_stats[seastar::this_shard_id()] =
- local_store.get_io_stats(report_detail, seconds);
- shard_cache_stats[seastar::this_shard_id()] =
- local_store.get_cache_stats(report_detail, seconds);
+ shard_device_stats.resize(store_shard_nums);
+ shard_io_stats.resize(store_shard_nums);
+ shard_cache_stats.resize(store_shard_nums);
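+ // stats are indexed by the flattened shard index:
+ // this_shard_id() + smp::count * store_index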
+ return shard_stores.invoke_on_all([this](auto& local_store) {
+ return seastar::do_for_each(local_store.mshard_stores, [this](auto& mshard_store) {
+ auto index = seastar::this_shard_id() + seastar::smp::count * mshard_store->get_store_index();
+ if (index >= store_shard_nums) {
+ // inactive placeholder Shard beyond the configured shard count, nothing to report
+ return;
+ }
+ bool report_detail = false;
+ double seconds = 0;
+ if (seastar::this_shard_id() == 0 && mshard_store->get_store_index() == 0) {
+ // avoid too verbose logs, only report detail in a particular shard
+ report_detail = true;
+ seconds = mshard_store->reset_report_interval();
+ }
+ shard_device_stats[index] = mshard_store->get_device_stats(report_detail, seconds);
+ shard_io_stats[index] = mshard_store->get_io_stats(report_detail, seconds);
+ shard_cache_stats[index] = mshard_store->get_cache_stats(report_detail, seconds);
+ });
}).then([this, FNAME] {
auto now = seastar::lowres_clock::now();
if (last_tp == seastar::lowres_clock::time_point::min()) {
TransactionManager::read_extent_iertr::future<std::optional<unsigned>>
SeaStore::Shard::get_coll_bits(CollectionRef ch, Transaction &t) const
{
- return transaction_manager->read_collection_root(t)
- .si_then([this, ch, &t](auto coll_root) {
- return collection_manager->list(coll_root, t);
- }).si_then([ch](auto colls) {
- auto it = std::find_if(colls.begin(), colls.end(),
- [ch](const std::pair<coll_t, coll_info_t>& element) {
- return element.first == ch->get_cid();
- });
- if (it != colls.end()) {
- return TransactionManager::read_extent_iertr::make_ready_future<
- std::optional<unsigned>>(it->second.split_bits);
- } else {
- return TransactionManager::read_extent_iertr::make_ready_future<
- std::optional<unsigned>>(std::nullopt);
- }
- });
+ auto coll_root = co_await transaction_manager->read_collection_root(t);
+ auto colls = co_await collection_manager->list(coll_root, t);
+
+ auto it = std::find_if(colls.begin(), colls.end(),
+ [ch](const std::pair<coll_t, coll_info_t>& element) {
+ return element.first == ch->get_cid();
+ });
+
+ if (it != colls.end()) {
+ co_return it->second.split_bits;
+ } else {
+ co_return std::nullopt;
+ }
}
col_obj_ranges_t
uint64_t limit,
uint32_t op_flags) const
{
+ assert(store_active);
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
seastar::future<CollectionRef>
SeaStore::Shard::create_new_collection(const coll_t& cid)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::create_new_collection);
DEBUG("cid={}", cid);
return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
seastar::future<CollectionRef>
SeaStore::Shard::open_collection(const coll_t& cid)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::open_collection);
DEBUG("cid={} ...", cid);
return list_collections(
).then([cid, this, FNAME] (auto colls_cores) {
if (auto found = std::find(colls_cores.begin(),
colls_cores.end(),
- std::make_pair(cid, seastar::this_shard_id()));
+ std::make_pair(cid, std::make_pair(seastar::this_shard_id(), store_index)));
found != colls_cores.end()) {
DEBUG("cid={} exists", cid);
return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
SeaStore::Shard::set_collection_opts(CollectionRef c,
const pool_opts_t& opts)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::set_collection_opts);
DEBUG("cid={}, opts={} not implemented", c->get_cid(), opts);
//TODO
seastar::future<std::vector<coll_core_t>>
SeaStore::Shard::list_collections()
{
+ if (!store_active) {
+ return seastar::make_ready_future<std::vector<coll_core_t>>();
+ }
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
return transaction_manager->read_collection_root(t
).si_then([this, &t](auto coll_root) {
return collection_manager->list(coll_root, t);
- }).si_then([&ret](auto colls) {
+ }).si_then([this, &ret](auto colls) {
ret.resize(colls.size());
std::transform(
colls.begin(), colls.end(), ret.begin(),
- [](auto p) {
- return std::make_pair(p.first, seastar::this_shard_id());
+ [this](auto p) {
+ return std::make_pair(p.first, std::make_pair(seastar::this_shard_id(), store_index));
});
});
});
std::size_t len,
uint32_t op_flags)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::_read);
size_t size = onode.get_layout().size;
if (offset >= size) {
size_t len,
uint32_t op_flags)
{
+ assert(store_active);
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
const ghobject_t& oid,
uint32_t op_flags)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::exists);
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
interval_set<uint64_t>& m,
uint32_t op_flags)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::readv);
DEBUG("cid={} oid={} op_flags=0x{:x} {} intervals",
ch->get_cid(), _oid, op_flags, m.num_intervals());
Onode& onode,
std::string_view name) const
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::_get_attr);
auto& layout = onode.get_layout();
if (name == OI_ATTR && layout.oi_size) {
std::string_view name,
uint32_t op_flags) const
{
+ assert(store_active);
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
Transaction& t,
Onode& onode)
{
+ assert(store_active);
auto& layout = onode.get_layout();
return omaptree_get_values(
t, get_omap_root(omap_type_t::XATTR, onode), std::nullopt
const ghobject_t& oid,
uint32_t op_flags)
{
+ assert(store_active);
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
const ghobject_t& oid,
uint32_t op_flags)
{
+ if (!store_active) {
+ return seastar::make_ready_future<struct stat>();
+ }
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
const ghobject_t& oid,
uint32_t op_flags)
{
+ assert(store_active);
return get_attr(ch, oid, OMAP_HEADER_XATTR_KEY, op_flags);
}
omap_root_t SeaStore::Shard::select_log_omap_root(Onode& onode) const
{
+ assert(store_active);
auto log_root = get_omap_root(omap_type_t::LOG, onode);
if (log_root.is_null()) {
return get_omap_root(omap_type_t::OMAP, onode);
const omap_keys_t &keys,
uint32_t op_flags)
{
+ assert(store_active);
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
omap_iterate_cb_t callback,
uint32_t op_flags)
{
+ assert(store_active);
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
return seastar::do_with(
uint64_t len,
uint32_t op_flags)
{
+ assert(store_active);
++(shard_stats.read_num);
++(shard_stats.pending_read_num);
CollectionRef _ch,
ceph::os::Transaction&& _t)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::do_transaction_no_callbacks);
++(shard_stats.io_num);
++(shard_stats.pending_io_num);
seastar::future<> SeaStore::Shard::flush(CollectionRef ch)
{
+ assert(store_active);
++(shard_stats.flush_num);
++(shard_stats.pending_flush_num);
ceph_assert(seastar::this_shard_id() == primary_core);
return seastar::do_with(key, value,
[this, FNAME](auto& key, auto& value) {
- return shard_stores.local().write_meta(key, value
+ return shard_stores.local().mshard_stores[0]->write_meta(key, value
).then([this, &key, &value] {
return mdstore->write_meta(key, value);
}).safe_then([FNAME, &key, &value] {
const std::string& key,
const std::string& value)
{
+ assert(store_active);
++(shard_stats.io_num);
++(shard_stats.pending_io_num);
// For TM::submit_transaction()
void SeaStore::Shard::init_managers()
{
+ assert(store_active);
LOG_PREFIX(SeaStore::init_managers);
DEBUG("start");
transaction_manager.reset();
shard_stats = {};
transaction_manager = make_transaction_manager(
- device, secondaries, shard_stats, is_test);
+ device, secondaries, shard_stats, store_index, is_test);
collection_manager = std::make_unique<collection_manager::FlatCollectionManager>(
*transaction_manager);
onode_manager = std::make_unique<crimson::os::seastore::onode::FLTreeOnodeManager>(
double SeaStore::Shard::reset_report_interval() const
{
+ if (!store_active) {
+ return 0;
+ }
double seconds;
auto now = seastar::lowres_clock::now();
if (last_tp == seastar::lowres_clock::time_point::min()) {
device_stats_t SeaStore::Shard::get_device_stats(
bool report_detail, double seconds) const
{
+ if (!store_active) {
+ return device_stats_t();
+ }
return transaction_manager->get_device_stats(report_detail, seconds);
}
shard_stats_t SeaStore::Shard::get_io_stats(
bool report_detail, double seconds) const
{
+ if (!store_active) {
+ return shard_stats_t();
+ }
shard_stats_t ret = shard_stats;
ret.minus(last_shard_stats);
cache_stats_t SeaStore::Shard::get_cache_stats(
bool report_detail, double seconds) const
{
+ if (!store_active) {
+ return cache_stats_t();
+ }
return transaction_manager->get_cache_stats(
report_detail, seconds);
}
omap_root_t&& root,
std::string_view key) const
{
+ assert(store_active);
return seastar::do_with(
BtreeOMapManager(*transaction_manager),
std::move(root),
omap_root_t&& root,
const omap_keys_t& keys) const
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::omaptree_get_values);
auto type = root.get_type();
if (root.is_null()) {
const std::optional<std::string>& start,
OMapManager::omap_list_config_t config) const
{
+ assert(store_active);
if (root.is_null()) {
return seastar::make_ready_future<omaptree_list_bare_ret>(
true, omap_values_t{}
omap_root_t&& root,
const std::optional<std::string>& start) const
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::omaptree_get_values);
auto type = root.get_type();
DEBUGT("{} start={} ...", t, type, start.has_value() ? *start : "");
Transaction& t,
omap_root_t&& root)
{
+ assert(store_active);
assert(!root.is_null());
return seastar::do_with(
BtreeOMapManager(*transaction_manager),
Transaction& t,
omap_root_t&& root)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::omaptree_clear_no_onode);
if (root.is_null()) {
DEBUGT("{}, null root", t, root.get_type());
omap_root_t&& root,
Onode& onode)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::omaptree_clear);
if (root.is_null()) {
DEBUGT("{}, null root", t, root.get_type());
Onode& onode,
Onode& d_onode)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::omaptree_clone);
DEBUGT("{} start, list ...", t, type);
return trans_intr::repeat([&t, &onode, &d_onode, this, type, FNAME] {
Onode& onode,
std::map<std::string, ceph::bufferlist>&& kvs)
{
+ assert(store_active);
return seastar::do_with(
BtreeOMapManager(*transaction_manager),
std::move(root),
Onode& onode,
omap_keys_t&& keys)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::omaptree_rm_keys);
auto type = root.get_type();
if (root.is_null()) {
std::string first,
std::string last)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::omaptree_rm_keyrange);
auto type = root.get_type();
if (first > last) {
Onode& onode,
std::string&& name)
{
+ assert(store_active);
LOG_PREFIX(SeaStoreS::omaptree_rm_key);
if (root.is_null()) {
DEBUGT("{} key={}, null root", t, root.get_type(), name);
#include <sys/mman.h>
#include <string.h>
+#include <boost/range/irange.hpp>
+
#include <fmt/format.h>
#include <seastar/core/metrics.hh>
{
}
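+// Mirror the SeaStore layout at the device level: each reactor shard hosts
+// ceil(device_shard_nums / smp::count) per-shard devices (mshard_devices).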
+seastar::future<> BlockSegmentManager::start(unsigned int shard_nums)
+{
+ LOG_PREFIX(BlockSegmentManager::start);
+ device_shard_nums = shard_nums;
+ auto num_shard_services = (device_shard_nums + seastar::smp::count - 1) / seastar::smp::count;
+ INFO("device_shard_nums={} seastar::smp={}, num_shard_services={}", device_shard_nums, seastar::smp::count, num_shard_services);
+ return shard_devices.start(num_shard_services, device_path, superblock.config.spec.dtype);
+}
+
+seastar::future<> BlockSegmentManager::stop()
+{
+ return shard_devices.stop();
+}
+
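+// Return the per-shard device hosted on this reactor for the given store_index.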
+Device& BlockSegmentManager::get_sharded_device(unsigned int store_index)
+{
+ assert(store_index < shard_devices.local().mshard_devices.size());
+ return *shard_devices.local().mshard_devices[store_index];
+}
+
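+// Read the superblock directly to recover the shard count recorded at mkfs,
+// before the sharded devices have been started.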
+SegmentManager::read_ertr::future<unsigned int> BlockSegmentManager::get_shard_nums()
+{
+ return open_device(
+ device_path
+ ).safe_then([this](auto p) {
+ device = std::move(p.first);
+ auto sd = p.second;
+ return read_superblock(device, sd);
+ }).safe_then([](auto sb) {
+ return read_ertr::make_ready_future<unsigned int>(sb.shard_num);
+ }).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in BlockSegmentManager::get_shard_nums"
+ }
+ );
+}
+
BlockSegmentManager::mount_ret BlockSegmentManager::mount()
{
return shard_devices.invoke_on_all([](auto &local_device) {
- return local_device.shard_mount(
- ).handle_error(
- crimson::ct_error::assert_all{
- "Invalid error in BlockSegmentManager::mount"
+ return seastar::do_for_each(local_device.mshard_devices, [](auto& mshard_device) {
+ return mshard_device->shard_mount(
+ ).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in BlockSegmentManager::mount"
+ });
});
});
}
device = std::move(p.first);
auto sd = p.second;
return read_superblock(device, sd);
- }).safe_then([=, this](auto sb) {
+ }).safe_then([=, this](auto sb) -> mount_ertr::future<> {
set_device_id(sb.config.spec.id);
- shard_info = sb.shard_infos[seastar::this_shard_id()];
+ // devices whose global shard index exceeds the on-disk shard count stay inactive
+ if (seastar::this_shard_id() + seastar::smp::count * store_index >= sb.shard_num) {
+ INFO("{} shard_id {} out of range {}",
+ device_id_printer_t{get_device_id()},
+ seastar::this_shard_id() + seastar::smp::count * store_index,
+ sb.shard_num);
+ store_active = false;
+ return mount_ertr::now();
+ }
+ shard_info = sb.shard_infos[seastar::this_shard_id() + seastar::smp::count * store_index];
INFO("{} read {}", device_id_printer_t{get_device_id()}, shard_info);
sb.validate();
superblock = sb;
+ return mount_ertr::now();
});
}).safe_then([this, FNAME] {
INFO("{} complete", device_id_printer_t{get_device_id()});
- register_metrics();
+ register_metrics(store_index);
});
}
BlockSegmentManager::mkfs_ret BlockSegmentManager::mkfs(
device_config_t sm_config)
{
- return shard_devices.local().primary_mkfs(sm_config
+ return shard_devices.local().mshard_devices[0]->primary_mkfs(sm_config
).safe_then([this] {
return shard_devices.invoke_on_all([](auto &local_device) {
- return local_device.shard_mkfs(
- ).handle_error(
- crimson::ct_error::assert_all{
- "Invalid error in BlockSegmentManager::mkfs"
+ return seastar::do_for_each(local_device.mshard_devices, [](auto& mshard_device) {
+ return mshard_device->shard_mkfs(
+ ).handle_error(
+ crimson::ct_error::assert_all{
+ "Invalid error in BlockSegmentManager::mkfs"
+ });
});
});
});
out);
}
-void BlockSegmentManager::register_metrics()
+void BlockSegmentManager::register_metrics(unsigned int store_index)
{
LOG_PREFIX(BlockSegmentManager::register_metrics);
+ if (!store_active) {
+ INFO("{} shard {} is not active, skip registering metrics",
+ device_id_printer_t{get_device_id()}, store_index);
+ return;
+ }
+
DEBUG("{}", device_id_printer_t{get_device_id()});
namespace sm = seastar::metrics;
std::vector<sm::label_instance> label_instances;
label_instances.push_back(sm::label_instance("device_id", get_device_id()));
+ label_instances.push_back(
+ sm::label_instance("shard_device_index", std::to_string(store_index)));
stats.reset();
+
metrics.add_group(
"segment_manager",
{
"data_read_num",
stats.data_read.num,
sm::description("total number of data read"),
- label_instances
+ label_instances
),
sm::make_counter(
"data_read_bytes",
stats.data_read.bytes,
sm::description("total bytes of data read"),
- label_instances
+ label_instances
),
sm::make_counter(
"data_write_num",
stats.data_write.num,
sm::description("total number of data write"),
- label_instances
+ label_instances
),
sm::make_counter(
"data_write_bytes",
stats.data_write.bytes,
sm::description("total bytes of data write"),
- label_instances
+ label_instances
),
sm::make_counter(
"metadata_write_num",
stats.metadata_write.num,
sm::description("total number of metadata write"),
- label_instances
+ label_instances
),
sm::make_counter(
"metadata_write_bytes",
stats.metadata_write.bytes,
sm::description("total bytes of metadata write"),
- label_instances
+ label_instances
),
sm::make_counter(
"opened_segments",
stats.opened_segments,
sm::description("total segments opened"),
- label_instances
+ label_instances
),
sm::make_counter(
"closed_segments",
stats.closed_segments,
sm::description("total segments closed"),
- label_instances
+ label_instances
),
sm::make_counter(
"closed_segments_unused_bytes",
stats.closed_segments_unused_bytes,
sm::description("total unused bytes of closed segments"),
- label_instances
+ label_instances
),
sm::make_counter(
"released_segments",
stats.released_segments,
sm::description("total segments released"),
- label_instances
+ label_instances
),
}
);