git.apps.os.sepia.ceph.com Git - ceph-ci.git / commitdiff
crimson: add pglog related logic to crimson's data write path
author: Xuehan Xu <xxhdx1985126@163.com>
Mon, 10 Feb 2020 15:16:08 +0000 (23:16 +0800)
committer: Xuehan Xu <xxhdx1985126@163.com>
Wed, 19 Feb 2020 08:05:24 +0000 (16:05 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@163.com>
12 files changed:
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/osd_operations/osdop_params.h [new file with mode: 0644]
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_backend.h
src/osd/PeeringState.h

index 911dc250e8a09df72d5917cac48ad63a38f9787a..c6516d50a8e8fdc60f0d28594c6127114a8cde1e 100644 (file)
@@ -26,9 +26,9 @@ seastar::future<crimson::osd::acked_peers_t>
 ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
                                const hobject_t& hoid,
                                ceph::os::Transaction&& txn,
-                               osd_reqid_t req_id,
+                               const osd_op_params_t& osd_op_p,
                                epoch_t min_epoch, epoch_t max_epoch,
-                               eversion_t ver)
+                              std::vector<pg_log_entry_t>&& log_entries)
 {
   // todo
   return seastar::make_ready_future<crimson::osd::acked_peers_t>();
index c7548de0749ab053ed0c57666ac22e3b4bd5d56b..d9451d2296b6e4d9ae9703bee357ffeaeea7f87e 100644 (file)
@@ -26,9 +26,9 @@ private:
   _submit_transaction(std::set<pg_shard_t>&& pg_shards,
                      const hobject_t& hoid,
                      ceph::os::Transaction&& txn,
-                     osd_reqid_t req_id,
+                     const osd_op_params_t& req,
                      epoch_t min_epoch, epoch_t max_epoch,
-                     eversion_t ver) final;
+                     std::vector<pg_log_entry_t>&& log_entries) final;
   CollectionRef coll;
   crimson::os::FuturizedStore* store;
 };
index c3d797e581598bbf3ee5b88eae8c163120a536f0..748f98027eb174841901ac0ec6d9e7f5f4a2594d 100644 (file)
@@ -669,25 +669,25 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op)
   case CEPH_OSD_OP_CREATE:
     return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
       return backend.create(os, osd_op, txn);
-    });
+    }, true);
   case CEPH_OSD_OP_WRITE:
     return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
       return backend.write(os, osd_op, txn);
-    });
+    }, true);
   case CEPH_OSD_OP_WRITEFULL:
     return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
       return backend.writefull(os, osd_op, txn);
-    });
+    }, true);
   case CEPH_OSD_OP_SETALLOCHINT:
     return osd_op_errorator::now();
   case CEPH_OSD_OP_SETXATTR:
     return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
       return backend.setxattr(os, osd_op, txn);
-    });
+    }, true);
   case CEPH_OSD_OP_DELETE:
     return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
       return backend.remove(os, txn);
-    });
+    }, true);
   case CEPH_OSD_OP_CALL:
     return this->do_op_call(osd_op);
   case CEPH_OSD_OP_STAT:
@@ -722,13 +722,13 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op)
 #endif
     return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
       return backend.omap_set_vals(os, osd_op, txn);
-    });
+    }, true);
 
   // watch/notify
   case CEPH_OSD_OP_WATCH:
     return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
       return do_op_watch(osd_op, os, txn);
-    });
+    }, false);
   case CEPH_OSD_OP_NOTIFY:
     return do_read_op([this, &osd_op] (auto&, const auto& os) {
       return do_op_notify(osd_op, os);
index a66df94a502986168c3bd8f1bd7ca894e3c2d584..11ee868e0aae3d2d1e33da56861fa0ea9be6c073 100644 (file)
@@ -83,6 +83,7 @@ private:
   PG& pg;
   PGBackend& backend;
   Ref<MOSDOp> msg;
+  bool user_modify = false;
   ceph::os::Transaction txn;
 
   size_t num_read = 0;    ///< count read ops
@@ -145,8 +146,9 @@ private:
   }
 
   template <class Func>
-  auto do_write_op(Func&& f) {
+  auto do_write_op(Func&& f, bool um) {
     ++num_write;
+    user_modify = um;
     return std::forward<Func>(f)(backend, obc->obs, txn);
   }
 
@@ -220,10 +222,21 @@ auto OpsExecuter::with_effect_on_obc(
 
 template <typename Func>
 OpsExecuter::osd_op_errorator::future<> OpsExecuter::submit_changes(Func&& f) && {
+  assert(obc);
+  osd_op_params_t osd_op_params(std::move(msg));
+  eversion_t at_version = pg.next_version();
+
+  osd_op_params.at_version = at_version;
+  osd_op_params.pg_trim_to = pg.get_pg_trim_to();
+  osd_op_params.min_last_complete_ondisk = pg.get_min_last_complete_ondisk();
+  osd_op_params.last_complete = pg.get_info().last_complete;
+  if (user_modify)
+    osd_op_params.user_at_version = at_version.version;
+
   if (__builtin_expect(op_effects.empty(), true)) {
-    return std::forward<Func>(f)(std::move(txn), std::move(obc));
+    return std::forward<Func>(f)(std::move(txn), std::move(obc), std::move(osd_op_params));
   }
-  return std::forward<Func>(f)(std::move(txn), std::move(obc)).safe_then([this] {
+  return std::forward<Func>(f)(std::move(txn), std::move(obc), std::move(osd_op_params)).safe_then([this] {
     // let's do the cleaning of `op_effects` in destructor
     return crimson::do_for_each(op_effects, [] (auto& op_effect) {
       return op_effect->execute();
diff --git a/src/crimson/osd/osd_operations/osdop_params.h b/src/crimson/osd/osd_operations/osdop_params.h
new file mode 100644 (file)
index 0000000..b50fb2b
--- /dev/null
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <utility>
+
+#include "messages/MOSDOp.h"
+#include "osd/osd_types.h"
+#include "crimson/common/type_helpers.h"
+
+// The fields in this struct are parameters that may be needed at multiple
+// levels of processing. They are grouped in this struct to avoid passing
+// each of them as a separate method parameter.
+struct osd_op_params_t {
+  Ref<MOSDOp> req;
+  eversion_t at_version;
+  eversion_t pg_trim_to;
+  eversion_t min_last_complete_ondisk;
+  eversion_t last_complete;
+  version_t user_at_version = 0;  // stays 0 unless set for a user modification
+  bool user_modify = false;
+
+  // Take ownership of the message. A named rvalue-reference parameter is
+  // itself an lvalue, so std::move is required to actually move from it.
+  osd_op_params_t(Ref<MOSDOp>&& req) : req(std::move(req)) {}
+  osd_op_params_t(Ref<MOSDOp>&& req, eversion_t at_version, eversion_t pg_trim_to,
+      eversion_t mlcod, eversion_t lc, version_t user_at_version) :
+    req(std::move(req)), at_version(at_version), pg_trim_to(pg_trim_to),
+    min_last_complete_ondisk(mlcod), last_complete(lc),
+    user_at_version(user_at_version) {}
+};
index 991002b90b2075387a055208957b0d16fd29ddf9..7a651ff22c7c832c2e7a49d056ac2db57f85e092 100644 (file)
@@ -36,6 +36,7 @@
 #include "crimson/osd/pg_meta.h"
 #include "crimson/osd/pg_backend.h"
 #include "crimson/osd/ops_executer.h"
+#include "crimson/osd/osd_operations/osdop_params.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 
 namespace {
@@ -487,21 +488,33 @@ blocking_future<> PG::WaitForActiveBlocker::wait()
 
 seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
                                         ceph::os::Transaction&& txn,
-                                        const MOSDOp& req)
+                                        const osd_op_params_t& osd_op_p)
 {
   epoch_t map_epoch = get_osdmap_epoch();
-  eversion_t at_version{map_epoch, projected_last_update.version + 1};
+
+  std::vector<pg_log_entry_t> log_entries;
+  log_entries.emplace_back(obc->obs.exists ?
+                     pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
+                   obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
+                   osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
+                   osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(), 0);
+  peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
+                                               txn, true, false);
+
   return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
                                std::move(obc),
                                std::move(txn),
-                               req,
+                               std::move(osd_op_p),
                                peering_state.get_last_peering_reset(),
                                map_epoch,
-                               at_version).then([this](auto acked) {
+                               std::move(log_entries)).then(
+    [this, last_complete=peering_state.get_info().last_complete,
+      at_version=osd_op_p.at_version](auto acked) {
     for (const auto& peer : acked) {
       peering_state.update_peer_last_complete_ondisk(
         peer.shard, peer.last_complete_ondisk);
     }
+    peering_state.complete_write(at_version, last_complete);
     return seastar::now();
   });
 }
@@ -515,6 +528,7 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
                                                    : m->get_hobj();
   auto ox =
     std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
+
   return crimson::do_for_each(
     m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
     logger().debug(
@@ -528,10 +542,10 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
       "do_osd_ops: {} - object {} all operations successful",
       *m,
       obc->obs.oi.soid);
-    return std::move(*ox).submit_changes(
-      [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
-        // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
-        if (txn.empty()) {
+    return std::move(*ox).submit_changes([this, m]
+      (auto&& txn, auto&& obc, auto&& osd_op_p) -> osd_op_errorator::future<> {
+       // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
+       if (txn.empty()) {
          logger().debug(
            "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
            *m,
@@ -542,8 +556,9 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
            "do_osd_ops: {} - object {} submitting txn",
            *m,
            obc->obs.oi.soid);
-          return submit_transaction(std::move(obc), std::move(txn), *m);
-        }
+          return submit_transaction(std::move(obc), std::move(txn),
+                                   std::move(osd_op_p));
+        }
       });
   }).safe_then([m, obc, this, ox_deleter = std::move(ox)] {
     auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
@@ -787,6 +802,11 @@ seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
   ceph::os::Transaction txn;
   auto encoded_txn = req->get_data().cbegin();
   decode(txn, encoded_txn);
+  auto p = req->logbl.cbegin();
+  std::vector<pg_log_entry_t> log_entries;
+  decode(log_entries, p);
+  peering_state.append_log(std::move(log_entries), req->pg_trim_to,
+      req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
   return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
     .then([req, lcod=peering_state.get_info().last_complete, this] {
       peering_state.update_last_complete_ondisk(lcod);
index 1ced9b22e77d94568a5c790dd94ccf5a64935015..36cf22073ee57b30ce727ec9f6dcb1a64bc42f7a 100644 (file)
@@ -30,6 +30,8 @@ class OSDMap;
 class MQuery;
 class PGBackend;
 class PGPeeringEvent;
+class osd_op_params_t;
+
 namespace recovery {
   class Context;
 }
@@ -94,6 +96,18 @@ public:
     return peering_state.get_osdmap_epoch();
   }
 
+  eversion_t get_pg_trim_to() const {
+    return peering_state.get_pg_trim_to();
+  }
+
+  eversion_t get_min_last_complete_ondisk() const {
+    return peering_state.get_min_last_complete_ondisk();
+  }
+
+  const pg_info_t& get_info() const {
+    return peering_state.get_info();
+  }
+
   // DoutPrefixProvider
   std::ostream& gen_prefix(std::ostream& out) const final {
     return out << *this;
@@ -474,7 +488,7 @@ private:
                                             uint64_t limit);
   seastar::future<> submit_transaction(ObjectContextRef&& obc,
                                       ceph::os::Transaction&& txn,
-                                      const MOSDOp& req);
+                                      const osd_op_params_t& oop);
 
 private:
   OSDMapGate osdmap_gate;
@@ -484,6 +498,10 @@ private:
 
 public:
   cached_map_t get_osdmap() { return osdmap; }
+  eversion_t next_version() {
+    return eversion_t(projected_last_update.epoch,
+                     ++projected_last_update.version);
+  }
 
 private:
   std::unique_ptr<PGBackend> backend;
index 3394daeaccf6679e6e87174df85cd7ffc6bf9f8a..72f32f61881692c66841f8216f7668070fddefd4 100644 (file)
@@ -119,10 +119,10 @@ PGBackend::mutate_object(
   std::set<pg_shard_t> pg_shards,
   crimson::osd::ObjectContextRef &&obc,
   ceph::os::Transaction&& txn,
-  const MOSDOp& m,
+  const osd_op_params_t& osd_op_p,
   epoch_t min_epoch,
   epoch_t map_epoch,
-  eversion_t ver)
+  std::vector<pg_log_entry_t>&& log_entries)
 {
   logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
   if (obc->obs.exists) {
@@ -131,8 +131,13 @@ PGBackend::mutate_object(
     obc->obs.oi.prior_version = ctx->obs->oi.version;
 #endif
 
-    obc->obs.oi.last_reqid = m.get_reqid();
-    obc->obs.oi.mtime = m.get_mtime();
+    auto& m = osd_op_p.req;
+    obc->obs.oi.prior_version = obc->obs.oi.version;
+    obc->obs.oi.version = osd_op_p.at_version;
+    if (osd_op_p.user_at_version > obc->obs.oi.user_version)
+      obc->obs.oi.user_version = osd_op_p.user_at_version;
+    obc->obs.oi.last_reqid = m->get_reqid();
+    obc->obs.oi.mtime = m->get_mtime();
     obc->obs.oi.local_mtime = ceph_clock_now();
 
     // object_info_t
@@ -148,7 +153,7 @@ PGBackend::mutate_object(
   }
   return _submit_transaction(
     std::move(pg_shards), obc->obs.oi.soid, std::move(txn),
-    m.get_reqid(), min_epoch, map_epoch, ver);
+    std::move(osd_op_p), min_epoch, map_epoch, std::move(log_entries));
 }
 
 static inline bool _read_verify_data(
index d72997eff66ab51697c1496f4a39a9665e752326..28b39c95c507db5dc67569cbb5bf8fdd1ed1c586 100644 (file)
@@ -14,6 +14,7 @@
 #include "crimson/common/shared_lru.h"
 #include "osd/osd_types.h"
 #include "crimson/osd/object_context.h"
+#include "crimson/osd/osd_operations/osdop_params.h"
 
 struct hobject_t;
 class MOSDRepOpReply;
@@ -79,10 +80,10 @@ public:
     std::set<pg_shard_t> pg_shards,
     crimson::osd::ObjectContextRef &&obc,
     ceph::os::Transaction&& txn,
-    const MOSDOp& m,
+    const osd_op_params_t& osd_op_p,
     epoch_t min_epoch,
     epoch_t map_epoch,
-    eversion_t ver);
+    std::vector<pg_log_entry_t>&& log_entries);
   seastar::future<std::vector<hobject_t>, hobject_t> list_objects(
     const hobject_t& start,
     uint64_t limit) const;
@@ -143,7 +144,7 @@ private:
   _submit_transaction(std::set<pg_shard_t>&& pg_shards,
                      const hobject_t& hoid,
                      ceph::os::Transaction&& txn,
-                     osd_reqid_t req_id,
+                     const osd_op_params_t& osd_op_p,
                      epoch_t min_epoch, epoch_t max_epoch,
-                     eversion_t ver) = 0;
+                     std::vector<pg_log_entry_t>&& log_entries) = 0;
 };
index 18c98faeadb2ed92d2b9469fb7c43ab1e8397815..b1b311e61aae4abec64cd3425b5c747f9aca52eb 100644 (file)
@@ -6,6 +6,8 @@
 #include "crimson/os/cyanstore/cyan_object.h"
 #include "crimson/os/futurized_store.h"
 #include "crimson/osd/shard_services.h"
+#include "crimson/osd/pg.h"
+#include "osd/PeeringState.h"
 
 namespace {
   seastar::logger& logger() {
@@ -36,11 +38,12 @@ seastar::future<crimson::osd::acked_peers_t>
 ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
                                        const hobject_t& hoid,
                                        ceph::os::Transaction&& txn,
-                                       osd_reqid_t req_id,
+                                       const osd_op_params_t& osd_op_p,
                                        epoch_t min_epoch, epoch_t map_epoch,
-                                       eversion_t ver)
+                                      std::vector<pg_log_entry_t>&& log_entries)
 {
   const ceph_tid_t tid = next_txn_id++;
+  auto req_id = osd_op_p.req->get_reqid();
   auto pending_txn =
     pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first;
   bufferlist encoded_txn;
@@ -56,9 +59,13 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
                                          spg_t{pgid, pg_shard.shard}, hoid,
                                          CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
                                          map_epoch, min_epoch,
-                                         tid, ver);
+                                         tid, osd_op_p.at_version);
         m->set_data(encoded_txn);
         pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
+       encode(log_entries, m->logbl);
+       m->pg_trim_to = osd_op_p.pg_trim_to;
+       m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk;
+       m->set_rollback_to(osd_op_p.at_version);
         // TODO: set more stuff. e.g., pg_states
         return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
       }
index c61d2b88d0ef4842cef385b8c3615c4e31af585b..53a0038da8a6b8addaf414d63e7998e127b9bfa1 100644 (file)
@@ -31,9 +31,9 @@ private:
   _submit_transaction(std::set<pg_shard_t>&& pg_shards,
                      const hobject_t& hoid,
                      ceph::os::Transaction&& txn,
-                     osd_reqid_t req_id,
+                     const osd_op_params_t& osd_op_p,
                      epoch_t min_epoch, epoch_t max_epoch,
-                     eversion_t ver) final;
+                     std::vector<pg_log_entry_t>&& log_entries) final;
   const pg_t pgid;
   const pg_shard_t whoami;
   crimson::osd::ShardServices& shard_services;
index 03faa5c77342cfc794c484c8bfb5591d5321a320..9b8f3883bfcebf4c983399d815d631f789801bbf 100644 (file)
@@ -1765,6 +1765,17 @@ public:
     std::optional<eversion_t> trim_to,
     std::optional<eversion_t> roll_forward_to);
 
+  void append_log_with_trim_to_updated(
+    std::vector<pg_log_entry_t>&& log_entries,
+    eversion_t roll_forward_to,
+    ObjectStore::Transaction &t,
+    bool transaction_applied,
+    bool async) {
+    update_trim_to();
+    append_log(std::move(log_entries), pg_trim_to, roll_forward_to,
+       min_last_complete_ondisk, t, transaction_applied, async);
+  }
+
   /**
    * Updates local log to reflect new write from primary.
    */