std::vector<pg_log_entry_t>&& log_entries) final;
CollectionRef coll;
crimson::os::FuturizedStore* store;
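+  // stub: nothing here tracks in-flight transactions, so report the
+  // request as committed immediately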
+ seastar::future<> request_committed(const osd_reqid_t& reqid,
+ const eversion_t& version) final {
+ return seastar::now();
+ }
};
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
#include <seastar/core/future.hh>
[this, pg]() mutable {
return do_recover_missing(pg);
}).then([this, pg]() mutable {
- return with_blocking_future(handle.enter(pp(*pg).get_obc));
- }).then([this, pg]() mutable -> PG::load_obc_ertr::future<> {
- op_info.set_from_op(&*m, *pg->get_osdmap());
- return pg->with_locked_obc(m, op_info, this, [this, pg](auto obc) mutable {
- return with_blocking_future(handle.enter(pp(*pg).process)).then(
- [this, pg, obc]() mutable {
- return do_process(pg, obc);
- });
+ return pg->already_complete(m->get_reqid()).then_unpack(
+ [this, pg](bool completed, int ret) mutable
+ -> PG::load_obc_ertr::future<> {
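+      // the op is a dup of an already-completed request: already_complete()
+      // has waited for it to commit everywhere, so resend its stored result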
+ if (completed) {
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), ret, pg->get_osdmap_epoch(),
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
+ return conn->send(std::move(reply));
+ } else {
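+        // not a dup: lock the obc and execute the op as usual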
+ return with_blocking_future(handle.enter(pp(*pg).get_obc)).then(
+ [this, pg]() mutable -> PG::load_obc_ertr::future<> {
+ op_info.set_from_op(&*m, *pg->get_osdmap());
+ return pg->with_locked_obc(m, op_info, this, [this, pg](auto obc) mutable {
+ return with_blocking_future(handle.enter(pp(*pg).process)).then(
+ [this, pg, obc]() mutable {
+ return do_process(pg, obc);
+ });
+ });
+ });
+ }
});
}).safe_then([pg=std::move(pg)] {
return seastar::now();
return false;
}
+seastar::future<std::tuple<bool, int>>
+PG::already_complete(const osd_reqid_t& reqid)
+{
+ eversion_t version;
+ version_t user_version;
+ int ret;
+ std::vector<pg_log_op_return_item_t> op_returns;
+
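+  // look up reqid in the pg log (including its dup index) for a prior
+  // completion of this request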
+ if (peering_state.get_pg_log().get_log().get_request(
+ reqid, &version, &user_version, &ret, &op_returns)) {
+ return backend->request_committed(reqid, version).then([ret] {
+ return seastar::make_ready_future<std::tuple<bool, int>>(true, ret);
+ });
+ } else {
+ return seastar::make_ready_future<std::tuple<bool, int>>(false, 0);
+ }
+}
+
}
return &it->second;
}
}
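+  // if the op identified by reqid has already completed, wait for it to be
+  // committed on all replicas and return {true, its saved return code};
+  // otherwise return {false, 0}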
+ seastar::future<std::tuple<bool, int>> already_complete(const osd_reqid_t& reqid);
int get_recovery_op_priority() const {
int64_t pri = 0;
get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
namespace crimson::osd {
class ShardServices;
+ class PG;
}
class PGBackend
crimson::os::FuturizedStore* store;
bool stopping = false;
std::optional<peering_info_t> peering;
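+  // wait until the in-flight transaction that produced at_version, if any,
+  // has been committed by all replicas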
+ virtual seastar::future<> request_committed(
+ const osd_reqid_t& reqid,
+ const eversion_t& at_version) = 0;
public:
struct loaded_object_md_t {
ObjectState os;
epoch_t min_epoch, epoch_t max_epoch,
std::vector<pg_log_entry_t>&& log_entries) = 0;
friend class ReplicatedRecoveryBackend;
+ friend class ::crimson::osd::PG;
};
const ceph_tid_t tid = next_txn_id++;
auto req_id = osd_op_p.req->get_reqid();
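+  // stash at_version with the pending txn so that request_committed() can
+  // later find the transaction for a given version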
auto pending_txn =
- pending_trans.emplace(tid, pg_shards.size()).first;
+ pending_trans.try_emplace(tid, pg_shards.size(), osd_op_p.at_version).first;
bufferlist encoded_txn;
encode(txn, encoded_txn);
peers->all_committed = {};
return seastar::now();
}
- return peers->all_committed.get_future();
+ return peers->all_committed.get_shared_future();
}).then([pending_txn, this] {
auto acked_peers = std::move(pending_txn->second.acked_peers);
pending_trans.erase(pending_txn);
pending_trans.clear();
return seastar::now();
}
+
+seastar::future<>
+ReplicatedBackend::request_committed(const osd_reqid_t& reqid,
+ const eversion_t& at_version)
+{
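+  // no transactions in flight: everything up to at_version has already
+  // been committed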
+ if (std::empty(pending_trans)) {
+ return seastar::now();
+ }
+ auto iter = pending_trans.begin();
+ auto& pending_txn = iter->second;
+ if (pending_txn.at_version > at_version) {
+ return seastar::now();
+ }
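+  // walk forward to the transaction that carries this request's at_version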
+  for (; iter != pending_trans.end() && iter->second.at_version < at_version;
+       ++iter) {}
+  // For now, a previous client_request with the same reqid cannot have
+  // finished, as that would mean later client_requests finished before
+  // earlier ones.
+  //
+  // The following line could be "assert(pending_txn.at_version == at_version)",
+  // since PG::client_request_pg_pipeline guarantees that there is at most one
+  // transaction in pending_trans at any time. But we may well improve the
+  // parallelism here in the future, allowing multiple client requests in
+  // flight, so the assertion is loosened to the following form.
+ assert(iter != pending_trans.end() && iter->second.at_version == at_version);
+ if (iter->second.pending) {
+ return iter->second.all_committed.get_shared_future();
+ } else {
+ return seastar::now();
+ }
+}
ceph_tid_t next_txn_id = 0;
class pending_on_t : public seastar::weakly_referencable<pending_on_t> {
public:
- pending_on_t(size_t pending)
- : pending{static_cast<unsigned>(pending)}
+ pending_on_t(size_t pending, const eversion_t& at_version)
+ : pending{static_cast<unsigned>(pending)}, at_version(at_version)
{}
unsigned pending;
+  // The at_versions of the pending_txns must be in the same order as their
+  // corresponding ceph_tids, as we rely on this invariant to check whether
+  // a client request has already completed. In other words, a client
+  // request's at_version must be updated synchronously with its ceph_tid.
+ const eversion_t at_version;
crimson::osd::acked_peers_t acked_peers;
- seastar::promise<> all_committed;
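+  // a shared promise: both the original submission and any dup request
+  // waiting in request_committed() may subscribe to the commit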
+ seastar::shared_promise<> all_committed;
};
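+  // std::map keeps pending_trans ordered by ceph_tid_t which, by the
+  // invariant noted above, is also at_version order; request_committed()
+  // relies on this when scanning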
using pending_transactions_t = std::map<ceph_tid_t, pending_on_t>;
pending_transactions_t pending_trans;
+
+ seastar::future<> request_committed(
+ const osd_reqid_t& reqid, const eversion_t& at_version) final;
};