From 14b322ec02285dd20d16d4968d3852d3a614994a Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Tue, 2 Feb 2021 19:25:01 +0800 Subject: [PATCH] crimson/osd: make client_requests idempotent When redoing client requests, they might have already taken effect on the underlying disk. This commit deals with that situation by making those requests immediately reply to clients if they are already done Signed-off-by: Xuehan Xu --- src/crimson/osd/ec_backend.h | 4 +++ .../osd/osd_operations/client_request.cc | 32 +++++++++++------ src/crimson/osd/pg.cc | 18 ++++++++++ src/crimson/osd/pg.h | 1 + src/crimson/osd/pg_backend.h | 5 +++ src/crimson/osd/replicated_backend.cc | 35 +++++++++++++++++-- src/crimson/osd/replicated_backend.h | 15 ++++++-- 7 files changed, 95 insertions(+), 15 deletions(-) diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index 2db1d88063b..2161e061dac 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -35,4 +35,8 @@ private: std::vector&& log_entries) final; CollectionRef coll; crimson::os::FuturizedStore* store; + seastar::future<> request_committed(const osd_reqid_t& reqid, + const eversion_t& version) final { + return seastar::now(); + } }; diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index ee215c0033f..357ad513f76 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -1,5 +1,5 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab #include @@ -122,14 +122,26 @@ seastar::future<> ClientRequest::process_op(Ref &pg) [this, pg]() mutable { return do_recover_missing(pg); }).then([this, pg]() mutable { - return with_blocking_future(handle.enter(pp(*pg).get_obc)); - }).then([this, pg]() mutable -> PG::load_obc_ertr::future<> { - op_info.set_from_op(&*m, *pg->get_osdmap()); - return pg->with_locked_obc(m, op_info, this, [this, pg](auto obc) mutable { - return with_blocking_future(handle.enter(pp(*pg).process)).then( - [this, pg, obc]() mutable { - return do_process(pg, obc); - }); + return pg->already_complete(m->get_reqid()).then_unpack( + [this, pg](bool completed, int ret) mutable + -> PG::load_obc_ertr::future<> { + if (completed) { + auto reply = make_message( + m.get(), ret, pg->get_osdmap_epoch(), + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false); + return conn->send(std::move(reply)); + } else { + return with_blocking_future(handle.enter(pp(*pg).get_obc)).then( + [this, pg]() mutable -> PG::load_obc_ertr::future<> { + op_info.set_from_op(&*m, *pg->get_osdmap()); + return pg->with_locked_obc(m, op_info, this, [this, pg](auto obc) mutable { + return with_blocking_future(handle.enter(pp(*pg).process)).then( + [this, pg, obc]() mutable { + return do_process(pg, obc); + }); + }); + }); + } }); }).safe_then([pg=std::move(pg)] { return seastar::now(); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 27d49ef1bc5..d40e957469d 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1122,4 +1122,22 @@ bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const { return false; } +seastar::future> +PG::already_complete(const osd_reqid_t& reqid) +{ + eversion_t version; + version_t user_version; + int ret; + std::vector op_returns; + + if (peering_state.get_pg_log().get_log().get_request( + reqid, &version, &user_version, &ret, &op_returns)) { + return backend->request_committed(reqid, version).then([ret] { + return seastar::make_ready_future>(true, ret); + }); + } else { + return seastar::make_ready_future>(false, 0); + } +} + } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index ec8d73a9175..0910014756f 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -624,6 +624,7 @@ public: return &it->second; } } + seastar::future> already_complete(const osd_reqid_t& reqid); int get_recovery_op_priority() const { int64_t pri = 0; get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri); diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index 01604a64410..c02f35f3979 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -30,6 +30,7 @@ namespace ceph::os { namespace crimson::osd { class ShardServices; + class PG; } class PGBackend @@ -207,6 +208,9 @@ protected: crimson::os::FuturizedStore* store; bool stopping = false; std::optional peering; + virtual seastar::future<> request_committed( + const osd_reqid_t& reqid, + const eversion_t& at_version) = 0; public: struct loaded_object_md_t { ObjectState os; @@ -232,4 +236,5 @@ private: epoch_t min_epoch, epoch_t max_epoch, std::vector&& log_entries) = 0; friend class ReplicatedRecoveryBackend; + friend class ::crimson::osd::PG; }; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 91854bcab14..63ee96bfae2 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -57,7 +57,7 @@ ReplicatedBackend::_submit_transaction(std::set&& pg_shards, const ceph_tid_t tid = next_txn_id++; auto req_id = osd_op_p.req->get_reqid(); auto pending_txn = - pending_trans.emplace(tid, pg_shards.size()).first; + pending_trans.try_emplace(tid, pg_shards.size(), osd_op_p.at_version).first; bufferlist encoded_txn; encode(txn, encoded_txn); @@ -93,7 +93,7 @@ ReplicatedBackend::_submit_transaction(std::set&& pg_shards, peers->all_committed = {}; return seastar::now(); } - return peers->all_committed.get_future(); + return peers->all_committed.get_shared_future(); }).then([pending_txn, this] { auto acked_peers = std::move(pending_txn->second.acked_peers); pending_trans.erase(pending_txn); @@ -142,3 +142,34 @@ seastar::future<> ReplicatedBackend::stop() pending_trans.clear(); return seastar::now(); } + +seastar::future<> +ReplicatedBackend::request_committed(const osd_reqid_t& reqid, + const eversion_t& at_version) +{ + if (std::empty(pending_trans)) { + return seastar::now(); + } + auto iter = pending_trans.begin(); + auto& pending_txn = iter->second; + if (pending_txn.at_version > at_version) { + return seastar::now(); + } + for (; iter->second.at_version < at_version; ++iter); + // As for now, the previous client_request with the same reqid + // mustn't have finished, as that would mean later client_requests + // has finished before earlier ones. + // + // The following line of code should be "assert(pending_txn.at_version == at_version)", + // as there can be only one transaction at any time in pending_trans due to + // PG::client_request_pg_pipeline. But there's a high possibility that we will + // improve the parallelism here in the future, which means there may be multiple + // client requests in flight, so we loosed the restriction to as follows. Correct + // me if I'm wrong:-) + assert(iter != pending_trans.end() && iter->second.at_version == at_version); + if (iter->second.pending) { + return iter->second.all_committed.get_shared_future(); + } else { + return seastar::now(); + } +} diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index aa6d0a09191..6b1b57e5285 100644 --- a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -43,13 +43,22 @@ private: ceph_tid_t next_txn_id = 0; class pending_on_t : public seastar::weakly_referencable { public: - pending_on_t(size_t pending) - : pending{static_cast(pending)} + pending_on_t(size_t pending, const eversion_t& at_version) + : pending{static_cast(pending)}, at_version(at_version) {} unsigned pending; + // The order of pending_txns' at_version must be the same as their + // corresponding ceph_tid_t, as we rely on this condition for checking + // whether a client request is already completed. To put it another + // way, client requests at_version must be updated synchorously/simultaneously + // with ceph_tid_t. + const eversion_t at_version; crimson::osd::acked_peers_t acked_peers; - seastar::promise<> all_committed; + seastar::shared_promise<> all_committed; }; using pending_transactions_t = std::map; pending_transactions_t pending_trans; + + seastar::future<> request_committed( + const osd_reqid_t& reqid, const eversion_t& at_version) final; }; -- 2.39.5