From 1793bd9ce640559ecb91e10d15434596158c3fa3 Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Mon, 8 Jun 2020 17:58:47 +0800 Subject: [PATCH] crimson: restart ongoing client_request after peering Signed-off-by: Xuehan Xu --- src/crimson/common/exception.h | 13 ++++ src/crimson/osd/ec_backend.h | 1 + .../osd/osd_operations/client_request.cc | 77 +++++++++++-------- src/crimson/osd/pg.cc | 6 ++ src/crimson/osd/pg.h | 4 +- src/crimson/osd/pg_backend.cc | 7 ++ src/crimson/osd/pg_backend.h | 6 ++ src/crimson/osd/replicated_backend.cc | 29 +++++-- src/crimson/osd/replicated_backend.h | 5 +- 9 files changed, 106 insertions(+), 42 deletions(-) diff --git a/src/crimson/common/exception.h b/src/crimson/common/exception.h index 28faf75f1b998..05caf5ebd0c2c 100644 --- a/src/crimson/common/exception.h +++ b/src/crimson/common/exception.h @@ -18,6 +18,19 @@ public: } }; +class actingset_changed final : public std::exception { +public: + actingset_changed(bool sp) : still_primary(sp) {} + const char* what() const noexcept final { + return "acting set changed"; + } + bool is_primary() const { + return still_primary; + } +private: + const bool still_primary; +}; + template inline seastar::future<> handle_system_shutdown(Func&& func, Args&&... args) { diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h index a213a3f1ee4ca..e15b19970cd32 100644 --- a/src/crimson/osd/ec_backend.h +++ b/src/crimson/osd/ec_backend.h @@ -20,6 +20,7 @@ public: seastar::future<> stop() final { return seastar::now(); } + void on_actingset_changed(peering_info_t pi) final {} private: ll_read_errorator::future _read(const hobject_t& hoid, uint64_t off, diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 8f8075f4efe70..142a2f0af06bf 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -59,38 +59,49 @@ seastar::future<> ClientRequest::start() IRef opref = this; return crimson::common::handle_system_shutdown( [this, opref=std::move(opref)]() mutable { - return with_blocking_future(handle.enter(cp().await_map)) - .then([this]() { - return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch())); - }).then([this](epoch_t epoch) { - return with_blocking_future(handle.enter(cp().get_pg)); - }).then([this] { - return with_blocking_future(osd.wait_for_pg(m->get_spg())); - }).then([this, opref=std::move(opref)](Ref pgref) { - return seastar::do_with( - std::move(pgref), std::move(opref), [this](auto& pgref, auto& opref) { - PG &pg = *pgref; + return seastar::repeat([this, opref]() mutable { + return with_blocking_future(handle.enter(cp().await_map)) + .then([this]() { + return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch())); + }).then([this](epoch_t epoch) { + return with_blocking_future(handle.enter(cp().get_pg)); + }).then([this] { + return with_blocking_future(osd.wait_for_pg(m->get_spg())); + }).then([this, opref](Ref pgref) { + PG &pg = *pgref; + return with_blocking_future( + handle.enter(pp(pg).await_map) + ).then([this, &pg]() mutable { return with_blocking_future( - handle.enter(pp(pg).await_map) - ).then([this, &pg]() mutable { - return with_blocking_future( - pg.osdmap_gate.wait_for_map(m->get_map_epoch())); - }).then([this, &pg](auto map) mutable { - return with_blocking_future( - handle.enter(pp(pg).wait_for_active)); - }).then([this, &pg]() mutable { - return with_blocking_future(pg.wait_for_active_blocker.wait()); - }).then([this, &pgref]() mutable { - if (m->finish_decode()) { - m->clear_payload(); - } - if (is_pg_op()) { - return process_pg_op(pgref); - } else { - return process_op(pgref); - } - }); + pg.osdmap_gate.wait_for_map(m->get_map_epoch())); + }).then([this, &pg](auto map) mutable { + return with_blocking_future( + handle.enter(pp(pg).wait_for_active)); + }).then([this, &pg]() mutable { + return with_blocking_future(pg.wait_for_active_blocker.wait()); + }).then([this, pgref=std::move(pgref)]() mutable { + if (m->finish_decode()) { + m->clear_payload(); + } + if (is_pg_op()) { + return process_pg_op(pgref); + } else { + return process_op(pgref); + } }); + }).then([] { + return seastar::stop_iteration::yes; + }).handle_exception_type([](crimson::common::actingset_changed& e) { + if (e.is_primary()) { + crimson::get_logger(ceph_subsys_osd).debug( + "operation restart, acting set changed"); + return seastar::stop_iteration::no; + } else { + crimson::get_logger(ceph_subsys_osd).debug( + "operation abort, up primary changed"); + return seastar::stop_iteration::yes; + } + }); }); }); } @@ -99,7 +110,7 @@ seastar::future<> ClientRequest::process_pg_op( Ref &pg) { return pg->do_pg_ops(m) - .then([this](Ref reply) { + .then([this, pg=std::move(pg)](Ref reply) { return conn->send(reply); }); } @@ -110,7 +121,7 @@ seastar::future<> ClientRequest::process_op( PG& pg = *pgref; return with_blocking_future( handle.enter(pp(pg).recover_missing) - ).then([this, &pg, pgref=std::move(pgref)] { + ).then([this, &pg, pgref] { eversion_t ver; const hobject_t& soid = m->get_hobj(); if (pg.is_unreadable_object(soid, &ver)) { @@ -136,7 +147,7 @@ seastar::future<> ClientRequest::process_op( return conn->send(reply); }); }); - }).safe_then([] { + }).safe_then([pgref=std::move(pgref)] { return seastar::now(); }, PG::load_obc_ertr::all_same_way([](auto &code) { logger().error("ClientRequest saw error code {}", code); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 6f2da55c4f66f..6935123bfad9c 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -267,6 +267,7 @@ void PG::on_activate_complete() get_osdmap_epoch(), PeeringState::AllReplicasRecovered{}); } + backend->on_activate_complete(); } void PG::prepare_write(pg_info_t &info, @@ -918,4 +919,9 @@ seastar::future<> PG::stop() }); } +void PG::on_change(ceph::os::Transaction &t) { + recovery_backend->on_peering_interval_change(t); + backend->on_actingset_changed({ is_primary() }); +} + } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index d4beaf9a52b58..d4a47549c5c64 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -302,9 +302,7 @@ public: void on_role_change() final { // Not needed yet } - void on_change(ceph::os::Transaction &t) final { - recovery_backend->on_peering_interval_change(t); - } + void on_change(ceph::os::Transaction &t) final; void on_activate(interval_set to_trim) final; void on_activate_complete() final; void on_new_interval() final { diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index f203a72f114f3..9eef59a2ddbc4 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -129,6 +129,9 @@ PGBackend::mutate_object( epoch_t map_epoch, std::vector&& log_entries) { + if (__builtin_expect((bool)peering, false)) { + throw crimson::common::actingset_changed(peering->is_primary); + } logger().trace("mutate_object: num_ops={}", txn.get_num_ops()); if (obc->obs.exists) { #if 0 @@ -693,3 +696,7 @@ PGBackend::fiemap( return store->fiemap(c, oid, off, len); } +void PGBackend::on_activate_complete() { + peering.reset(); +} + diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index bb2637fb328a2..4a1442ab7e758 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -137,11 +137,17 @@ public: virtual void got_rep_op_reply(const MOSDRepOpReply&) {} virtual seastar::future<> stop() = 0; + struct peering_info_t { + bool is_primary; + }; + virtual void on_actingset_changed(peering_info_t pi) = 0; + virtual void on_activate_complete(); protected: const shard_id_t shard; CollectionRef coll; crimson::os::FuturizedStore* store; bool stopping = false; + std::optional peering; public: struct loaded_object_md_t { ObjectState os; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 780d6a746195b..51fe69394fa65 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -50,11 +50,14 @@ ReplicatedBackend::_submit_transaction(std::set&& pg_shards, if (__builtin_expect(stopping, false)) { throw crimson::common::system_shutdown_exception(); } + if (__builtin_expect((bool)peering, false)) { + throw crimson::common::actingset_changed(peering->is_primary); + } const ceph_tid_t tid = next_txn_id++; auto req_id = osd_op_p.req->get_reqid(); auto pending_txn = - pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first; + pending_trans.emplace(tid, pg_shards.size()).first; bufferlist encoded_txn; encode(txn, encoded_txn); @@ -78,11 +81,17 @@ ReplicatedBackend::_submit_transaction(std::set&& pg_shards, // TODO: set more stuff. e.g., pg_states return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch); } - }).then([&peers=pending_txn->second] { - if (--peers.pending == 0) { - peers.all_committed.set_value(); + }).then([this, peers=pending_txn->second.weak_from_this()] { + if (!peers) { + // for now, only actingset_changed can cause peers + // to be nullptr + assert(peering); + throw crimson::common::actingset_changed(peering->is_primary); } - return peers.all_committed.get_future(); + if (--peers->pending == 0) { + peers->all_committed.set_value(); + } + return peers->all_committed.get_future(); }).then([pending_txn, this] { pending_txn->second.all_committed = {}; auto acked_peers = std::move(pending_txn->second.acked_peers); @@ -91,6 +100,15 @@ ReplicatedBackend::_submit_transaction(std::set&& pg_shards, }); } +void ReplicatedBackend::on_actingset_changed(peering_info_t pi) +{ + peering.emplace(pi); + crimson::common::actingset_changed e_actingset_changed{peering->is_primary}; + for (auto& [tid, pending_txn] : pending_trans) { + pending_txn.all_committed.set_exception(e_actingset_changed); + } +} + void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply) { auto found = pending_trans.find(reply.get_tid()); @@ -118,5 +136,6 @@ seastar::future<> ReplicatedBackend::stop() pending_on.all_committed.set_exception( crimson::common::system_shutdown_exception()); } + pending_trans.clear(); return seastar::now(); } diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h index dbefb30300672..01c0bba6490ef 100644 --- a/src/crimson/osd/replicated_backend.h +++ b/src/crimson/osd/replicated_backend.h @@ -5,6 +5,7 @@ #include #include +#include #include "include/buffer_fwd.h" #include "osd/osd_types.h" @@ -23,6 +24,7 @@ public: crimson::osd::ShardServices& shard_services); void got_rep_op_reply(const MOSDRepOpReply& reply) final; seastar::future<> stop() final; + void on_actingset_changed(peering_info_t pi) final; private: ll_read_errorator::future _read(const hobject_t& hoid, uint64_t off, @@ -39,7 +41,8 @@ private: const pg_shard_t whoami; crimson::osd::ShardServices& shard_services; ceph_tid_t next_txn_id = 0; - struct pending_on_t { + class pending_on_t : public seastar::weakly_referencable { + public: pending_on_t(size_t pending) : pending{static_cast(pending)} {} -- 2.39.5