]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: make client_requests idempotent 39356/head
authorXuehan Xu <xxhdx1985126@gmail.com>
Tue, 2 Feb 2021 11:25:01 +0000 (19:25 +0800)
committerKefu Chai <kchai@redhat.com>
Thu, 18 Feb 2021 06:37:52 +0000 (14:37 +0800)
When redoing client requests, they might have already taken effect
on the underlying disk. This commit deals with that situation by
making those requests immediately reply to clients if they are already
done

Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
src/crimson/osd/ec_backend.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.h
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_backend.h

index 2db1d88063b42c495e22caad207685528d2a2d39..2161e061dac415f5d047131b55168851fdcd6afa 100644 (file)
@@ -35,4 +35,8 @@ private:
                      std::vector<pg_log_entry_t>&& log_entries) final;
   CollectionRef coll;
   crimson::os::FuturizedStore* store;
+  seastar::future<> request_committed(const osd_reqid_t& reqid,
+                                      const eversion_t& version) final {
+    return seastar::now();
+  }
 };
index ee215c0033fca005d62a6a7ea2cc1c6296b96acf..357ad513f7694125fd83b2151f63bfcebf7f68fd 100644 (file)
@@ -1,5 +1,5 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
 
 #include <seastar/core/future.hh>
 
@@ -122,14 +122,26 @@ seastar::future<> ClientRequest::process_op(Ref<PG> &pg)
     [this, pg]() mutable {
     return do_recover_missing(pg);
   }).then([this, pg]() mutable {
-    return with_blocking_future(handle.enter(pp(*pg).get_obc));
-  }).then([this, pg]() mutable -> PG::load_obc_ertr::future<> {
-    op_info.set_from_op(&*m, *pg->get_osdmap());
-    return pg->with_locked_obc(m, op_info, this, [this, pg](auto obc) mutable {
-      return with_blocking_future(handle.enter(pp(*pg).process)).then(
-       [this, pg, obc]() mutable {
-        return do_process(pg, obc);
-      });
+    return pg->already_complete(m->get_reqid()).then_unpack(
+      [this, pg](bool completed, int ret) mutable
+      -> PG::load_obc_ertr::future<> {
+      if (completed) {
+        auto reply = make_message<MOSDOpReply>(
+          m.get(), ret, pg->get_osdmap_epoch(),
+          CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
+        return conn->send(std::move(reply));
+      } else {
+        return with_blocking_future(handle.enter(pp(*pg).get_obc)).then(
+          [this, pg]() mutable -> PG::load_obc_ertr::future<> {
+          op_info.set_from_op(&*m, *pg->get_osdmap());
+          return pg->with_locked_obc(m, op_info, this, [this, pg](auto obc) mutable {
+            return with_blocking_future(handle.enter(pp(*pg).process)).then(
+              [this, pg, obc]() mutable {
+              return do_process(pg, obc);
+            });
+          });
+        });
+      }
     });
   }).safe_then([pg=std::move(pg)] {
     return seastar::now();
index 27d49ef1bc5666b1f1aae000fc52dde826f5410f..d40e957469db8e3e1523f39f318597100e7da0ec 100644 (file)
@@ -1122,4 +1122,22 @@ bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const {
   return false;
 }
 
+seastar::future<std::tuple<bool, int>>
+PG::already_complete(const osd_reqid_t& reqid)
+{
+  eversion_t version;
+  version_t user_version;
+  int ret;
+  std::vector<pg_log_op_return_item_t> op_returns;
+
+  if (peering_state.get_pg_log().get_log().get_request(
+       reqid, &version, &user_version, &ret, &op_returns)) {
+    return backend->request_committed(reqid, version).then([ret] {
+      return seastar::make_ready_future<std::tuple<bool, int>>(true, ret);
+    });
+  } else {
+    return seastar::make_ready_future<std::tuple<bool, int>>(false, 0);
+  }
+}
+
 }
index ec8d73a9175a1850fa32cfe206373691e06a59a6..0910014756f9d0f6a0e9ec2e32f8a556b9290265 100644 (file)
@@ -624,6 +624,7 @@ public:
        return &it->second;
     }
   }
+  seastar::future<std::tuple<bool, int>> already_complete(const osd_reqid_t& reqid);
   int get_recovery_op_priority() const {
     int64_t pri = 0;
     get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
index 01604a64410f8a41f27034a4166e991ee676e3aa..c02f35f3979e75f8ae870f6ad594fc6cf0967761 100644 (file)
@@ -30,6 +30,7 @@ namespace ceph::os {
 
 namespace crimson::osd {
   class ShardServices;
+  class PG;
 }
 
 class PGBackend
@@ -207,6 +208,9 @@ protected:
   crimson::os::FuturizedStore* store;
   bool stopping = false;
   std::optional<peering_info_t> peering;
+  virtual seastar::future<> request_committed(
+    const osd_reqid_t& reqid,
+    const eversion_t& at_version) = 0;
 public:
   struct loaded_object_md_t {
     ObjectState os;
@@ -232,4 +236,5 @@ private:
                      epoch_t min_epoch, epoch_t max_epoch,
                      std::vector<pg_log_entry_t>&& log_entries) = 0;
   friend class ReplicatedRecoveryBackend;
+  friend class ::crimson::osd::PG;
 };
index 91854bcab141da0bb69296a7908969162ca6e9ad..63ee96bfae21ff6774955338581f55c318458fbd 100644 (file)
@@ -57,7 +57,7 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
   const ceph_tid_t tid = next_txn_id++;
   auto req_id = osd_op_p.req->get_reqid();
   auto pending_txn =
-    pending_trans.emplace(tid, pg_shards.size()).first;
+    pending_trans.try_emplace(tid, pg_shards.size(), osd_op_p.at_version).first;
   bufferlist encoded_txn;
   encode(txn, encoded_txn);
 
@@ -93,7 +93,7 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
        peers->all_committed = {};
        return seastar::now();
       }
-      return peers->all_committed.get_future();
+      return peers->all_committed.get_shared_future();
     }).then([pending_txn, this] {
       auto acked_peers = std::move(pending_txn->second.acked_peers);
       pending_trans.erase(pending_txn);
@@ -142,3 +142,34 @@ seastar::future<> ReplicatedBackend::stop()
   pending_trans.clear();
   return seastar::now();
 }
+
+seastar::future<>
+ReplicatedBackend::request_committed(const osd_reqid_t& reqid,
+                                   const eversion_t& at_version)
+{
+  if (std::empty(pending_trans)) {
+    return seastar::now();
+  }
+  auto iter = pending_trans.begin();
+  auto& pending_txn = iter->second;
+  if (pending_txn.at_version > at_version) {
+    return seastar::now();
+  }
+  for (; iter->second.at_version < at_version; ++iter);
+  // As for now, the previous client_request with the same reqid
+  // mustn't have finished, as that would mean later client_requests
+  // has finished before earlier ones.
+  //
+  // The following line of code should be "assert(pending_txn.at_version == at_version)",
+  // as there can be only one transaction at any time in pending_trans due to
+  // PG::client_request_pg_pipeline. But there's a high possibility that we will
+  // improve the parallelism here in the future, which means there may be multiple
+  // client requests in flight, so we loosed the restriction to as follows. Correct
+  // me if I'm wrong:-)
+  assert(iter != pending_trans.end() && iter->second.at_version == at_version);
+  if (iter->second.pending) {
+    return iter->second.all_committed.get_shared_future();
+  } else {
+    return seastar::now();
+  }
+}
index aa6d0a09191e93c4a8967065257aa7c3915737ec..6b1b57e528505f6fc4290d49c9aac0bd7d2a8340 100644 (file)
@@ -43,13 +43,22 @@ private:
   ceph_tid_t next_txn_id = 0;
   class pending_on_t : public seastar::weakly_referencable<pending_on_t> {
   public:
-    pending_on_t(size_t pending)
-      : pending{static_cast<unsigned>(pending)}
+    pending_on_t(size_t pending, const eversion_t& at_version)
+      : pending{static_cast<unsigned>(pending)}, at_version(at_version)
     {}
     unsigned pending;
+    // The order of pending_txns' at_version must be the same as their
+    // corresponding ceph_tid_t, as we rely on this condition for checking
+    // whether a client request is already completed. To put it another
+    // way, client requests at_version must be updated synchorously/simultaneously
+    // with ceph_tid_t.
+    const eversion_t at_version;
     crimson::osd::acked_peers_t acked_peers;
-    seastar::promise<> all_committed;
+    seastar::shared_promise<> all_committed;
   };
   using pending_transactions_t = std::map<ceph_tid_t, pending_on_t>;
   pending_transactions_t pending_trans;
+
+  seastar::future<> request_committed(
+    const osd_reqid_t& reqid, const eversion_t& at_version) final;
 };