Handle the case where calling into a remote store shard is required.
Signed-off-by: chunmei liu <chunmei.liu@ibm.com>
logger().info("error during data error injection: {}", e.what());
co_return tell_result_t(-EINVAL, e.what());
}
- co_await shard_services.get_store().inject_data_error(obj);
+ co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::inject_data_error>(
+ shard_services.get_store(DEFAULT_STORE_INDEX),
+ obj);
logger().info("successfully injected data error for obj={}", obj);
ceph::bufferlist bl;
bl.append("ok"sv);
logger().info("error during metadata error injection: {}", e.what());
co_return tell_result_t(-EINVAL, e.what());
}
- co_await shard_services.get_store().inject_mdata_error(obj);
+ co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::inject_mdata_error>(
+ shard_services.get_store(DEFAULT_STORE_INDEX),
+ obj);
logger().info("successfully injected metadata error for obj={}", obj);
ceph::bufferlist bl;
bl.append("ok"sv);
using core_id_t = seastar::shard_id;
static constexpr core_id_t NULL_CORE = std::numeric_limits<core_id_t>::max();
static constexpr unsigned int NULL_STORE_INDEX = std::numeric_limits<unsigned int>::max();
+static constexpr unsigned int DEFAULT_STORE_INDEX = 0;
/**
* submit_to
*
ECBackend::ECBackend(shard_id_t shard,
ECBackend::CollectionRef coll,
crimson::osd::ShardServices& shard_services,
+ unsigned int store_index,
const ec_profile_t&,
uint64_t,
DoutPrefixProvider &dpp)
- : PGBackend{shard, coll, shard_services, dpp}
+ : PGBackend{shard, coll, shard_services, store_index, dpp}
{
// todo
}
ECBackend(shard_id_t shard,
CollectionRef coll,
crimson::osd::ShardServices& shard_services,
+ unsigned int store_index,
const ec_profile_t& ec_profile,
uint64_t stripe_width,
DoutPrefixProvider &dpp);
seastar::future<bufferlist> OSDMeta::load_map(epoch_t e)
{
- return store.read(coll,
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+ store, coll,
osdmap_oid(e), 0, 0,
CEPH_OSD_OP_FLAG_FADVISE_WILLNEED).handle_error(
read_errorator::assert_all_func([e](const auto&) {
read_errorator::future<ceph::bufferlist> OSDMeta::load_inc_map(epoch_t e)
{
- return store.read(coll,
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+ store, coll,
inc_osdmap_oid(e), 0, 0,
CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
}
{
LOG_PREFIX(OSDMeta::load_superblock);
DEBUG("");
- return store.read(
- coll, superblock_oid(), 0, 0
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+ store,
+ coll, superblock_oid(), 0, 0, 0
).safe_then([FNAME] (bufferlist&& bl) {
DEBUG("successfully read superblock");
auto p = bl.cbegin();
std::string,
OSDMeta::ec_profile_t>>
OSDMeta::load_final_pool_info(int64_t pool) {
- return store.read(coll, final_pool_info_oid(pool),
- 0, 0).safe_then([] (bufferlist&& bl) {
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+ store, coll, final_pool_info_oid(pool),
+ 0, 0, 0).safe_then([] (bufferlist&& bl) {
auto p = bl.cbegin();
pg_pool_t pi;
string name;
class OSDMeta {
template<typename T> using Ref = boost::intrusive_ptr<T>;
- crimson::os::FuturizedStore::Shard& store;
+ crimson::os::FuturizedStore::StoreShardRef store;
Ref<crimson::os::FuturizedCollection> coll;
public:
OSDMeta(Ref<crimson::os::FuturizedCollection> coll,
- crimson::os::FuturizedStore::Shard& store)
+ crimson::os::FuturizedStore::StoreShardRef store)
: store{store}, coll{coll}
{}
DEBUGDPP("obj: {}", pg, obj);
auto &entry = ret.objects[obj.hobj];
return interruptor::make_interruptible(
- pg.shard_services.get_store().stat(
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::stat>(
+ pg.shard_services.get_store(pg.get_store_index()),
pg.get_collection_ref(),
- obj)
+ obj,
+ 0)
).then_interruptible([FNAME, &pg, &obj, &entry](struct stat obj_stat) {
DEBUGDPP("obj: {}, stat complete, size {}", pg, obj, obj_stat.st_size);
entry.size = obj_stat.st_size;
- return pg.shard_services.get_store().get_attrs(
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+ pg.shard_services.get_store(pg.get_store_index()),
pg.get_collection_ref(),
- obj);
+ obj,
+ 0);
}).safe_then_interruptible([FNAME, &pg, &obj, &entry](auto &&attrs) {
DEBUGDPP("obj: {}, got {} attrs", pg, obj, attrs.size());
for (auto &i : attrs) {
pg, *this, obj, progress);
const auto stride = local_conf().get_val<Option::size_t>(
"osd_deep_scrub_stride");
- return pg.shard_services.get_store().read(
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+ pg.shard_services.get_store(pg.get_store_index()),
pg.get_collection_ref(),
obj,
*(progress.offset),
- stride
+ stride,
+ 0
).safe_then([this, FNAME, stride, &obj, &progress, &entry, &pg](auto bl) {
size_t offset = *progress.offset;
DEBUGDPP("op: {}, obj: {}, progress: {} got offset {}",
{
DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
pg, *this, obj, progress);
- return pg.shard_services.get_store().omap_get_header(
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_header>(
+ pg.shard_services.get_store(pg.get_store_index()),
pg.get_collection_ref(),
- obj
+ obj,
+ 0
).safe_then([&progress](auto bl) {
progress.omap_hash << bl;
}).handle_error(
{
DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
pg, *this, obj, progress);
- return pg.shard_services.get_store().omap_iterate(
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+ pg.shard_services.get_store(pg.get_store_index()),
pg.get_collection_ref(),
obj,
start_from,
- callback
+ callback,
+ 0
).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
assert(result == ObjectStore::omap_iter_ret_t::NEXT);
DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
PG::PG(
spg_t pgid,
pg_shard_t pg_shard,
+ unsigned int store_index,
crimson::os::CollectionRef coll_ref,
pg_pool_t&& pool,
std::string&& name,
: pgid{pgid},
pg_whoami{pg_shard},
coll_ref{coll_ref},
+ store_index{store_index},
pgmeta_oid{pgid.make_pgmeta_oid()},
osdmap_gate("PG::osdmap_gate"),
shard_services{shard_services},
*backend.get(),
*this},
osdriver(
- &shard_services.get_store(),
+ shard_services.get_store(store_index),
coll_ref,
pgid.make_snapmapper_oid()),
snap_mapper(
bool PG::try_flush_or_schedule_async() {
logger().debug("PG::try_flush_or_schedule_async: flush ...");
- (void)shard_services.get_store().flush(
- coll_ref
+ (void)crimson::os::with_store_do_transaction(
+ shard_services.get_store(store_index),
+ coll_ref, ceph::os::Transaction{}
).then(
[this, epoch=get_osdmap_epoch()]() {
- return shard_services.start_operation<LocalPeeringEvent>(
+ return shard_services.start_operation<LocalPeeringEvent>(
this,
pg_whoami,
pgid,
PeeringState::UnfoundRecovery());
}
}
- return get_shard_services().dispatch_context(get_collection_ref(), std::move(rctx));
+ return get_shard_services().dispatch_context(store_index, get_collection_ref(), std::move(rctx));
}
void PG::recheck_readable()
{
logger().info("removing pg {}", pgid);
auto fut = interruptor::make_interruptible(
- shard_services.get_store().list_objects(
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::list_objects>(
+ shard_services.get_store(store_index),
coll_ref,
_next,
ghobject_t::get_max(),
- local_conf()->osd_target_transaction_size));
+ local_conf()->osd_target_transaction_size,
+ 0));
auto [objs_to_rm, next] = fut.get();
if (objs_to_rm.empty()) {
t.remove(coll_ref->get_cid(), pgid.make_snapmapper_oid());
t.remove(coll_ref->get_cid(), pgmeta_oid);
t.remove_collection(coll_ref->get_cid());
- (void) shard_services.get_store().do_transaction(
- coll_ref, t.claim_and_reset()).then([this] {
+ (void) crimson::os::with_store_do_transaction(
+ shard_services.get_store(store_index),
+ coll_ref,
+ t.claim_and_reset()).then([this] {
return shard_services.remove_pg(pgid);
});
return {next, false};
ceph::os::Transaction t;
auto max_size = local_conf()->osd_target_transaction_size;
while(true) {
- auto [objs, next] = co_await shard_services.get_store().list_objects(
- coll_ref, _next, ghobject_t::get_max(), max_size);
+ auto [objs, next] = co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::list_objects>(
+ shard_services.get_store(store_index),
+ coll_ref, _next, ghobject_t::get_max(), max_size, 0);
if (objs.empty()) {
if (!t.empty()) {
- co_await shard_services.get_store().do_transaction(
+ co_await crimson::os::with_store_do_transaction(
+ shard_services.get_store(store_index),
coll_ref, std::move(t));
}
break;
}
_next = next;
if (t.get_num_ops() >= max_size) {
- co_await shard_services.get_store().do_transaction(
+ co_await crimson::os::with_store_do_transaction(
+ shard_services.get_store(store_index),
coll_ref, t.claim_and_reset());
}
}
role, newup, new_up_primary, newacting,
new_acting_primary, history, pi, t);
assert(coll_ref);
- return shard_services.get_store().exists(
- get_collection_ref(), pgid.make_snapmapper_oid()
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::exists>(
+ shard_services.get_store(store_index),
+ get_collection_ref(), pgid.make_snapmapper_oid(), 0
).safe_then([&t, this](bool existed) {
- if (!existed) {
- t.touch(coll_ref->get_cid(), pgid.make_snapmapper_oid());
- }
- },
- ::crimson::ct_error::assert_all{fmt::format(
- "{} {} unexpected eio", *this, __func__).c_str()}
- );
+ if (!existed) {
+ t.touch(coll_ref->get_cid(), pgid.make_snapmapper_oid());
+ }
+ },
+ ::crimson::ct_error::assert_all{fmt::format(
+ "{} {} unexpected eio", *this, __func__).c_str()}
+);
}
-seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
+seastar::future<> PG::read_state(crimson::os::FuturizedStore::StoreShardRef store)
{
if (__builtin_expect(stopping, false)) {
return seastar::make_exception_future<>(
crimson::common::system_shutdown_exception());
}
- return seastar::do_with(PGMeta(*store, pgid), [] (auto& pg_meta) {
+ return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
return pg_meta.load();
}).then([this, store](auto&& ret) {
auto [pg_info, past_intervals] = std::move(ret);
std::move(past_intervals),
[this, store] (PGLog &pglog) {
return pglog.read_log_and_missing_crimson(
- *store,
+ store,
coll_ref,
peering_state.get_info(),
pgmeta_oid);
return seastar::now();
}).then([this, store]() {
logger().debug("{} setting collection options", __func__);
- return store->set_collection_opts(
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::set_collection_opts>(
+ store,
coll_ref,
get_pgpool().info.opts);
});
void PG::init_collection_pool_opts()
{
- std::ignore = shard_services.get_store().set_collection_opts(coll_ref, get_pgpool().info.opts);
+ std::ignore = crimson::os::with_store<&crimson::os::FuturizedStore::Shard::set_collection_opts>(
+ shard_services.get_store(store_index), coll_ref, get_pgpool().info.opts);
}
void PG::on_pool_change()
}
co_await interruptor::make_interruptible(
- shard_services.get_store().do_transaction(
+ crimson::os::with_store_do_transaction(
+ shard_services.get_store(store_index),
get_collection_ref(), std::move(t)
));
DEBUGDPP("{} do_transaction", *this, *req);
auto commit_fut = interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll_ref, std::move(txn))
+ crimson::os::with_store_do_transaction(
+ shard_services.get_store(store_index),
+ coll_ref, std::move(txn))
);
const auto &lcod = peering_state.get_info().last_complete;
peering_state.append_log_entries_update_missing(
m->entries, t, op_trim_to, op_pg_committed_to);
- return interruptor::make_interruptible(shard_services.get_store().do_transaction(
- coll_ref, std::move(t))).then_interruptible(
+ return interruptor::make_interruptible(
+ crimson::os::with_store_do_transaction(
+ shard_services.get_store(store_index),
+ coll_ref, std::move(t))).then_interruptible(
[m, conn, lcod=peering_state.get_info().last_complete, this] {
if (!peering_state.pg_has_reset_since(m->get_epoch())) {
peering_state.update_last_complete_ondisk(lcod);
spg_t pgid;
pg_shard_t pg_whoami;
crimson::os::CollectionRef coll_ref;
+ unsigned int store_index;
ghobject_t pgmeta_oid;
seastar::timer<seastar::lowres_clock> check_readable_timer;
PG(spg_t pgid,
pg_shard_t pg_shard,
+ unsigned int store_index,
crimson::os::CollectionRef coll_ref,
pg_pool_t&& pool,
std::string&& name,
return pgid;
}
+  // Accessor for the index of the backing object-store shard this PG uses.
+  // Note: top-level `const` on a by-value return is meaningless (and warns
+  // under -Wignored-qualifiers); the accessor itself should be const so it
+  // is callable on const PG references.
+  unsigned int get_store_index() const {
+    return store_index;
+  }
PGBackend& get_backend() {
return *backend;
}
std::swap(o, orderer);
return seastar::when_all(
shard_services.dispatch_context(
+ store_index,
get_collection_ref(),
std::move(rctx)),
shard_services.run_orderer(std::move(o))
PGPeeringEventRef on_commit) final {
LOG_PREFIX(PG::schedule_event_on_commit);
SUBDEBUGDPP(osd, "on_commit {}", *this, on_commit->get_desc());
+
t.register_on_commit(
make_lambda_context(
[this, on_commit=std::move(on_commit)](int) {
const PastIntervals& pim,
ceph::os::Transaction &t);
- seastar::future<> read_state(crimson::os::FuturizedStore::Shard* store);
+ seastar::future<> read_state(crimson::os::FuturizedStore::StoreShardRef store);
void do_peering_event(PGPeeringEvent& evt, PeeringCtx &rctx);
seed,
target);
init_pg_ondisk(t, child, pool);
- return shard_services.get_store().do_transaction(
+ return crimson::os::with_store_do_transaction(
+ shard_services.get_store(store_index),
coll_ref, std::move(t));
}
coll, shard_services,
dpp);
case pg_pool_t::TYPE_ERASURE:
- return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services,
+ return std::make_unique<ECBackend>( pg_shard.shard, coll, shard_services, pg.get_store_index(),
std::move(ec_profile),
pool.stripe_width,
dpp);
PGBackend::PGBackend(shard_id_t shard,
CollectionRef coll,
crimson::osd::ShardServices &shard_services,
+ unsigned int store_index,
DoutPrefixProvider &dpp)
: shard{shard},
coll{coll},
shard_services{shard_services},
dpp{dpp},
- store{&shard_services.get_store()}
+ store{shard_services.get_store(store_index)}
{}
PGBackend::load_metadata_iertr::future
<PGBackend::loaded_object_md_t::ref>
PGBackend::load_metadata(const hobject_t& oid)
{
- return interruptor::make_interruptible(store->get_attrs(
+ return interruptor::make_interruptible(
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+ store,
coll,
- ghobject_t{oid, ghobject_t::NO_GEN, shard})).safe_then_interruptible(
+ ghobject_t{oid, ghobject_t::NO_GEN, shard}, 0)).safe_then_interruptible(
[oid](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
loaded_object_md_t::ref ret(new loaded_object_md_t());
if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
}
logger().trace("sparse_read: {} {}~{}",
os.oi.soid, (uint64_t)op.extent.offset, (uint64_t)op.extent.length);
- return interruptor::make_interruptible(store->fiemap(coll, ghobject_t{os.oi.soid},
- offset, adjusted_length)).safe_then_interruptible(
+
+ return interruptor::make_interruptible(
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::fiemap>(
+ store, coll, ghobject_t{os.oi.soid},
+ static_cast<uint64_t>(offset),
+ static_cast<uint64_t>(adjusted_length),static_cast<uint32_t>(0))).safe_then_interruptible(
[&delta_stats, &os, &osd_op, this](auto&& m) {
return seastar::do_with(interval_set<uint64_t>{std::move(m)},
[&delta_stats, &os, &osd_op, this](auto&& extents) {
- return interruptor::make_interruptible(store->readv(coll, ghobject_t{os.oi.soid},
- extents, osd_op.op.flags)).safe_then_interruptible_tuple(
+ return interruptor::make_interruptible(
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::readv>(
+ store, coll, ghobject_t{os.oi.soid},
+ std::ref(extents), osd_op.op.flags)).safe_then_interruptible_tuple(
[&delta_stats, &os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> {
if (_read_verify_data(os.oi, bl)) {
osd_op.op.extent.length = bl.length();
auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
auto gend = end.is_max() ? ghobject_t::get_max() : ghobject_t{end, 0, shard};
auto [gobjects, next] = co_await interruptor::make_interruptible(
- store->list_objects(coll, gstart, gend, limit));
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::list_objects>(
+ store, coll, gstart, gend, limit, 0));
std::vector<hobject_t> objects;
boost::copy(
osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) {
return crimson::ct_error::file_too_large::make();
}
+ return crimson::os::with_store<
+ &crimson::os::FuturizedStore::Shard::get_max_attr_name_length
+ >(store).then([this, &os, &osd_op, &txn, &delta_stats](unsigned store_max_name_len) {
+ const auto max_name_len = std::min<uint64_t>(
+ store_max_name_len, local_conf()->osd_max_attr_name_len);
+ if (osd_op.op.xattr.name_len > max_name_len) {
+ return setxattr_ierrorator::future<>(crimson::ct_error::enametoolong::make());
+ }
- const auto max_name_len = std::min<uint64_t>(
- store->get_max_attr_name_length(), local_conf()->osd_max_attr_name_len);
- if (osd_op.op.xattr.name_len > max_name_len) {
- return crimson::ct_error::enametoolong::make();
- }
+ maybe_create_new_object(os, txn, delta_stats);
- maybe_create_new_object(os, txn, delta_stats);
-
- std::string name{"_"};
- ceph::bufferlist val;
- {
- auto bp = osd_op.indata.cbegin();
- bp.copy(osd_op.op.xattr.name_len, name);
- bp.copy(osd_op.op.xattr.value_len, val);
- }
- logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
- txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
- delta_stats.num_wr++;
- return seastar::now();
+ std::string name{"_"};
+ ceph::bufferlist val;
+ {
+ auto bp = osd_op.indata.cbegin();
+ bp.copy(osd_op.op.xattr.name_len, name);
+ bp.copy(osd_op.op.xattr.value_len, val);
+ }
+ logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
+ txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
+ delta_stats.num_wr++;
+ return setxattr_ierrorator::future<>(seastar::now());
+ });
}
PGBackend::get_attr_ierrorator::future<> PGBackend::getxattr(
const hobject_t& soid,
std::string_view key) const
{
- return store->get_attr(coll, ghobject_t{soid}, key);
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attr>(
+ store, coll, ghobject_t{soid}, key, 0);
}
PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
std::string&& key) const
{
return seastar::do_with(key, [this, &soid](auto &key) {
- return store->get_attr(coll, ghobject_t{soid}, key);
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attr>(
+ store, coll, ghobject_t{soid}, key, 0);
});
}
OSDOp& osd_op,
object_stat_sum_t& delta_stats) const
{
- return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then(
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+ store, coll, ghobject_t{os.oi.soid}, 0).safe_then(
[&delta_stats, &osd_op](auto&& attrs) {
std::vector<std::pair<std::string, bufferlist>> user_xattrs;
ceph::bufferlist bl;
get_omap_iertr::future<
crimson::os::FuturizedStore::Shard::omap_values_t>
maybe_get_omap_vals_by_keys(
- crimson::os::FuturizedStore::Shard* store,
+ crimson::os::FuturizedStore::StoreShardRef store,
const crimson::os::CollectionRef& coll,
const object_info_t& oi,
const std::set<std::string>& keys_to_get)
{
if (oi.is_omap()) {
- return store->omap_get_values(coll, ghobject_t{oi.soid}, keys_to_get);
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+ store, coll, ghobject_t{oi.soid}, keys_to_get, 0);
} else {
return crimson::ct_error::enodata::make();
}
static
get_omap_iterate_ertr::future<ObjectStore::omap_iter_ret_t>
maybe_do_omap_iterate(
- crimson::os::FuturizedStore::Shard* store,
+ crimson::os::FuturizedStore::StoreShardRef store,
const crimson::os::CollectionRef& coll,
const object_info_t& oi,
ObjectStore::omap_iter_seek_t start_from,
omap_iterate_cb_t callback)
{
if (oi.is_omap()) {
- return store->omap_iterate(coll, ghobject_t{oi.soid}, start_from, callback);
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+ store, coll, ghobject_t{oi.soid}, start_from, callback, 0);
} else {
return crimson::ct_error::enodata::make();
}
const ghobject_t& oid,
uint32_t op_flags) const
{
- return store->omap_get_header(c, oid, op_flags)
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_header>(
+ store, c, oid, op_flags)
.handle_error(
crimson::ct_error::enodata::handle([] {
return seastar::make_ready_future<bufferlist>();
for (auto &i: assertions) {
to_get.insert(i.first);
}
- return store->omap_get_values(coll, ghobject_t{os.oi.soid}, to_get)
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+ store, coll, ghobject_t{os.oi.soid}, to_get, 0)
.safe_then([=, &osd_op] (auto&& out) -> omap_cmp_iertr::future<> {
osd_op.rval = 0;
return do_omap_val_cmp(out, assertions);
CollectionRef c,
const ghobject_t& oid) const
{
- return store->stat(c, oid);
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::stat>(
+ store, c, oid, 0);
}
PGBackend::read_errorator::future<std::map<uint64_t, uint64_t>>
uint64_t len,
uint32_t op_flags)
{
- return store->fiemap(c, oid, off, len);
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::fiemap>(
+ store, c, oid, off, len, 0);
}
PGBackend::write_iertr::future<> PGBackend::tmapput(
using rep_op_fut_t = interruptible_future<rep_op_ret_t>;
PGBackend(shard_id_t shard, CollectionRef coll,
crimson::osd::ShardServices &shard_services,
+ unsigned int store_index,
DoutPrefixProvider &dpp);
virtual ~PGBackend() = default;
static std::unique_ptr<PGBackend> create(pg_t pgid,
CollectionRef coll;
crimson::osd::ShardServices &shard_services;
DoutPrefixProvider &dpp; ///< provides log prefix context
- crimson::os::FuturizedStore::Shard* store;
+ crimson::os::FuturizedStore::StoreShardRef store;
virtual seastar::future<> request_committed(
const osd_reqid_t& reqid,
const eversion_t& at_version) = 0;
// easily skip them
using crimson::os::FuturizedStore;
-PGMeta::PGMeta(FuturizedStore::Shard& store, spg_t pgid)
+PGMeta::PGMeta(FuturizedStore::StoreShardRef store, spg_t pgid)
: store{store},
pgid{pgid}
{}
seastar::future<epoch_t> PGMeta::get_epoch()
{
- return store.open_collection(coll_t{pgid}).then([this](auto ch) {
- return store.omap_get_values(ch,
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+ store, coll_t{pgid}).then([this](auto ch) {
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+ store, ch,
pgid.make_pgmeta_oid(),
- {string{infover_key},
- string{epoch_key}}).safe_then(
+ std::set<std::string>{
+ string{infover_key},
+ string{epoch_key}},
+ 0).safe_then(
[](auto&& values) {
{
// sanity check
seastar::future<std::tuple<pg_info_t, PastIntervals>> PGMeta::load()
{
- return store.open_collection(coll_t{pgid}).then([this](auto ch) {
- return store.omap_get_values(ch,
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::open_collection>(
+ store, coll_t{pgid}).then([this](auto ch) {
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+ store, ch,
pgid.make_pgmeta_oid(),
- {string{infover_key},
+ std::set<std::string>{
+ string{infover_key},
string{info_key},
string{biginfo_key},
- string{fastinfo_key}});
+ string{fastinfo_key}},
+ 0);
}).safe_then([](auto&& values) {
{
// sanity check
/// PG related metadata
class PGMeta
{
- crimson::os::FuturizedStore::Shard& store;
+ crimson::os::FuturizedStore::StoreShardRef store;
const spg_t pgid;
public:
- PGMeta(crimson::os::FuturizedStore::Shard& store, spg_t pgid);
+ PGMeta(crimson::os::FuturizedStore::StoreShardRef store, spg_t pgid);
seastar::future<epoch_t> get_epoch();
seastar::future<std::tuple<pg_info_t, PastIntervals>> load();
};
m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
t);
DEBUGDPP("submitting transaction", pg);
- return shard_services.get_store().do_transaction(
+ return crimson::os::with_store_do_transaction(
+ shard_services.get_store(pg.get_store_index()),
pg.get_collection_ref(), std::move(t)).or_terminate();
}
}
DEBUGDPP("submitting transaction", pg);
co_await interruptor::make_interruptible(
- shard_services.get_store().do_transaction(
+ crimson::os::with_store_do_transaction(
+ shard_services.get_store(pg.get_store_index()),
pg.get_collection_ref(), std::move(t)).or_terminate());
}
RecoveryBackend(crimson::osd::PG& pg,
crimson::osd::ShardServices& shard_services,
crimson::os::CollectionRef coll,
+ unsigned int store_index,
PGBackend* backend)
: pg{pg},
shard_services{shard_services},
- store{&shard_services.get_store()},
+ store(shard_services.get_store(store_index)),
coll{coll},
backend{backend} {}
virtual ~RecoveryBackend() {}
protected:
crimson::osd::PG& pg;
crimson::osd::ShardServices& shard_services;
- crimson::os::FuturizedStore::Shard* store;
+ crimson::os::FuturizedStore::StoreShardRef store;
crimson::os::CollectionRef coll;
PGBackend* backend;
ReplicatedBackend::ReplicatedBackend(pg_t pgid,
pg_shard_t whoami,
- crimson::osd::PG& pg,
+ crimson::osd::PG& pg,
ReplicatedBackend::CollectionRef coll,
crimson::osd::ShardServices& shard_services,
DoutPrefixProvider &dpp)
- : PGBackend{whoami.shard, coll, shard_services, dpp},
+ : PGBackend{whoami.shard, coll, shard_services, pg.get_store_index(), dpp},
pgid{pgid},
whoami{whoami},
pg(pg),
const uint64_t len,
const uint32_t flags)
{
- return store->read(coll, ghobject_t{hoid}, off, len, flags);
+ return crimson::os::with_store<&crimson::os::FuturizedStore::Shard::read>(
+ store, coll, ghobject_t{hoid}, off, len, flags);
}
MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
false);
auto all_completed = interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(txn))
+ crimson::os::with_store_do_transaction(
+ shard_services.get_store(pg.get_store_index()),
+ coll, std::move(txn))
).then_interruptible([FNAME, this,
peers=pending_txn->second.weak_from_this()] {
if (!peers) {
soid, _recovery_info, is_delete, t
).then_interruptible([FNAME, this, &t] {
DEBUGDPP("submitting transaction", pg);
- return shard_services.get_store().do_transaction(coll, std::move(t));
+ return crimson::os::with_store_do_transaction(
+ shard_services.get_store(pg.get_store_index()),
+ coll, std::move(t));
}).then_interruptible(
[this, epoch_frozen, last_complete = pg.get_info().last_complete] {
pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
}).then_interruptible(
[FNAME, this, &txn]() mutable {
DEBUGDPP("submitting transaction", pg);
- return shard_services.get_store().do_transaction(coll,
- std::move(txn));
+ return crimson::os::with_store_do_transaction(
+ shard_services.get_store(pg.get_store_index()),
+ coll,
+ std::move(txn));
});
});
}
return seastar::make_ready_future<bufferlist>();
})),
interruptor::make_interruptible(
- store->get_attrs(coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::get_attrs>(
+ store, coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
).handle_error_interruptible<false>(
crimson::os::FuturizedStore::Shard::get_attrs_ertr::all_same_way(
[FNAME, this, oid] (const std::error_code& e) {
// 3. read the truncated extents
// TODO: check if the returned extents are pruned
return interruptor::make_interruptible(
- store->readv(
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::readv>(
+ store,
coll,
ghobject_t{oid},
- push_op->data_included,
+ std::ref(push_op->data_included),
CEPH_OSD_OP_FLAG_FADVISE_DONTNEED));
}).safe_then_interruptible([push_op, range_end=copy_subset.range_end()](auto &&bl) {
push_op->data.claim_append(std::move(bl));
};
co_await interruptor::make_interruptible(
- shard_services.get_store().omap_iterate(
- coll, ghobject_t{oid}, start_from, callback
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+ shard_services.get_store(pg.get_store_index()),
+ coll, ghobject_t{oid}, start_from, callback, 0
).safe_then([&new_progress](auto ret) {
if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
new_progress.omap_complete = true;
);
DEBUGDPP("submitting transaction, complete", pg);
co_await interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(t)));
+ crimson::os::with_store_do_transaction(
+ shard_services.get_store(pg.get_store_index()),
+ coll, std::move(t)));
} else {
response->soid = push_op.soid;
response->recovery_info = pull_info.recovery_info;
response->recovery_progress = pull_info.recovery_progress;
DEBUGDPP("submitting transaction, incomplete", pg);
co_await interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(t)));
+ crimson::os::with_store_do_transaction(
+ shard_services.get_store(pg.get_store_index()),
+ coll, std::move(t)));
}
co_return complete;
false, t);
co_await interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(t)));
+ crimson::os::with_store_do_transaction(
+ shard_services.get_store(pg.get_store_index()),
+ coll, std::move(t)));
replica_push_targets.erase(ptiter);
pg.get_recovery_handler()->_committed_pushed_object(
epoch_frozen, pg.get_info().last_complete);
} else {
co_await interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(t)));
+ crimson::os::with_store_do_transaction(
+ shard_services.get_store(pg.get_store_index()), coll, std::move(t)));
}
auto reply = crimson::make_message<MOSDPGPushReply>();
// clone overlap content in local object if using a new object
auto st = co_await interruptor::make_interruptible(
- store->stat(coll, ghobject_t(recovery_info.soid)));
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::stat>(
+ store, coll, ghobject_t(recovery_info.soid), 0));
// TODO: pg num bytes counting
uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
crimson::osd::ShardServices& shard_services,
crimson::os::CollectionRef coll,
PGBackend* backend)
- : RecoveryBackend(pg, shard_services, coll, backend)
+ : RecoveryBackend(pg, shard_services, coll, pg.get_store_index(), backend)
{}
interruptible_future<> handle_recovery_op(
Ref<MOSDFastDispatchOp> m,
namespace {
struct FuturizedShardStoreLogReader {
- crimson::os::FuturizedStore::Shard &store;
+ crimson::os::FuturizedStore::StoreShardRef store;
const pg_info_t &info;
PGLog::IndexedLog &log;
std::set<std::string>* log_keys_debug = NULL;
return ObjectStore::omap_iter_ret_t::NEXT;
};
- co_await store.omap_iterate(
- ch, pgmeta_oid, start_from, callback
+ co_await crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+ store,
+ ch, pgmeta_oid, start_from, callback, 0
).safe_then([] (auto ret) {
ceph_assert (ret == ObjectStore::omap_iter_ret_t::NEXT);
}).handle_error(
}
seastar::future<> PGLog::read_log_and_missing_crimson(
- crimson::os::FuturizedStore::Shard &store,
+ crimson::os::FuturizedStore::StoreShardRef store,
crimson::os::CollectionRef ch,
const pg_info_t &info,
IndexedLog &log,
#ifdef WITH_CRIMSON
seastar::future<> read_log_and_missing_crimson(
- crimson::os::FuturizedStore::Shard &store,
+ crimson::os::FuturizedStore::StoreShardRef store,
crimson::os::CollectionRef ch,
const pg_info_t &info,
ghobject_t pgmeta_oid
}
static seastar::future<> read_log_and_missing_crimson(
- crimson::os::FuturizedStore::Shard &store,
+ crimson::os::FuturizedStore::StoreShardRef store,
crimson::os::CollectionRef ch,
const pg_info_t &info,
IndexedLog &log,
LOG_PREFIX("OSDriver::get_keys");
DEBUG("");
using crimson::os::FuturizedStore;
- return interruptor::green_get(os->omap_get_values(
- ch, hoid, keys
+ return interruptor::green_get(
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_get_values>(
+ os,
+ ch, hoid, keys, 0
).safe_then([out] (FuturizedStore::Shard::omap_values_t&& vals) {
// just the difference in comparator (`std::less<>` in omap_values_t`)
reinterpret_cast<FuturizedStore::Shard::omap_values_t&>(*out) = std::move(vals);
}
};
return interruptor::green_get(
- os->omap_iterate(ch, hoid, start_from, callback
+ crimson::os::with_store<&crimson::os::FuturizedStore::Shard::omap_iterate>(
+ os, ch, hoid, start_from, callback, 0
).safe_then([FNAME, key] (auto ret) {
if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
DEBUG("key {} no more values", key);
DEBUG("key {}", key);
using crimson::os::FuturizedStore;
// let's try to get current first
- return interruptor::green_get(os->omap_get_values(
- ch, hoid, FuturizedStore::Shard::omap_keys_t{key}
+ return interruptor::green_get(crimson::os::with_store<
+ &crimson::os::FuturizedStore::Shard::omap_get_values>(
+ os,
+ ch, hoid, FuturizedStore::Shard::omap_keys_t{key}, 0
).safe_then([FNAME, &key, next_or_current] (FuturizedStore::Shard::omap_values_t&& vals) {
DEBUG("returning {}", key);
ceph_assert(vals.size() == 1);
#ifdef WITH_CRIMSON
using ObjectStoreT = crimson::os::FuturizedStore::Shard;
using CollectionHandleT = ObjectStoreT::CollectionRef;
+
+ using ObjectStoreTLRef = seastar::shared_ptr<ObjectStoreT>;
+ using ObjectStoreTFRef = seastar::foreign_ptr<ObjectStoreTLRef>;
+ using ObjectStoreTRef = ::crimson::local_shared_foreign_ptr<ObjectStoreTLRef>;
#else
using ObjectStoreT = ObjectStore;
using CollectionHandleT = ObjectStoreT::CollectionHandle;
+ using ObjectStoreTRef = ObjectStoreT*;
#endif
- ObjectStoreT *os;
+ ObjectStoreTRef os;
CollectionHandleT ch;
ghobject_t hoid;
}
#ifndef WITH_CRIMSON
- OSDriver(ObjectStoreT *os, const coll_t& cid, const ghobject_t &hoid) :
+ OSDriver(ObjectStoreTRef os, const coll_t& cid, const ghobject_t &hoid) :
OSDriver(os, os->open_collection(cid), hoid) {}
#endif
- OSDriver(ObjectStoreT *os, CollectionHandleT ch, const ghobject_t &hoid) :
+ OSDriver(ObjectStoreTRef os, CollectionHandleT ch, const ghobject_t &hoid) :
os(os),
ch(ch),
hoid(hoid) {}