crimson/osd: replicate transaction to peers
author     Kefu Chai <kchai@redhat.com>
Sat, 13 Jul 2019 02:39:48 +0000 (10:39 +0800)
committer  Kefu Chai <kchai@redhat.com>
Mon, 12 Aug 2019 10:01:46 +0000 (18:01 +0800)
* send `MOSDRepOp` messages carrying the encoded transaction to the acting-set
  peers from `ReplicatedBackend::_submit_transaction()`, and complete the
  client write only once every peer has acknowledged.
* handle the `MOSDRepOpReply` message in the OSD, and pass it all the way down
  to `PGBackend` (a simplified sketch of this path follows).
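
For illustration only, a minimal self-contained model of that reply path,
using stand-in types and names (RepOpReplyStub, BackendStub, PGStub are
hypothetical); the real pieces are OSD::handle_rep_op_reply(),
PG::handle_rep_op_reply() and ReplicatedBackend::got_rep_op_reply() in the
diff below:

    // illustration only: stand-in types, not the actual crimson classes
    #include <cstdio>
    #include <map>

    struct RepOpReplyStub { unsigned tid; };          // stand-in for MOSDRepOpReply

    struct BackendStub {                              // stand-in for ReplicatedBackend
      struct pending_on_t { unsigned pending; };
      std::map<unsigned, pending_on_t> pending_trans; // keyed by transaction id

      void got_rep_op_reply(const RepOpReplyStub& reply) {
        auto found = pending_trans.find(reply.tid);
        if (found == pending_trans.end()) {
          std::printf("stale reply: tid=%u\n", reply.tid);
          return;
        }
        if (--found->second.pending == 0) {
          // the real backend fulfils the seastar::promise<> that
          // _submit_transaction() is waiting on at this point
          std::printf("tid=%u: all peers committed\n", reply.tid);
          pending_trans.erase(found);
        }
      }
    };

    struct PGStub {                                   // stand-in for PG
      BackendStub backend;
      void handle_rep_op_reply(const RepOpReplyStub& m) {
        backend.got_rep_op_reply(m);                  // hand the reply to the backend
      }
    };

    int main() {
      PGStub pg;
      // the primary registered a transaction waiting for two replica acks
      pg.backend.pending_trans.emplace(1u, BackendStub::pending_on_t{2});
      pg.handle_rep_op_reply({1});                    // first replica acks
      pg.handle_rep_op_reply({1});                    // second replica acks: done
      pg.handle_rep_op_reply({1});                    // stale: tid already erased
    }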

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/acked_peers.h [new file with mode: 0644]
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_backend.h

diff --git a/src/crimson/osd/acked_peers.h b/src/crimson/osd/acked_peers.h
new file mode 100644 (file)
index 0000000..9019901
--- /dev/null
@@ -0,0 +1,25 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/version.hpp>
+#if BOOST_VERSION >= 106900
+#include <boost/container/small_vector.hpp>
+#else
+#include <vector>
+#endif
+
+namespace ceph::osd {
+  struct peer_shard_t {
+    pg_shard_t shard;
+    eversion_t last_complete_ondisk;
+  };
+#if BOOST_VERSION >= 106900
+  // small_vector is is_nothrow_move_constructible<> since 1.69
+  // 2 + 1 = 3, which is the default value of "osd_pool_default_size"
+  using acked_peers_t = boost::container::small_vector<peer_shard_t, 2>;
+#else
+  using acked_peers_t = std::vector<peer_shard_t>;
+#endif
+}
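
The inline capacity of 2 chosen above is worth a note: with the default
"osd_pool_default_size" of 3, the primary tracks at most two replica
acknowledgements, so the vector never spills to the heap. A standalone sketch
with stand-in field types (not the real pg_shard_t/eversion_t):

    // standalone sketch: stand-in field types, not the real osd_types.h ones
    #include <boost/container/small_vector.hpp>
    #include <cassert>

    struct peer_shard_stub {
      int shard;                       // stand-in for pg_shard_t
      unsigned last_complete_ondisk;   // stand-in for eversion_t
    };

    // same inline capacity as acked_peers_t above
    using acked_peers_stub = boost::container::small_vector<peer_shard_stub, 2>;

    int main() {
      acked_peers_stub acked;
      acked.push_back({0, 10});        // first replica acknowledged
      acked.push_back({1, 10});        // second replica acknowledged
      // primary + 2 replicas = the default pool size of 3; both entries fit
      // in the inline storage, so no heap allocation is needed
      assert(acked.size() == 2);
    }
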
diff --git a/src/crimson/osd/ec_backend.cc b/src/crimson/osd/ec_backend.cc
index 66f8dbdff7f841c9e871a4941fa3f080f2f6cbce..010b52121c7c419773b13bbab61c94b94db4e7b4 100644 (file)
@@ -1,12 +1,14 @@
 #include "ec_backend.h"
+
 #include "crimson/os/cyan_collection.h"
+#include "crimson/osd/shard_services.h"
 
 ECBackend::ECBackend(shard_id_t shard,
                      ECBackend::CollectionRef coll,
-                     ceph::os::FuturizedStore* store,
+                     ceph::osd::ShardServices& shard_services,
                      const ec_profile_t&,
                      uint64_t)
-  : PGBackend{shard, coll, store}
+  : PGBackend{shard, coll, &shard_services.get_store()}
 {
   // todo
 }
@@ -19,3 +21,15 @@ seastar::future<bufferlist> ECBackend::_read(const hobject_t& hoid,
   // todo
   return seastar::make_ready_future<bufferlist>();
 }
+
+seastar::future<ceph::osd::acked_peers_t>
+ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
+                               const hobject_t& hoid,
+                               ceph::os::Transaction&& txn,
+                               osd_reqid_t req_id,
+                               epoch_t min_epoch, epoch_t max_epoch,
+                               eversion_t ver)
+{
+  // todo
+  return seastar::make_ready_future<ceph::osd::acked_peers_t>();
+}
diff --git a/src/crimson/osd/ec_backend.h b/src/crimson/osd/ec_backend.h
index 4bf6b113a79c74600c80c88c947b1d03b84b8bcb..107c048217b160daeebe5b34ff8812c5216887d2 100644 (file)
@@ -13,7 +13,8 @@ class ECBackend : public PGBackend
 {
 public:
   ECBackend(shard_id_t shard,
-           CollectionRef, ceph::os::FuturizedStore*,
+           CollectionRef coll,
+           ceph::osd::ShardServices& shard_services,
            const ec_profile_t& ec_profile,
            uint64_t stripe_width);
 private:
@@ -21,6 +22,13 @@ private:
                                          uint64_t off,
                                          uint64_t len,
                                          uint32_t flags) override;
+  seastar::future<ceph::osd::acked_peers_t>
+  _submit_transaction(std::set<pg_shard_t>&& pg_shards,
+                     const hobject_t& hoid,
+                     ceph::os::Transaction&& txn,
+                     osd_reqid_t req_id,
+                     epoch_t min_epoch, epoch_t max_epoch,
+                     eversion_t ver) final;
   CollectionRef coll;
   ceph::os::FuturizedStore* store;
 };
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index 044df1a80d9a89bf80193ede750d94beb96cac7e..05fc4d06d11ef7e8a37b632a22bfc55672a11da3 100644 (file)
 #include "messages/MOSDMap.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDPGLog.h"
+#include "messages/MOSDRepOpReply.h"
 #include "messages/MPGStats.h"
 
+#include "os/Transaction.h"
+#include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
+
 #include "crimson/mon/MonClient.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/os/cyan_collection.h"
 #include "crimson/os/cyan_object.h"
 #include "crimson/os/futurized_store.h"
-#include "os/Transaction.h"
 #include "crimson/osd/heartbeat.h"
 #include "crimson/osd/osd_meta.h"
 #include "crimson/osd/pg.h"
 #include "crimson/osd/pg_backend.h"
 #include "crimson/osd/pg_meta.h"
-#include "osd/PGPeeringEvent.h"
-#include "osd/PeeringState.h"
 #include "crimson/osd/osd_operations/compound_peering_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_operations/pg_advance_map.h"
@@ -477,6 +479,8 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m)
     return seastar::now();
   case MSG_OSD_PG_LOG:
     return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
+  case MSG_OSD_REPOPREPLY:
+    return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
   default:
     logger().info("{} unhandled message {}", __func__, *m);
     return seastar::now();
@@ -852,6 +856,18 @@ seastar::future<> OSD::handle_osd_op(ceph::net::Connection* conn,
   return seastar::now();
 }
 
+seastar::future<> OSD::handle_rep_op_reply(ceph::net::Connection* conn,
+                                          Ref<MOSDRepOpReply> m)
+{
+  const auto& pgs = pg_map.get_pgs();
+  if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) {
+    pg->second->handle_rep_op_reply(conn, *m);
+  } else {
+    logger().warn("stale reply: {}", *m);
+  }
+  return seastar::now();
+}
+
 bool OSD::should_restart() const
 {
   if (!osdmap->is_up(whoami)) {
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index 6345b33dc7b33692e007e3edcb284abea1c8adfd..5578e268a50d04ab7fc636e386af8851f35108e0 100644 (file)
@@ -34,6 +34,7 @@
 
 class MOSDMap;
 class MOSDOp;
+class MOSDRepOpReply;
 class OSDMap;
 class OSDMeta;
 class Heartbeat;
@@ -164,6 +165,8 @@ private:
                                    Ref<MOSDMap> m);
   seastar::future<> handle_osd_op(ceph::net::Connection* conn,
                                  Ref<MOSDOp> m);
+  seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn,
+                                       Ref<MOSDRepOpReply> m);
   seastar::future<> handle_pg_log(ceph::net::Connection* conn,
                                  Ref<MOSDPGLog> m);
 
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index 2b89a78513db6eb0c3bce47d7cb999d2adec6418..2efcc416b36ffe450fafc4730111484012d2a2c0 100644 (file)
@@ -80,10 +80,11 @@ PG::PG(
     osdmap{osdmap},
     backend(
       PGBackend::create(
-        pgid,
+       pgid.pgid,
+       pg_shard,
        pool,
        coll_ref,
-       &shard_services.get_store(),
+       shard_services,
        profile)),
     peering_state(
       shard_services.get_cct(),
@@ -411,6 +412,27 @@ seastar::future<bufferlist> PG::do_pgnls(bufferlist& indata,
   });
 }
 
+seastar::future<> PG::submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
+                                        ceph::os::Transaction&& txn,
+                                        const MOSDOp& req)
+{
+  epoch_t map_epoch = get_osdmap_epoch();
+  eversion_t at_version{map_epoch, projected_last_update.version + 1};
+  return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
+                               std::move(os),
+                               std::move(txn),
+                               req,
+                               peering_state.get_last_peering_reset(),
+                               map_epoch,
+                               at_version).then([this](auto acked) {
+    for (const auto& peer : acked) {
+      peering_state.update_peer_last_complete_ondisk(
+        peer.shard, peer.last_complete_ondisk);
+    }
+    return seastar::now();
+  });
+}
+
 seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
 {
   return seastar::do_with(std::move(m), ceph::os::Transaction{},
@@ -425,8 +447,11 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
         return do_osd_op(*pos, osd_op, txn);
       }).then([&txn,m,this,os=std::move(os)]() mutable {
         // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
-        return txn.empty() ? seastar::now()
-                           : backend->mutate_object(std::move(os), std::move(txn), *m);
+       if (txn.empty()) {
+         return seastar::now();
+       } else {
+         return submit_transaction(std::move(os), std::move(txn), *m);
+       }
       });
     }).then([m,this] {
       auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
@@ -459,4 +484,10 @@ seastar::future<> PG::handle_op(ceph::net::Connection* conn,
   });
 }
 
+void PG::handle_rep_op_reply(ceph::net::Connection* conn,
+                            const MOSDRepOpReply& m)
+{
+  backend->got_rep_op_reply(m);
+}
+
 }
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 5f3692b3729c4723d3122cd18ca54595556b10d3..21e724c2a009f4d7e28e41b36c6cff42630c0c8d 100644 (file)
@@ -417,6 +417,9 @@ public:
   void handle_initialize(PeeringCtx &rctx);
   seastar::future<> handle_op(ceph::net::Connection* conn,
                              Ref<MOSDOp> m);
+  void handle_rep_op_reply(ceph::net::Connection* conn,
+                          const MOSDRepOpReply& m);
+
   void print(std::ostream& os) const;
 
 private:
@@ -431,6 +434,9 @@ private:
   seastar::future<ceph::bufferlist> do_pgnls(ceph::bufferlist& indata,
                                             const std::string& nspace,
                                             uint64_t limit);
+  seastar::future<> submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
+                                      ceph::os::Transaction&& txn,
+                                      const MOSDOp& req);
 
 private:
   OSDMapGate osdmap_gate;
diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc
index c31e88539667e10b723c0dc265f39eba81c21125..fe1431d7f5d69e154dc50d506061aabd07f1b52b 100644 (file)
@@ -25,17 +25,19 @@ namespace {
   }
 }
 
-std::unique_ptr<PGBackend> PGBackend::create(const spg_t pgid,
+std::unique_ptr<PGBackend> PGBackend::create(pg_t pgid,
+                                            const pg_shard_t pg_shard,
                                              const pg_pool_t& pool,
                                             ceph::os::CollectionRef coll,
-                                             ceph::os::FuturizedStore* store,
+                                            ceph::osd::ShardServices& shard_services,
                                              const ec_profile_t& ec_profile)
 {
   switch (pool.type) {
   case pg_pool_t::TYPE_REPLICATED:
-    return std::make_unique<ReplicatedBackend>(pgid.shard, coll, store);
+    return std::make_unique<ReplicatedBackend>(pgid, pg_shard,
+                                              coll, shard_services);
   case pg_pool_t::TYPE_ERASURE:
-    return std::make_unique<ECBackend>(pgid.shard, coll, store,
+    return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services,
                                        std::move(ec_profile),
                                        pool.stripe_width);
   default:
@@ -157,11 +159,15 @@ PGBackend::_load_ss(const hobject_t& oid)
   });
 }
 
-seastar::future<>
+seastar::future<ceph::osd::acked_peers_t>
 PGBackend::mutate_object(
+  std::set<pg_shard_t> pg_shards,
   cached_os_t&& os,
   ceph::os::Transaction&& txn,
-  const MOSDOp& m)
+  const MOSDOp& m,
+  epoch_t min_epoch,
+  epoch_t map_epoch,
+  eversion_t ver)
 {
   logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
   if (os->exists) {
@@ -185,7 +191,8 @@ PGBackend::mutate_object(
     // reset cached ObjectState without enforcing eviction
     os->oi = object_info_t(os->oi.soid);
   }
-  return store->do_transaction(coll, std::move(txn));
+  return _submit_transaction(std::move(pg_shards), os->oi.soid, std::move(txn),
+                            m.get_reqid(), min_epoch, map_epoch, ver);
 }
 
 seastar::future<>
diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h
index 69380a64a4dfe175f058dff25763267fd6e97961..fbc2653d74c0be50b1e82904f2e34f30ea085cd1 100644 (file)
 
 #include "crimson/os/futurized_store.h"
 #include "crimson/os/cyan_collection.h"
+#include "crimson/osd/acked_peers.h"
 #include "crimson/common/shared_lru.h"
 #include "os/Transaction.h"
 #include "osd/osd_types.h"
 #include "osd/osd_internal_types.h"
 
 struct hobject_t;
+class MOSDRepOpReply;
+
+namespace ceph::osd {
+  class ShardServices;
+}
 
 class PGBackend
 {
@@ -26,10 +32,11 @@ protected:
 public:
   PGBackend(shard_id_t shard, CollectionRef coll, ceph::os::FuturizedStore* store);
   virtual ~PGBackend() = default;
-  static std::unique_ptr<PGBackend> create(const spg_t pgid,
+  static std::unique_ptr<PGBackend> create(pg_t pgid,
+                                          const pg_shard_t pg_shard,
                                           const pg_pool_t& pool,
                                           ceph::os::CollectionRef coll,
-                                          ceph::os::FuturizedStore* store,
+                                          ceph::osd::ShardServices& shard_services,
                                           const ec_profile_t& ec_profile);
   using cached_os_t = boost::local_shared_ptr<ObjectState>;
   seastar::future<cached_os_t> get_object_state(const hobject_t& oid);
@@ -51,14 +58,20 @@ public:
     ObjectState& os,
     const OSDOp& osd_op,
     ceph::os::Transaction& trans);
-  seastar::future<> mutate_object(
+  seastar::future<ceph::osd::acked_peers_t> mutate_object(
+    std::set<pg_shard_t> pg_shards,
     cached_os_t&& os,
     ceph::os::Transaction&& txn,
-    const MOSDOp& m);
+    const MOSDOp& m,
+    epoch_t min_epoch,
+    epoch_t map_epoch,
+    eversion_t ver);
   seastar::future<std::vector<hobject_t>, hobject_t> list_objects(
     const hobject_t& start,
     uint64_t limit);
 
+  virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
+
 protected:
   const shard_id_t shard;
   CollectionRef coll;
@@ -75,4 +88,11 @@ private:
                                            size_t length,
                                            uint32_t flags) = 0;
   bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn);
+  virtual seastar::future<ceph::osd::acked_peers_t>
+  _submit_transaction(std::set<pg_shard_t>&& pg_shards,
+                     const hobject_t& hoid,
+                     ceph::os::Transaction&& txn,
+                     osd_reqid_t req_id,
+                     epoch_t min_epoch, epoch_t max_epoch,
+                     eversion_t ver) = 0;
 };
diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc
index 2a6b9b7fc6e85ba81ff43aa13ae9b77e54a4e6e3..44ca03af29f1ffd429c5cb6400ec4b1ae94771b9 100644 (file)
@@ -1,13 +1,27 @@
 #include "replicated_backend.h"
 
+#include "messages/MOSDRepOpReply.h"
+
+#include "crimson/common/log.h"
 #include "crimson/os/cyan_collection.h"
 #include "crimson/os/cyan_object.h"
 #include "crimson/os/futurized_store.h"
+#include "crimson/osd/shard_services.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
 
-ReplicatedBackend::ReplicatedBackend(shard_id_t shard,
+ReplicatedBackend::ReplicatedBackend(pg_t pgid,
+                                     pg_shard_t whoami,
                                      ReplicatedBackend::CollectionRef coll,
-                                     ceph::os::FuturizedStore* store)
-  : PGBackend{shard, coll, store}
+                                     ceph::osd::ShardServices& shard_services)
+  : PGBackend{whoami.shard, coll, &shard_services.get_store()},
+    pgid{pgid},
+    whoami{whoami},
+    shard_services{shard_services}
 {}
 
 seastar::future<bufferlist> ReplicatedBackend::_read(const hobject_t& hoid,
@@ -17,3 +31,65 @@ seastar::future<bufferlist> ReplicatedBackend::_read(const hobject_t& hoid,
 {
   return store->read(coll, ghobject_t{hoid}, off, len, flags);
 }
+
+seastar::future<ceph::osd::acked_peers_t>
+ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
+                                       const hobject_t& hoid,
+                                       ceph::os::Transaction&& txn,
+                                       osd_reqid_t req_id,
+                                       epoch_t min_epoch, epoch_t map_epoch,
+                                       eversion_t ver)
+{
+  const ceph_tid_t tid = next_txn_id++;
+  auto pending_txn =
+    pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first;
+  bufferlist encoded_txn;
+  encode(txn, encoded_txn);
+
+  return seastar::parallel_for_each(std::move(pg_shards),
+    [=, encoded_txn=std::move(encoded_txn), txn=std::move(txn)]
+    (auto pg_shard) mutable {
+      if (pg_shard == whoami) {
+        return shard_services.get_store().do_transaction(coll, std::move(txn));
+      } else {
+        auto m = make_message<MOSDRepOp>(req_id, whoami,
+                                         spg_t{pgid, pg_shard.shard}, hoid,
+                                         CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+                                         map_epoch, min_epoch,
+                                         tid, ver);
+        m->set_data(encoded_txn);
+        pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
+        // TODO: set more stuff. e.g., pg_states
+        return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
+      }
+    }).then([&peers=pending_txn->second] {
+      if (--peers.pending == 0) {
+        peers.all_committed.set_value();
+      }
+      return peers.all_committed.get_future();
+    }).then([tid, pending_txn, this] {
+      pending_txn->second.all_committed = {};
+      auto acked_peers = std::move(pending_txn->second.acked_peers);
+      pending_trans.erase(pending_txn);
+      return seastar::make_ready_future<ceph::osd::acked_peers_t>(std::move(acked_peers));
+    });
+}
+
+void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
+{
+  auto found = pending_trans.find(reply.get_tid());
+  if (found == pending_trans.end()) {
+    logger().warn("{}: no matched pending rep op: {}", __func__, reply);
+    return;
+  }
+  auto& peers = found->second;
+  for (auto& peer : peers.acked_peers) {
+    if (peer.shard == reply.from) {
+      peer.last_complete_ondisk = reply.get_last_complete_ondisk();
+      if (--peers.pending == 0) {
+        peers.all_committed.set_value();
+      }
+      return;
+    }
+  }
+}
diff --git a/src/crimson/osd/replicated_backend.h b/src/crimson/osd/replicated_backend.h
index e96060f948f15f9a96e1950b6dfe5eaf003fe697..ca9ccb1db4f291ad565a9dc4c14371c14412cf76 100644 (file)
@@ -7,17 +7,45 @@
 #include <seastar/core/future.hh>
 #include "include/buffer_fwd.h"
 #include "osd/osd_types.h"
+
+#include "acked_peers.h"
 #include "pg_backend.h"
 
+namespace ceph::osd {
+  class ShardServices;
+}
+
 class ReplicatedBackend : public PGBackend
 {
 public:
-  ReplicatedBackend(shard_id_t shard,
+  ReplicatedBackend(pg_t pgid, pg_shard_t whoami,
                    CollectionRef coll,
-                   ceph::os::FuturizedStore* store);
+                   ceph::osd::ShardServices& shard_services);
+  void got_rep_op_reply(const MOSDRepOpReply& reply) final;
 private:
   seastar::future<ceph::bufferlist> _read(const hobject_t& hoid,
                                          uint64_t off,
                                          uint64_t len,
                                          uint32_t flags) override;
+  seastar::future<ceph::osd::acked_peers_t>
+  _submit_transaction(std::set<pg_shard_t>&& pg_shards,
+                     const hobject_t& hoid,
+                     ceph::os::Transaction&& txn,
+                     osd_reqid_t req_id,
+                     epoch_t min_epoch, epoch_t max_epoch,
+                     eversion_t ver) final;
+  const pg_t pgid;
+  const pg_shard_t whoami;
+  ceph::osd::ShardServices& shard_services;
+  ceph_tid_t next_txn_id = 0;
+  struct pending_on_t {
+    pending_on_t(size_t pending)
+      : pending{static_cast<unsigned>(pending)}
+    {}
+    unsigned pending;
+    ceph::osd::acked_peers_t acked_peers;
+    seastar::promise<> all_committed;
+  };
+  using pending_transactions_t = std::map<ceph_tid_t, pending_on_t>;
+  pending_transactions_t pending_trans;
 };