]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: switch ClientRequest::do_request to use *_executer rather than do_osd_ops
authorSamuel Just <sjust@redhat.com>
Thu, 26 Sep 2024 22:15:48 +0000 (15:15 -0700)
committerSamuel Just <sjust@redhat.com>
Tue, 15 Oct 2024 03:37:26 +0000 (20:37 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd_operations/client_request.cc

index 6eed04df6a5acb5388b716ef53dba44d540ea1d1..c226222fa0c75a828818df551310d567f524077e 100644 (file)
@@ -502,36 +502,129 @@ ClientRequest::do_process(
     co_return;
   }
 
-  auto [submitted, all_completed] = co_await pg->do_osd_ops(
-    m, r_conn, obc, op_info, snapc
-  ).handle_error_interruptible(
-    crimson::ct_error::eagain::handle([] {
-      ceph_assert(0 == "not handled");
-      return std::make_tuple(
-       interruptor::now(),
-       PG::do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>());
+  auto ox = seastar::make_lw_shared<OpsExecuter>(
+    pg, obc, op_info, *m, r_conn, snapc);
+  auto ret = co_await pg->run_executer(
+    ox, obc, op_info, m->ops
+  ).si_then([]() -> std::optional<std::error_code> {
+    return std::nullopt;
+  }).handle_error_interruptible(crimson::ct_error::all_same_way(
+    [](auto e) -> std::optional<std::error_code> {
+      return e;
     })
   );
-  co_await std::move(submitted);
 
-  co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+  auto should_log_error = [](std::error_code e) -> bool {
+    switch (e.value()) {
+    case EDQUOT:
+    case ENOSPC:
+    case EAGAIN:
+      return false;
+    default:
+      return true;
+    }
+  };
 
-  auto reply = co_await std::move(
-    all_completed
-  ).handle_error_interruptible(
-    crimson::ct_error::eagain::handle([] {
-      ceph_assert(0 == "not handled");
-      return MURef<MOSDOpReply>();
-    })
-  );
+  if (ret && !should_log_error(*ret)) {
+    co_await reply_op_error(pg, -ret->value());
+    co_return;
+  }
+
+  {
+    auto all_completed = interruptor::now();
+    if (ret) {
+      assert(should_log_error(*ret));
+      if (op_info.may_write()) {
+       auto rep_tid = pg->shard_services.get_tid();
+       auto version = co_await pg->submit_error_log(
+         m, op_info, obc, *ret, rep_tid);
+
+       all_completed = pg->complete_error_log(
+         rep_tid, version);
+      }
+      // simply return the error below, leaving all_completed alone
+    } else {
+      auto submitted = interruptor::now();
+      std::tie(submitted, all_completed) = co_await pg->submit_executer(
+       std::move(ox), m->ops);
+      co_await std::move(submitted);
+    }
+    co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+
+    co_await std::move(all_completed);
+  }
 
   co_await ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this);
-  DEBUGDPP("{}.{}: sending response",
-          *pg, *this, this_instance_id);
-  // TODO: gate the crosscore sending
-  co_await interruptor::make_interruptible(
-    get_foreign_connection().send_with_throttling(std::move(reply))
-  );
+
+  if (ret) {
+    int err = -ret->value();
+    DEBUGDPP("{}: replying with error {}", *pg, *this, err);
+
+    auto reply = crimson::make_message<MOSDOpReply>(
+      m.get(), err, pg->get_osdmap_epoch(), 0, false);
+
+    if (!m->ops.empty() && m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK) {
+      reply->set_result(0);
+    }
+
+    // For all ops except for CMPEXT, the correct error value is encoded
+    // in e. For CMPEXT, osdop.rval has the actual error value.
+    if (err == -ct_error::cmp_fail_error_value) {
+      assert(!m->ops.empty());
+      for (auto &osdop : m->ops) {
+       if (osdop.rval < 0) {
+         reply->set_result(osdop.rval);
+         break;
+       }
+      }
+    }
+
+    reply->set_enoent_reply_versions(
+      pg->peering_state.get_info().last_update,
+      pg->peering_state.get_info().last_user_version);
+    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+    
+    // TODO: gate the crosscore sending
+    co_await interruptor::make_interruptible(
+      get_foreign_connection().send_with_throttling(std::move(reply)));
+  } else {
+    int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+    if (op_info.may_read() && result >= 0) {
+      for (auto &osdop : m->ops) {
+       if (osdop.rval < 0 && !(osdop.op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+         result = osdop.rval.code;
+         break;
+       }
+      }
+    } else if (result > 0 && op_info.may_write() && !op_info.allows_returnvec()) {
+      result = 0;
+    } else if (result < 0 &&
+            (m->ops.empty() ?
+             0 : m->ops.back().op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
+      result = 0;
+    }
+    auto reply = crimson::make_message<MOSDOpReply>(
+      m.get(),
+      result,
+      pg->get_osdmap_epoch(),
+      0,
+      false);
+    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+    if (obc->obs.exists) {
+      reply->set_reply_versions(pg->peering_state.get_info().last_update,
+                               obc->obs.oi.user_version);
+    } else {
+      reply->set_reply_versions(pg->peering_state.get_info().last_update,
+                               pg->peering_state.get_info().last_user_version);
+    }
+    
+    DEBUGDPP("{}.{}: sending response {}",
+            *pg, *this, this_instance_id, *m);
+    // TODO: gate the crosscore sending
+    co_await interruptor::make_interruptible(
+      get_foreign_connection().send_with_throttling(std::move(reply))
+    );
+  }
 }
 
 bool ClientRequest::is_misdirected(const PG& pg) const