return std::move(fut);
}
-PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>>
-PG::do_osd_ops(
+template <class Ret, class SuccessFunc, class FailureFunc>
+PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
+ OpsExecuter&& ox,
+ std::vector<OSDOp> ops,
Ref<MOSDOp> m,
ObjectContextRef obc,
- const OpInfo &op_info)
+ const OpInfo &op_info,
+ SuccessFunc&& success_func,
+ FailureFunc&& failure_func)
{
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
-
- using osd_op_ierrorator = OpsExecuter::osd_op_ierrorator;
- using osd_op_errorator = OpsExecuter::osd_op_errorator;
- const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
- : m->get_hobj();
- auto ox = std::make_unique<OpsExecuter>(
- obc, op_info, get_pool().info, get_backend(), *m);
- return interruptor::do_for_each(
- m->ops.begin(), m->ops.end(), [m, ox = ox.get()](OSDOp& osd_op) {
+ return interruptor::do_for_each(ops, [m, &ox](OSDOp& osd_op) {
logger().debug(
- "do_osd_ops: {} - object {} - handling op {}",
+ "do_osd_ops_execute: {} - object {} - handling op {}",
*m,
- ox->get_target(),
+ ox.get_target(),
ceph_osd_op_name(osd_op.op.op));
- return ox->execute_op(osd_op);
- }).safe_then_interruptible([this, m, ox = ox.get(), &op_info] {
+ return ox.execute_op(osd_op);
+ }).safe_then_interruptible([this, m, &ox, &op_info] {
logger().debug(
- "do_osd_ops: {} - object {} all operations successful",
+ "do_osd_ops_execute: {} - object {} all operations successful",
*m,
- ox->get_target());
- return std::move(*ox).flush_changes_n_do_ops_effects(
+ ox.get_target());
+ return std::move(ox).flush_changes_n_do_ops_effects(
Ref<PG>{this},
[this, m, &op_info] (auto&& txn,
auto&& obc,
auto&& osd_op_p,
- bool user_modify) -> osd_op_ierrorator::future<> {
+ bool user_modify) {
logger().debug(
- "do_osd_ops: {} - object {} submitting txn",
+ "do_osd_ops_execute: {} - object {} submitting txn",
*m,
obc->get_oid());
fill_op_params_bump_pg_version(osd_op_p, std::move(m), user_modify);
std::move(obc),
std::move(txn),
std::move(osd_op_p));
- });
- }).safe_then_interruptible_tuple([this, m, obc, rvec = op_info.allows_returnvec()]()
- -> PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>> {
- // TODO: should stop at the first op which returns a negative retval,
- // cmpext uses it for returning the index of first unmatched byte
- int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
- if (result > 0 && !rvec) {
- result = 0;
- }
- auto reply = make_message<MOSDOpReply>(m.get(),
- result,
- get_osdmap_epoch(),
- 0,
- false);
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- logger().debug(
- "do_osd_ops: {} - object {} sending reply",
- *m,
- obc->obs.oi.soid);
- return PG::do_osd_ops_ertr::make_ready_future<Ref<MOSDOpReply>>(
- std::move(reply));
+ });
+ }).safe_then_interruptible_tuple([success_func=std::move(success_func)] {
+ return std::move(success_func)();
}, crimson::ct_error::object_corrupted::handle([m, obc, this] {
- return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version)
- .then_interruptible([] {
- return PG::do_osd_ops_ertr::future<Ref<MOSDOpReply>>(
- crimson::ct_error::eagain::make());
+ return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version).then_interruptible([] {
+ return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
});
- }), osd_op_errorator::all_same_way([ox = std::move(ox),
- m,
- obc,
- this] (const std::error_code& e) {
- return handle_failed_op(e, std::move(obc), *ox, *m);
+ }), OpsExecuter::osd_op_errorator::all_same_way(
+ [&ox, m, obc, failure_func=std::move(failure_func), this] (const std::error_code& e) mutable {
+ const bool need_reload_obc = ox.has_seen_write();
+ logger().debug(
+ "do_osd_ops_execute: {} - object {} got error {}, {}; need_reload_obc {}",
+ m,
+ obc->obs.oi.soid,
+ e.value(),
+ e.message(),
+ need_reload_obc);
+ return (
+ need_reload_obc ? reload_obc(*obc)
+ : interruptor::make_interruptible(load_obc_ertr::now())
+ ).safe_then_interruptible([&e, failure_func=std::move(failure_func)] {
+ return std::move(failure_func)(e);
+ }, load_obc_ertr::assert_all{ "can't live with object state messed up" });
}));
}
+PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>>
+PG::do_osd_ops(
+ Ref<MOSDOp> m,
+ ObjectContextRef obc,
+ const OpInfo &op_info)
+{
+ if (__builtin_expect(stopping, false)) {
+ throw crimson::common::system_shutdown_exception();
+ }
+ auto ox = std::make_unique<OpsExecuter>(
+ obc, op_info, get_pool().info, get_backend(), *m);
+ return do_osd_ops_execute<Ref<MOSDOpReply>>(
+ std::move(*ox), m->ops, m, obc, op_info,
+ [this, m, obc, rvec = op_info.allows_returnvec()] {
+ // TODO: should stop at the first op which returns a negative retval,
+ // cmpext uses it for returning the index of first unmatched byte
+ int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+ if (result > 0 && !rvec) {
+ result = 0;
+ }
+ auto reply = make_message<MOSDOpReply>(m.get(),
+ result,
+ get_osdmap_epoch(),
+ 0,
+ false);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ logger().debug(
+ "do_osd_ops: {} - object {} sending reply",
+ *m,
+ obc->obs.oi.soid);
+ return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(
+ std::move(reply));
+ },
+ [m, this] (const std::error_code& e) {
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+ reply->set_enoent_reply_versions(
+ peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }
+ ).finally([ox_deleter=std::move(ox)] {});
+}
+
PG::interruptible_future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
if (__builtin_expect(stopping, false)) {