co_return;
}
- auto [submitted, all_completed] = co_await pg->do_osd_ops(
- m, r_conn, obc, op_info, snapc
- ).handle_error_interruptible(
- crimson::ct_error::eagain::handle([] {
- ceph_assert(0 == "not handled");
- return std::make_tuple(
- interruptor::now(),
- PG::do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>());
+ auto ox = seastar::make_lw_shared<OpsExecuter>(
+ pg, obc, op_info, *m, r_conn, snapc);
+ auto ret = co_await pg->run_executer(
+ ox, obc, op_info, m->ops
+ ).si_then([]() -> std::optional<std::error_code> {
+ return std::nullopt;
+ }).handle_error_interruptible(crimson::ct_error::all_same_way(
+ [](auto e) -> std::optional<std::error_code> {
+ return e;
})
);
- co_await std::move(submitted);
- co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+ auto should_log_error = [](std::error_code e) -> bool {
+ switch (e.value()) {
+ case EDQUOT:
+ case ENOSPC:
+ case EAGAIN:
+ return false;
+ default:
+ return true;
+ }
+ };
- auto reply = co_await std::move(
- all_completed
- ).handle_error_interruptible(
- crimson::ct_error::eagain::handle([] {
- ceph_assert(0 == "not handled");
- return MURef<MOSDOpReply>();
- })
- );
+ if (ret && !should_log_error(*ret)) {
+ co_await reply_op_error(pg, -ret->value());
+ co_return;
+ }
+
+ {
+ auto all_completed = interruptor::now();
+ if (ret) {
+ assert(should_log_error(*ret));
+ if (op_info.may_write()) {
+ auto rep_tid = pg->shard_services.get_tid();
+ auto version = co_await pg->submit_error_log(
+ m, op_info, obc, *ret, rep_tid);
+
+ all_completed = pg->complete_error_log(
+ rep_tid, version);
+ }
+ // simply return the error below, leaving all_completed alone
+ } else {
+ auto submitted = interruptor::now();
+ std::tie(submitted, all_completed) = co_await pg->submit_executer(
+ std::move(ox), m->ops);
+ co_await std::move(submitted);
+ }
+ co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+
+ co_await std::move(all_completed);
+ }
co_await ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this);
- DEBUGDPP("{}.{}: sending response",
- *pg, *this, this_instance_id);
- // TODO: gate the crosscore sending
- co_await interruptor::make_interruptible(
- get_foreign_connection().send_with_throttling(std::move(reply))
- );
+
+ if (ret) {
+ int err = -ret->value();
+ DEBUGDPP("{}: replying with error {}", *pg, *this, err);
+
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(), err, pg->get_osdmap_epoch(), 0, false);
+
+ if (!m->ops.empty() && m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
+ reply->set_result(0);
+ }
+
+ // For all ops except for CMPEXT, the correct error value is encoded
+ // in e. For CMPEXT, osdop.rval has the actual error value.
+ if (err == -ct_error::cmp_fail_error_value) {
+ assert(!m->ops.empty());
+ for (auto &osdop : m->ops) {
+ if (osdop.rval < 0) {
+ reply->set_result(osdop.rval);
+ break;
+ }
+ }
+ }
+
+ reply->set_enoent_reply_versions(
+ pg->peering_state.get_info().last_update,
+ pg->peering_state.get_info().last_user_version);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+
+ // TODO: gate the crosscore sending
+ co_await interruptor::make_interruptible(
+ get_foreign_connection().send_with_throttling(std::move(reply)));
+ } else {
+ int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+ if (op_info.may_read() && result >= 0) {
+ for (auto &osdop : m->ops) {
+ if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+ result = osdop.rval.code;
+ break;
+ }
+ }
+ } else if (result > 0 && op_info.may_write() && !op_info.allows_returnvec()) {
+ result = 0;
+ } else if (result < 0 &&
+ (m->ops.empty() ?
+ 0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+ result = 0;
+ }
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(),
+ result,
+ pg->get_osdmap_epoch(),
+ 0,
+ false);
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ if (obc->obs.exists) {
+ reply->set_reply_versions(pg->peering_state.get_info().last_update,
+ obc->obs.oi.user_version);
+ } else {
+ reply->set_reply_versions(pg->peering_state.get_info().last_update,
+ pg->peering_state.get_info().last_user_version);
+ }
+
+ DEBUGDPP("{}.{}: sending response {}",
+ *pg, *this, this_instance_id, *m);
+ // TODO: gate the crosscore sending
+ co_await interruptor::make_interruptible(
+ get_foreign_connection().send_with_throttling(std::move(reply))
+ );
+ }
}
bool ClientRequest::is_misdirected(const PG& pg) const