git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: restart ongoing client_request after peering
author Xuehan Xu <xxhdx1985126@163.com>
Mon, 8 Jun 2020 09:58:47 +0000 (17:58 +0800)
committer Xuehan Xu <xxhdx1985126@163.com>
Thu, 18 Jun 2020 01:50:50 +0000 (09:50 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@163.com>
src/crimson/common/exception.h
src/crimson/osd/ec_backend.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_backend.h

index 28faf75f1b9981484668db237c79936b8730418d..05caf5ebd0c2c6e4e3a6132892dfeab8a59b763f 100644 (file)
@@ -18,6 +18,19 @@ public:
   }
 };
 
+// Exception used to interrupt an in-flight operation when the PG's acting
+// set changes. It is thrown by PGBackend::mutate_object() and
+// ReplicatedBackend::_submit_transaction() while a peering change is
+// recorded, and is set on pending transactions by
+// ReplicatedBackend::on_actingset_changed().
+class actingset_changed final : public std::exception {
+public:
+  // sp: the primary flag recorded for the change (PG::on_change() supplies
+  // the result of PG::is_primary()).
+  // explicit, so that a bare bool never converts into this exception
+  // implicitly.
+  explicit actingset_changed(bool sp) : still_primary(sp) {}
+  const char* what() const noexcept final {
+    return "acting set changed";
+  }
+  // ClientRequest::start() restarts the request when this returns true and
+  // stops retrying when it returns false.
+  bool is_primary() const {
+    return still_primary;
+  }
+private:
+  const bool still_primary;
+};
+
 template<typename Func, typename... Args>
 inline seastar::future<> handle_system_shutdown(Func&& func, Args&&... args)
 {
index a213a3f1ee4ca16e326398d5071d2fcd3d5580c5..e15b19970cd321f0d8624c136c1aac0361b0c8fe 100644 (file)
@@ -20,6 +20,7 @@ public:
   seastar::future<> stop() final {
     return seastar::now();
   }
+  void on_actingset_changed(peering_info_t pi) final {}  // empty body: this backend takes no action when the acting set changes
 private:
   ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid,
                                                     uint64_t off,
index 8f8075f4efe708861d2559713d7ffdce087d5b7a..142a2f0af06bf2971d05906355d37b47dbcc3461 100644 (file)
@@ -59,38 +59,49 @@ seastar::future<> ClientRequest::start()
   IRef opref = this;
   return crimson::common::handle_system_shutdown(
     [this, opref=std::move(opref)]() mutable {
-    return with_blocking_future(handle.enter(cp().await_map))
-    .then([this]() {
-      return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
-    }).then([this](epoch_t epoch) {
-      return with_blocking_future(handle.enter(cp().get_pg));
-    }).then([this] {
-      return with_blocking_future(osd.wait_for_pg(m->get_spg()));
-    }).then([this, opref=std::move(opref)](Ref<PG> pgref) {
-      return seastar::do_with(
-       std::move(pgref), std::move(opref), [this](auto& pgref, auto& opref) {
-         PG &pg = *pgref;
+    return seastar::repeat([this, opref]() mutable {
+      return with_blocking_future(handle.enter(cp().await_map))
+      .then([this]() {
+       return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
+      }).then([this](epoch_t epoch) {
+       return with_blocking_future(handle.enter(cp().get_pg));
+      }).then([this] {
+       return with_blocking_future(osd.wait_for_pg(m->get_spg()));
+      }).then([this, opref](Ref<PG> pgref) {
+       PG &pg = *pgref;
+       return with_blocking_future(
+         handle.enter(pp(pg).await_map)
+       ).then([this, &pg]() mutable {
          return with_blocking_future(
-           handle.enter(pp(pg).await_map)
-         ).then([this, &pg]() mutable {
-           return with_blocking_future(
-             pg.osdmap_gate.wait_for_map(m->get_map_epoch()));
-         }).then([this, &pg](auto map) mutable {
-           return with_blocking_future(
-             handle.enter(pp(pg).wait_for_active));
-         }).then([this, &pg]() mutable {
-           return with_blocking_future(pg.wait_for_active_blocker.wait());
-         }).then([this, &pgref]() mutable {
-           if (m->finish_decode()) {
-             m->clear_payload();
-           }
-           if (is_pg_op()) {
-             return process_pg_op(pgref);
-           } else {
-             return process_op(pgref);
-           }
-         });
+           pg.osdmap_gate.wait_for_map(m->get_map_epoch()));
+       }).then([this, &pg](auto map) mutable {
+         return with_blocking_future(
+           handle.enter(pp(pg).wait_for_active));
+       }).then([this, &pg]() mutable {
+         return with_blocking_future(pg.wait_for_active_blocker.wait());
+       }).then([this, pgref=std::move(pgref)]() mutable {
+         if (m->finish_decode()) {
+           m->clear_payload();
+         }
+         if (is_pg_op()) {
+           return process_pg_op(pgref);
+         } else {
+           return process_op(pgref);
+         }
        });
+      }).then([] {
+       return seastar::stop_iteration::yes;
+      }).handle_exception_type([](crimson::common::actingset_changed& e) {
+       if (e.is_primary()) {
+         crimson::get_logger(ceph_subsys_osd).debug(
+             "operation restart, acting set changed");
+         return seastar::stop_iteration::no;
+       } else {
+         crimson::get_logger(ceph_subsys_osd).debug(
+             "operation abort, up primary changed");
+         return seastar::stop_iteration::yes;
+       }
+      });
     });
   });
 }
@@ -99,7 +110,7 @@ seastar::future<> ClientRequest::process_pg_op(
   Ref<PG> &pg)
 {
   return pg->do_pg_ops(m)
-    .then([this](Ref<MOSDOpReply> reply) {
+    .then([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) {
       return conn->send(reply);
     });
 }
@@ -110,7 +121,7 @@ seastar::future<> ClientRequest::process_op(
   PG& pg = *pgref;
   return with_blocking_future(
     handle.enter(pp(pg).recover_missing)
-  ).then([this, &pg, pgref=std::move(pgref)] {
+  ).then([this, &pg, pgref] {
     eversion_t ver;
     const hobject_t& soid = m->get_hobj();
     if (pg.is_unreadable_object(soid, &ver)) {
@@ -136,7 +147,7 @@ seastar::future<> ClientRequest::process_op(
          return conn->send(reply);
        });
       });
-  }).safe_then([] {
+  }).safe_then([pgref=std::move(pgref)] {
     return seastar::now();
   }, PG::load_obc_ertr::all_same_way([](auto &code) {
     logger().error("ClientRequest saw error code {}", code);
index 6f2da55c4f66f6d82f5c76ccd4d01a0dc1d3dc70..6935123bfad9c6454a048f74bf4f7cc571a79e16 100644 (file)
@@ -267,6 +267,7 @@ void PG::on_activate_complete()
       get_osdmap_epoch(),
       PeeringState::AllReplicasRecovered{});
   }
+  backend->on_activate_complete();
 }
 
 void PG::prepare_write(pg_info_t &info,
@@ -918,4 +919,9 @@ seastar::future<> PG::stop()
   });
 }
 
+void PG::on_change(ceph::os::Transaction &t) {  // forwards the change notification to both backends
+  recovery_backend->on_peering_interval_change(t);
+  backend->on_actingset_changed({ is_primary() });  // brace-initializes a peering_info_t from the current is_primary() result
+}
+
 }
index d4beaf9a52b5862213dcfa878099cf7df8aca197..d4a47549c5c64a434db8d7a310093ef870ba555f 100644 (file)
@@ -302,9 +302,7 @@ public:
   void on_role_change() final {
     // Not needed yet
   }
-  void on_change(ceph::os::Transaction &t) final {
-    recovery_backend->on_peering_interval_change(t);
-  }
+  void on_change(ceph::os::Transaction &t) final;
   void on_activate(interval_set<snapid_t> to_trim) final;
   void on_activate_complete() final;
   void on_new_interval() final {
index f203a72f114f36f8c4215791219e4d2b492d46ca..9eef59a2ddbc411a79035ac03620ea13a62c9c12 100644 (file)
@@ -129,6 +129,9 @@ PGBackend::mutate_object(
   epoch_t map_epoch,
   std::vector<pg_log_entry_t>&& log_entries)
 {
+  if (__builtin_expect((bool)peering, false)) {
+    throw crimson::common::actingset_changed(peering->is_primary);
+  }
   logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
   if (obc->obs.exists) {
 #if 0
@@ -693,3 +696,7 @@ PGBackend::fiemap(
   return store->fiemap(c, oid, off, len);
 }
 
+void PGBackend::on_activate_complete() {  // invoked from PG::on_activate_complete()
+  peering.reset();  // disengage the optional so the `peering` guards stop throwing actingset_changed
+}
+
index bb2637fb328a243a33e668b02e6c1c3ff38fe74d..4a1442ab7e7585601cd94362b3f29e33adc40acf 100644 (file)
@@ -137,11 +137,17 @@ public:
 
   virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
   virtual seastar::future<> stop() = 0;
+  struct peering_info_t {  // argument type of on_actingset_changed()
+    bool is_primary;  // filled from PG::is_primary() in PG::on_change(); forwarded into actingset_changed
+  };
+  virtual void on_actingset_changed(peering_info_t pi) = 0;  // called by PG::on_change(); each backend must override
+  virtual void on_activate_complete();  // base implementation resets `peering`
 protected:
   const shard_id_t shard;
   CollectionRef coll;
   crimson::os::FuturizedStore* store;
   bool stopping = false;
+  std::optional<peering_info_t> peering;  // engaged by ReplicatedBackend::on_actingset_changed(), reset by on_activate_complete(); while engaged, mutate_object() and ReplicatedBackend::_submit_transaction() throw actingset_changed
 public:
   struct loaded_object_md_t {
     ObjectState os;
index 780d6a746195b8281ddc37622a88f67ad515ac9d..51fe69394fa65085302e36cc63932571c627ad10 100644 (file)
@@ -50,11 +50,14 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
   if (__builtin_expect(stopping, false)) {
     throw crimson::common::system_shutdown_exception();
   }
+  if (__builtin_expect((bool)peering, false)) {
+    throw crimson::common::actingset_changed(peering->is_primary);
+  }
 
   const ceph_tid_t tid = next_txn_id++;
   auto req_id = osd_op_p.req->get_reqid();
   auto pending_txn =
-    pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first;
+    pending_trans.emplace(tid, pg_shards.size()).first;
   bufferlist encoded_txn;
   encode(txn, encoded_txn);
 
@@ -78,11 +81,17 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
         // TODO: set more stuff. e.g., pg_states
         return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
       }
-    }).then([&peers=pending_txn->second] {
-      if (--peers.pending == 0) {
-        peers.all_committed.set_value();
+    }).then([this, peers=pending_txn->second.weak_from_this()] {
+      if (!peers) {
+       // for now, only actingset_changed can cause peers
+       // to be nullptr
+       assert(peering);
+       throw crimson::common::actingset_changed(peering->is_primary);
       }
-      return peers.all_committed.get_future();
+      if (--peers->pending == 0) {
+        peers->all_committed.set_value();
+      }
+      return peers->all_committed.get_future();
     }).then([pending_txn, this] {
       pending_txn->second.all_committed = {};
       auto acked_peers = std::move(pending_txn->second.acked_peers);
@@ -91,6 +100,15 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
     });
 }
 
+// Record the reported peering state and fail every in-flight replicated
+// transaction with actingset_changed.
+//
+// pi: peering snapshot supplied by PG::on_change(); its is_primary flag is
+//     carried inside the exception so waiters can tell a restart from an
+//     abort.
+void ReplicatedBackend::on_actingset_changed(peering_info_t pi)
+{
+  // While `peering` is engaged, new submissions are rejected up front.
+  peering.emplace(pi);
+  crimson::common::actingset_changed e_actingset_changed{peering->is_primary};
+  for (auto& [tid, pending_txn] : pending_trans) {
+    pending_txn.all_committed.set_exception(e_actingset_changed);
+  }
+  // Drop the failed entries, as stop() does. Otherwise they would stay in
+  // the map forever (the continuation that erases them is skipped once the
+  // future fails), a later acting-set change would call set_exception() on
+  // the same promises again, and the weak pointer held by
+  // _submit_transaction() would never observe the entry going away.
+  pending_trans.clear();
+}
+
 void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
 {
   auto found = pending_trans.find(reply.get_tid());
@@ -118,5 +136,6 @@ seastar::future<> ReplicatedBackend::stop()
     pending_on.all_committed.set_exception(
        crimson::common::system_shutdown_exception());
   }
+  pending_trans.clear();
   return seastar::now();
 }
index dbefb30300672a35869d8c4a0d2326a7830f5f91..01c0bba6490ef13ec88f31aa980b26938bd400a4 100644 (file)
@@ -5,6 +5,7 @@
 
 #include <boost/intrusive_ptr.hpp>
 #include <seastar/core/future.hh>
+#include <seastar/core/weak_ptr.hh>
 #include "include/buffer_fwd.h"
 #include "osd/osd_types.h"
 
@@ -23,6 +24,7 @@ public:
                    crimson::osd::ShardServices& shard_services);
   void got_rep_op_reply(const MOSDRepOpReply& reply) final;
   seastar::future<> stop() final;
+  void on_actingset_changed(peering_info_t pi) final;
 private:
   ll_read_errorator::future<ceph::bufferlist> _read(const hobject_t& hoid,
                                                    uint64_t off,
@@ -39,7 +41,8 @@ private:
   const pg_shard_t whoami;
   crimson::osd::ShardServices& shard_services;
   ceph_tid_t next_txn_id = 0;
-  struct pending_on_t {
+  class pending_on_t : public seastar::weakly_referencable<pending_on_t> {
+  public:
     pending_on_t(size_t pending)
       : pending{static_cast<unsigned>(pending)}
     {}