return seastar::make_ready_future<bufferlist>();
}
-ECBackend::interruptible_future<crimson::osd::acked_peers_t>
+ECBackend::rep_op_fut_t
ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
std::vector<pg_log_entry_t>&& log_entries)
{
// todo
- return seastar::make_ready_future<crimson::osd::acked_peers_t>();
+ return {seastar::now(),
+ seastar::make_ready_future<crimson::osd::acked_peers_t>()};
}
private:
ll_read_ierrorator::future<ceph::bufferlist>
_read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override;
- interruptible_future<crimson::osd::acked_peers_t>
+ rep_op_fut_t
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
interruptible_errorated_future<osd_op_errorator>
execute_op(OSDOp& osd_op);
+ using rep_op_fut_tuple =
+ std::tuple<interruptible_future<>, osd_op_ierrorator::future<>>;
+ using rep_op_fut_t =
+ interruptible_future<rep_op_fut_tuple>;
template <typename MutFunc>
- osd_op_ierrorator::future<> flush_changes_n_do_ops_effects(
+ rep_op_fut_t flush_changes_n_do_ops_effects(
Ref<PG> pg,
MutFunc&& mut_func) &&;
}
template <typename MutFunc>
-OpsExecuter::osd_op_ierrorator::future<>
+OpsExecuter::rep_op_fut_t
OpsExecuter::flush_changes_n_do_ops_effects(Ref<PG> pg, MutFunc&& mut_func) &&
{
const bool want_mutate = !txn.empty();
// osd_op_params are instantiated by every wr-like operation.
assert(osd_op_params || !want_mutate);
assert(obc);
- auto maybe_mutated = interruptor::make_interruptible(osd_op_errorator::now());
+ rep_op_fut_t maybe_mutated =
+ interruptor::make_ready_future<rep_op_fut_tuple>(
+ seastar::now(),
+ interruptor::make_interruptible(osd_op_errorator::now()));
if (want_mutate) {
osd_op_params->req_id = msg->get_reqid();
osd_op_params->mtime = msg->get_mtime();
- maybe_mutated = std::forward<MutFunc>(mut_func)(std::move(txn),
+ auto [submitted, all_completed] = std::forward<MutFunc>(mut_func)(std::move(txn),
std::move(obc),
std::move(*osd_op_params),
user_modify);
+ maybe_mutated = interruptor::make_ready_future<rep_op_fut_tuple>(
+ std::move(submitted),
+ osd_op_ierrorator::future<>(std::move(all_completed)));
}
if (__builtin_expect(op_effects.empty(), true)) {
return maybe_mutated;
} else {
- return maybe_mutated.safe_then_interruptible([pg=std::move(pg),
- this] () mutable {
- // let's do the cleaning of `op_effects` in destructor
- return interruptor::do_for_each(op_effects,
- [pg=std::move(pg)] (auto& op_effect) {
- return op_effect->execute(pg);
- });
+ return maybe_mutated.then_unpack_interruptible(
+ [this, pg=std::move(pg)](auto&& submitted, auto&& all_completed) mutable {
+ return interruptor::make_ready_future<rep_op_fut_tuple>(
+ std::move(submitted),
+ all_completed.safe_then_interruptible([this, pg=std::move(pg)] {
+ // let's do the cleaning of `op_effects` in destructor
+ return interruptor::do_for_each(op_effects,
+ [pg=std::move(pg)](auto& op_effect) {
+ return op_effect->execute(pg);
+ });
+ }));
});
}
}
return conn->send(std::move(reply));
}
}
- return pg->do_osd_ops(m, obc, op_info).safe_then_interruptible(
- [this](Ref<MOSDOpReply> reply) -> interruptible_future<> {
- return conn->send(std::move(reply));
+ return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
+ [this, pg](auto submitted, auto all_completed) mutable {
+ return submitted.then_interruptible(
+ [this, pg] {
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ handle.enter(pp(*pg).wait_repop));
+ }).then_interruptible(
+ [this, pg, all_completed=std::move(all_completed)]() mutable {
+ return all_completed.safe_then_interruptible(
+ [this, pg](Ref<MOSDOpReply> reply) {
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ handle.enter(pp(*pg).send_reply)).then_interruptible(
+ [this, reply=std::move(reply)] {
+ return conn->send(std::move(reply));
+ });
+ }, crimson::ct_error::eagain::handle([this, pg]() mutable {
+ return process_op(pg);
+ }));
+ });
}, crimson::ct_error::eagain::handle([this, pg]() mutable {
return process_op(pg);
}));
[] (const std::error_code& e) {
return PG::do_osd_ops_iertr::now();
}
- ).safe_then_interruptible(
- [] {
- return interruptor::now();
+ ).safe_then_unpack_interruptible(
+ [](auto submitted, auto all_completed) {
+ return all_completed.handle_error_interruptible(
+ crimson::ct_error::eagain::handle([] {
+ return seastar::now();
+ }));
}, crimson::ct_error::eagain::handle([] {
return interruptor::now();
})
return seastar::now();
}
-PG::interruptible_future<> PG::submit_transaction(
+std::tuple<PG::interruptible_future<>,
+ PG::interruptible_future<>>
+PG::submit_transaction(
const OpInfo& op_info,
const std::vector<OSDOp>& ops,
ObjectContextRef&& obc,
osd_op_params_t&& osd_op_p)
{
if (__builtin_expect(stopping, false)) {
- return seastar::make_exception_future<>(
- crimson::common::system_shutdown_exception());
+ return {seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception()),
+ seastar::now()};
}
epoch_t map_epoch = get_osdmap_epoch();
peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
txn, true, false);
- return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
- std::move(obc),
- std::move(txn),
- std::move(osd_op_p),
- peering_state.get_last_peering_reset(),
- map_epoch,
- std::move(log_entries)).then_interruptible(
+ auto [submitted, all_completed] = backend->mutate_object(
+ peering_state.get_acting_recovery_backfill(),
+ std::move(obc),
+ std::move(txn),
+ std::move(osd_op_p),
+ peering_state.get_last_peering_reset(),
+ map_epoch,
+ std::move(log_entries));
+ return std::make_tuple(std::move(submitted), all_completed.then_interruptible(
[this, last_complete=peering_state.get_info().last_complete,
at_version=osd_op_p.at_version](auto acked) {
for (const auto& peer : acked) {
}
peering_state.complete_write(at_version, last_complete);
return seastar::now();
- });
+ }));
}
void PG::fill_op_params_bump_pg_version(
}
template <class Ret, class SuccessFunc, class FailureFunc>
-PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
+PG::do_osd_ops_execute(
OpsExecuter&& ox,
std::vector<OSDOp> ops,
const OpInfo &op_info,
return reload_obc(obc).handle_error_interruptible(
load_obc_ertr::assert_all{"can't live with object state messed up"});
});
+ auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
return interruptor::do_for_each(ops, [&ox](OSDOp& osd_op) {
logger().debug(
"do_osd_ops_execute: object {} - handling op {}",
std::move(txn),
std::move(osd_op_p));
});
- }).safe_then_interruptible_tuple([success_func=std::move(success_func)] {
- return std::move(success_func)();
- }, crimson::ct_error::object_corrupted::handle(
- [rollbacker, this] (const std::error_code& e) mutable {
- // this is a path for EIO. it's special because we want to fix the obejct
- // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
- // restart the execution.
- return rollbacker.rollback_obc_if_modified(e).then_interruptible(
- [obc=rollbacker.get_obc(), this] {
- return repair_object(obc->obs.oi.soid,
- obc->obs.oi.version).then_interruptible([] {
- return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
- });
- });
- }), OpsExecuter::osd_op_errorator::all_same_way(
- [rollbacker, failure_func=std::move(failure_func)]
+ }).safe_then_unpack_interruptible(
+ [success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
+ (auto submitted_fut, auto all_completed_fut) mutable {
+ return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+ std::move(submitted_fut),
+ all_completed_fut.safe_then_interruptible_tuple(
+ std::move(success_func),
+ crimson::ct_error::object_corrupted::handle(
+ [rollbacker, this] (const std::error_code& e) mutable {
+ // this is a path for EIO. it's special because we want to fix the obejct
+ // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
+ // restart the execution.
+ return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [obc=rollbacker.get_obc(), this] {
+ return repair_object(obc->obs.oi.soid,
+ obc->obs.oi.version).then_interruptible([] {
+ return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
+ });
+ });
+ }), OpsExecuter::osd_op_errorator::all_same_way(
+ [rollbacker, failure_func_ptr]
+ (const std::error_code& e) mutable {
+ return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [&e, failure_func_ptr] {
+ return (*failure_func_ptr)(e);
+ });
+ })
+ )
+ );
+ }, OpsExecuter::osd_op_errorator::all_same_way(
+ [rollbacker, failure_func_ptr]
(const std::error_code& e) mutable {
- return rollbacker.rollback_obc_if_modified(e).then_interruptible(
- [&e, failure_func=std::move(failure_func)] {
- return std::move(failure_func)(e);
- });
+ return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+ seastar::now(),
+ rollbacker.rollback_obc_if_modified(e).then_interruptible(
+ [&e, failure_func_ptr] {
+ return (*failure_func_ptr)(e);
+ }));
}));
}
-PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>>
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ref<MOSDOpReply>>>
PG::do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc,
).finally([ox_deleter=std::move(ox)] {});
}
-PG::do_osd_ops_iertr::future<>
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
PG::do_osd_ops(
ObjectContextRef obc,
std::vector<OSDOp> ops,
::crimson::interruptible::interruptible_errorator<
::crimson::osd::IOInterruptCondition,
::crimson::errorator<crimson::ct_error::eagain>>;
- do_osd_ops_iertr::future<Ref<MOSDOpReply>> do_osd_ops(
+ template <typename Ret = void>
+ using pg_rep_op_fut_t =
+ std::tuple<interruptible_future<>,
+ do_osd_ops_iertr::future<Ret>>;
+ do_osd_ops_iertr::future<pg_rep_op_fut_t<Ref<MOSDOpReply>>> do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc,
const OpInfo &op_info);
using do_osd_ops_failure_func_t =
std::function<do_osd_ops_iertr::future<>(const std::error_code&)>;
struct do_osd_ops_params_t;
- do_osd_ops_iertr::future<> do_osd_ops(
+ do_osd_ops_iertr::future<pg_rep_op_fut_t<>> do_osd_ops(
ObjectContextRef obc,
std::vector<OSDOp> ops,
const OpInfo &op_info,
do_osd_ops_success_func_t success_func,
do_osd_ops_failure_func_t failure_func);
template <class Ret, class SuccessFunc, class FailureFunc>
- do_osd_ops_iertr::future<Ret> do_osd_ops_execute(
+ do_osd_ops_iertr::future<pg_rep_op_fut_t<Ret>> do_osd_ops_execute(
OpsExecuter&& ox,
std::vector<OSDOp> ops,
const OpInfo &op_info,
SuccessFunc&& success_func,
FailureFunc&& failure_func);
interruptible_future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
- interruptible_future<> submit_transaction(
+ std::tuple<interruptible_future<>, interruptible_future<>>
+ submit_transaction(
const OpInfo& op_info,
const std::vector<OSDOp>& ops,
ObjectContextRef&& obc,
}));
}
-PGBackend::interruptible_future<crimson::osd::acked_peers_t>
+PGBackend::rep_op_fut_t
PGBackend::mutate_object(
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
using interruptible_future =
::crimson::interruptible::interruptible_future<
::crimson::osd::IOInterruptCondition, T>;
+ using rep_op_fut_t =
+ std::tuple<interruptible_future<>,
+ interruptible_future<crimson::osd::acked_peers_t>>;
PGBackend(shard_id_t shard, CollectionRef coll, crimson::os::FuturizedStore* store);
virtual ~PGBackend() = default;
static std::unique_ptr<PGBackend> create(pg_t pgid,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
osd_op_params_t& osd_op_params);
- interruptible_future<crimson::osd::acked_peers_t> mutate_object(
+ rep_op_fut_t mutate_object(
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
ceph::os::Transaction&& txn,
uint32_t flags) = 0;
bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn);
- virtual interruptible_future<crimson::osd::acked_peers_t>
+ virtual rep_op_fut_t
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
t);
return shard_services.get_store().do_transaction(
- pg.get_collection_ref(), std::move(t)
- ).or_terminate();
+ pg.get_collection_ref(), std::move(t)).or_terminate();
}
RecoveryBackend::interruptible_future<>
ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
}
return shard_services.get_store().do_transaction(
- pg.get_collection_ref(), std::move(t)
- ).or_terminate();
+ pg.get_collection_ref(), std::move(t)).or_terminate();
}
RecoveryBackend::interruptible_future<BackfillInterval>
return store->read(coll, ghobject_t{hoid}, off, len, flags);
}
-ReplicatedBackend::interruptible_future<crimson::osd::acked_peers_t>
+ReplicatedBackend::rep_op_fut_t
ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,
ceph::os::Transaction&& txn,
bufferlist encoded_txn;
encode(txn, encoded_txn);
- return interruptor::parallel_for_each(std::move(pg_shards),
- [=, encoded_txn=std::move(encoded_txn), txn=std::move(txn)]
- (auto pg_shard) mutable {
- if (pg_shard == whoami) {
- return shard_services.get_store().do_transaction(coll,std::move(txn));
- } else {
- auto m = crimson::net::make_message<MOSDRepOp>(
- osd_op_p.req_id,
- whoami,
- spg_t{pgid, pg_shard.shard},
- hoid,
- CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
- map_epoch,
- min_epoch,
- tid,
- osd_op_p.at_version);
- m->set_data(encoded_txn);
- pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
- encode(log_entries, m->logbl);
- m->pg_trim_to = osd_op_p.pg_trim_to;
- m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk;
- m->set_rollback_to(osd_op_p.at_version);
- // TODO: set more stuff. e.g., pg_states
- return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
- }
- }).then_interruptible([this, peers=pending_txn->second.weak_from_this()] {
- if (!peers) {
- // for now, only actingset_changed can cause peers
- // to be nullptr
- assert(peering);
- throw crimson::common::actingset_changed(peering->is_primary);
- }
- if (--peers->pending == 0) {
- peers->all_committed.set_value();
- peers->all_committed = {};
- return seastar::now();
- }
- return peers->all_committed.get_shared_future();
- }).then_interruptible([pending_txn, this] {
- auto acked_peers = std::move(pending_txn->second.acked_peers);
- pending_trans.erase(pending_txn);
- return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
- });
+ auto all_completed = interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(coll, std::move(txn)))
+ .then_interruptible([this, peers=pending_txn->second.weak_from_this()] {
+ if (!peers) {
+ // for now, only actingset_changed can cause peers
+ // to be nullptr
+ assert(peering);
+ throw crimson::common::actingset_changed(peering->is_primary);
+ }
+ if (--peers->pending == 0) {
+ peers->all_committed.set_value();
+ peers->all_committed = {};
+ return seastar::now();
+ }
+ return peers->all_committed.get_shared_future();
+ }).then_interruptible([pending_txn, this] {
+ auto acked_peers = std::move(pending_txn->second.acked_peers);
+ pending_trans.erase(pending_txn);
+ return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
+ });
+
+ for (auto pg_shard : pg_shards) {
+ if (pg_shard != whoami) {
+ auto m = crimson::net::make_message<MOSDRepOp>(
+ osd_op_p.req_id,
+ whoami,
+ spg_t{pgid, pg_shard.shard},
+ hoid,
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+ map_epoch,
+ min_epoch,
+ tid,
+ osd_op_p.at_version);
+ m->set_data(encoded_txn);
+ pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
+ encode(log_entries, m->logbl);
+ m->pg_trim_to = osd_op_p.pg_trim_to;
+ m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk;
+ m->set_rollback_to(osd_op_p.at_version);
+ // TODO: set more stuff. e.g., pg_states
+ (void) shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
+ }
+ }
+ return {seastar::now(), std::move(all_completed)};
}
void ReplicatedBackend::on_actingset_changed(peering_info_t pi)
ll_read_ierrorator::future<ceph::bufferlist>
_read(const hobject_t& hoid, uint64_t off,
uint64_t len, uint32_t flags) override;
- interruptible_future<crimson::osd::acked_peers_t>
- _submit_transaction(std::set<pg_shard_t>&& pg_shards,
- const hobject_t& hoid,
- ceph::os::Transaction&& txn,
- osd_op_params_t&& osd_op_p,
- epoch_t min_epoch, epoch_t max_epoch,
- std::vector<pg_log_entry_t>&& log_entries) final;
+ rep_op_fut_t _submit_transaction(std::set<pg_shard_t>&& pg_shards,
+ const hobject_t& hoid,
+ ceph::os::Transaction&& txn,
+ osd_op_params_t&& osd_op_p,
+ epoch_t min_epoch, epoch_t max_epoch,
+ std::vector<pg_log_entry_t>&& log_entries) final;
const pg_t pgid;
const pg_shard_t whoami;
crimson::osd::ShardServices& shard_services;