git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: improve the parallelism of client request processing in crimson-osd
author Xuehan Xu <xxhdx1985126@gmail.com>
Thu, 11 Mar 2021 05:21:31 +0000 (13:21 +0800)
committer Xuehan Xu <xxhdx1985126@gmail.com>
Sun, 16 May 2021 06:47:56 +0000 (14:47 +0800)
Move each client request into the concurrent pipeline stage "wait_repop" as
soon as it has been "submitted" to the underlying objectstore. At that point
its on-disk order is guaranteed, so successive client requests can enter the
"process" pipeline stage.

Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
12 files changed:
src/crimson/osd/ec_backend.cc
src/crimson/osd/ec_backend.h
src/crimson/osd/ops_executer.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/internal_client_request.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h
src/crimson/osd/recovery_backend.cc
src/crimson/osd/replicated_backend.cc
src/crimson/osd/replicated_backend.h

index 5cc2ffaf5a8a5492cd39d267ee41a392b8b28f23..9ea0d1e6564a16ffe426dd7010d9de04773ebe9c 100644 (file)
@@ -22,7 +22,7 @@ ECBackend::_read(const hobject_t& hoid,
   return seastar::make_ready_future<bufferlist>();
 }
 
-ECBackend::interruptible_future<crimson::osd::acked_peers_t>
+ECBackend::rep_op_fut_t
 ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
                                const hobject_t& hoid,
                                ceph::os::Transaction&& txn,
@@ -31,5 +31,6 @@ ECBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
                               std::vector<pg_log_entry_t>&& log_entries)
 {
   // todo
-  return seastar::make_ready_future<crimson::osd::acked_peers_t>();
+  return {seastar::now(),
+         seastar::make_ready_future<crimson::osd::acked_peers_t>()};
 }
index 44da609cb8e60a84b230bfa58b2991988d9266c5..4b736622b4c1886bc022b81a0c323425b7f7e8ed 100644 (file)
@@ -24,7 +24,7 @@ public:
 private:
   ll_read_ierrorator::future<ceph::bufferlist>
   _read(const hobject_t& hoid, uint64_t off, uint64_t len, uint32_t flags) override;
-  interruptible_future<crimson::osd::acked_peers_t>
+  rep_op_fut_t
   _submit_transaction(std::set<pg_shard_t>&& pg_shards,
                      const hobject_t& hoid,
                      ceph::os::Transaction&& txn,
index d9d854b8780cb18e4eec050a4013e020b0baef98..d78354a4e2451c49a8302e17f6a384077e9a8dde 100644 (file)
@@ -261,8 +261,12 @@ public:
   interruptible_errorated_future<osd_op_errorator>
   execute_op(OSDOp& osd_op);
 
+  using rep_op_fut_tuple =
+    std::tuple<interruptible_future<>, osd_op_ierrorator::future<>>;
+  using rep_op_fut_t =
+    interruptible_future<rep_op_fut_tuple>;
   template <typename MutFunc>
-  osd_op_ierrorator::future<> flush_changes_n_do_ops_effects(
+  rep_op_fut_t flush_changes_n_do_ops_effects(
     Ref<PG> pg,
     MutFunc&& mut_func) &&;
 
@@ -325,32 +329,42 @@ auto OpsExecuter::with_effect_on_obc(
 }
 
 template <typename MutFunc>
-OpsExecuter::osd_op_ierrorator::future<>
+OpsExecuter::rep_op_fut_t
 OpsExecuter::flush_changes_n_do_ops_effects(Ref<PG> pg, MutFunc&& mut_func) &&
 {
   const bool want_mutate = !txn.empty();
   // osd_op_params are instantiated by every wr-like operation.
   assert(osd_op_params || !want_mutate);
   assert(obc);
-  auto maybe_mutated = interruptor::make_interruptible(osd_op_errorator::now());
+  rep_op_fut_t maybe_mutated =
+    interruptor::make_ready_future<rep_op_fut_tuple>(
+       seastar::now(),
+       interruptor::make_interruptible(osd_op_errorator::now()));
   if (want_mutate) {
     osd_op_params->req_id = msg->get_reqid();
     osd_op_params->mtime = msg->get_mtime();
-    maybe_mutated = std::forward<MutFunc>(mut_func)(std::move(txn),
+    auto [submitted, all_completed] = std::forward<MutFunc>(mut_func)(std::move(txn),
                                                     std::move(obc),
                                                     std::move(*osd_op_params),
                                                     user_modify);
+    maybe_mutated = interruptor::make_ready_future<rep_op_fut_tuple>(
+       std::move(submitted),
+       osd_op_ierrorator::future<>(std::move(all_completed)));
   }
   if (__builtin_expect(op_effects.empty(), true)) {
     return maybe_mutated;
   } else {
-    return maybe_mutated.safe_then_interruptible([pg=std::move(pg),
-                                                  this] () mutable {
-      // let's do the cleaning of `op_effects` in destructor
-      return interruptor::do_for_each(op_effects,
-                                      [pg=std::move(pg)] (auto& op_effect) {
-        return op_effect->execute(pg);
-      });
+    return maybe_mutated.then_unpack_interruptible(
+      [this, pg=std::move(pg)](auto&& submitted, auto&& all_completed) mutable {
+      return interruptor::make_ready_future<rep_op_fut_tuple>(
+         std::move(submitted),
+         all_completed.safe_then_interruptible([this, pg=std::move(pg)] {
+           // let's do the cleaning of `op_effects` in destructor
+           return interruptor::do_for_each(op_effects,
+             [pg=std::move(pg)](auto& op_effect) {
+             return op_effect->execute(pg);
+           });
+         }));
     });
   }
 }
index d178256e4d53a595d253155af11a2fde053d9098..b7ac62044f9b001bdec1f39f8299a75d7404e0ad 100644 (file)
@@ -202,9 +202,25 @@ ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
       return conn->send(std::move(reply));
     }
   }
-  return pg->do_osd_ops(m, obc, op_info).safe_then_interruptible(
-      [this](Ref<MOSDOpReply> reply) -> interruptible_future<> {
-    return conn->send(std::move(reply));
+  return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
+    [this, pg](auto submitted, auto all_completed) mutable {
+    return submitted.then_interruptible(
+      [this, pg] {
+        return with_blocking_future_interruptible<IOInterruptCondition>(
+            handle.enter(pp(*pg).wait_repop));
+    }).then_interruptible(
+      [this, pg, all_completed=std::move(all_completed)]() mutable {
+      return all_completed.safe_then_interruptible(
+        [this, pg](Ref<MOSDOpReply> reply) {
+        return with_blocking_future_interruptible<IOInterruptCondition>(
+            handle.enter(pp(*pg).send_reply)).then_interruptible(
+              [this, reply=std::move(reply)] {
+              return conn->send(std::move(reply));
+            });
+      }, crimson::ct_error::eagain::handle([this, pg]() mutable {
+        return process_op(pg);
+      }));
+    });
   }, crimson::ct_error::eagain::handle([this, pg]() mutable {
     return process_op(pg);
   }));
index 9ea028f2e4dd5be28505c65d14bf001e5049b155..fcc64c545a404ed87cda5ad12ffd28d25ef26c1c 100644 (file)
@@ -81,9 +81,12 @@ seastar::future<> InternalClientRequest::start()
                     [] (const std::error_code& e) {
                       return PG::do_osd_ops_iertr::now();
                     }
-                  ).safe_then_interruptible(
-                    [] {
-                      return interruptor::now();
+                  ).safe_then_unpack_interruptible(
+                    [](auto submitted, auto all_completed) {
+                      return all_completed.handle_error_interruptible(
+                        crimson::ct_error::eagain::handle([] {
+                          return seastar::now();
+                        }));
                     }, crimson::ct_error::eagain::handle([] {
                       return interruptor::now();
                     })
index 35b41fbc2944b87df9fbe9f5dbf2b16887cffbcd..3ff19636d78290dcecc42c9caead46a92b420ff3 100644 (file)
@@ -566,7 +566,9 @@ seastar::future<> PG::WaitForActiveBlocker::stop()
   return seastar::now();
 }
 
-PG::interruptible_future<> PG::submit_transaction(
+std::tuple<PG::interruptible_future<>,
+           PG::interruptible_future<>>
+PG::submit_transaction(
   const OpInfo& op_info,
   const std::vector<OSDOp>& ops,
   ObjectContextRef&& obc,
@@ -574,8 +576,9 @@ PG::interruptible_future<> PG::submit_transaction(
   osd_op_params_t&& osd_op_p)
 {
   if (__builtin_expect(stopping, false)) {
-    return seastar::make_exception_future<>(
-       crimson::common::system_shutdown_exception());
+    return {seastar::make_exception_future<>(
+              crimson::common::system_shutdown_exception()),
+            seastar::now()};
   }
 
   epoch_t map_epoch = get_osdmap_epoch();
@@ -603,13 +606,15 @@ PG::interruptible_future<> PG::submit_transaction(
   peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
                                                txn, true, false);
 
-  return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
-                               std::move(obc),
-                               std::move(txn),
-                               std::move(osd_op_p),
-                               peering_state.get_last_peering_reset(),
-                               map_epoch,
-                               std::move(log_entries)).then_interruptible(
+  auto [submitted, all_completed] = backend->mutate_object(
+      peering_state.get_acting_recovery_backfill(),
+      std::move(obc),
+      std::move(txn),
+      std::move(osd_op_p),
+      peering_state.get_last_peering_reset(),
+      map_epoch,
+      std::move(log_entries));
+  return std::make_tuple(std::move(submitted), all_completed.then_interruptible(
     [this, last_complete=peering_state.get_info().last_complete,
       at_version=osd_op_p.at_version](auto acked) {
     for (const auto& peer : acked) {
@@ -618,7 +623,7 @@ PG::interruptible_future<> PG::submit_transaction(
     }
     peering_state.complete_write(at_version, last_complete);
     return seastar::now();
-  });
+  }));
 }
 
 void PG::fill_op_params_bump_pg_version(
@@ -697,7 +702,8 @@ PG::interruptible_future<> PG::repair_object(
 }
 
 template <class Ret, class SuccessFunc, class FailureFunc>
-PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
+PG::do_osd_ops_execute(
   OpsExecuter&& ox,
   std::vector<OSDOp> ops,
   const OpInfo &op_info,
@@ -708,6 +714,7 @@ PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
     return reload_obc(obc).handle_error_interruptible(
       load_obc_ertr::assert_all{"can't live with object state messed up"});
   });
+  auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
   return interruptor::do_for_each(ops, [&ox](OSDOp& osd_op) {
     logger().debug(
       "do_osd_ops_execute: object {} - handling op {}",
@@ -735,31 +742,48 @@ PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
           std::move(txn),
           std::move(osd_op_p));
     });
-  }).safe_then_interruptible_tuple([success_func=std::move(success_func)] {
-    return std::move(success_func)();
-  }, crimson::ct_error::object_corrupted::handle(
-    [rollbacker, this] (const std::error_code& e) mutable {
-    // this is a path for EIO. it's special because we want to fix the obejct
-    // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
-    // restart the execution.
-    return rollbacker.rollback_obc_if_modified(e).then_interruptible(
-      [obc=rollbacker.get_obc(), this] {
-      return repair_object(obc->obs.oi.soid,
-                           obc->obs.oi.version).then_interruptible([] {
-        return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
-      });
-    });
-  }), OpsExecuter::osd_op_errorator::all_same_way(
-    [rollbacker, failure_func=std::move(failure_func)]
+  }).safe_then_unpack_interruptible(
+    [success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
+    (auto submitted_fut, auto all_completed_fut) mutable {
+    return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+        std::move(submitted_fut),
+        all_completed_fut.safe_then_interruptible_tuple(
+          std::move(success_func),
+          crimson::ct_error::object_corrupted::handle(
+            [rollbacker, this] (const std::error_code& e) mutable {
+            // this is a path for EIO. it's special because we want to fix the object
+            // and try again. that is, the layer above `PG::do_osd_ops` is supposed to
+            // restart the execution.
+            return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+              [obc=rollbacker.get_obc(), this] {
+              return repair_object(obc->obs.oi.soid,
+                                   obc->obs.oi.version).then_interruptible([] {
+                return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
+              });
+            });
+          }), OpsExecuter::osd_op_errorator::all_same_way(
+            [rollbacker, failure_func_ptr]
+            (const std::error_code& e) mutable {
+            return rollbacker.rollback_obc_if_modified(e).then_interruptible(
+              [&e, failure_func_ptr] {
+              return (*failure_func_ptr)(e);
+            });
+          })
+        )
+      );
+  }, OpsExecuter::osd_op_errorator::all_same_way(
+    [rollbacker, failure_func_ptr]
     (const std::error_code& e) mutable {
-    return rollbacker.rollback_obc_if_modified(e).then_interruptible(
-      [&e, failure_func=std::move(failure_func)] {
-      return std::move(failure_func)(e);
-    });
+    return PG::do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ret>>(
+        seastar::now(),
+        rollbacker.rollback_obc_if_modified(e).then_interruptible(
+          [&e, failure_func_ptr] {
+          return (*failure_func_ptr)(e);
+        }));
   }));
 }
 
-PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>>
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ref<MOSDOpReply>>>
 PG::do_osd_ops(
   Ref<MOSDOp> m,
   ObjectContextRef obc,
@@ -803,7 +827,7 @@ PG::do_osd_ops(
   ).finally([ox_deleter=std::move(ox)] {});
 }
 
-PG::do_osd_ops_iertr::future<>
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
 PG::do_osd_ops(
   ObjectContextRef obc,
   std::vector<OSDOp> ops,
index 45958c4b8e2ffea1a9707effb55fa2c4cf89c91c..45655d25e336dc825b4a4d2724faf62cef7bb6ba 100644 (file)
@@ -573,7 +573,11 @@ private:
     ::crimson::interruptible::interruptible_errorator<
       ::crimson::osd::IOInterruptCondition,
       ::crimson::errorator<crimson::ct_error::eagain>>;
-  do_osd_ops_iertr::future<Ref<MOSDOpReply>> do_osd_ops(
+  template <typename Ret = void>
+  using pg_rep_op_fut_t =
+    std::tuple<interruptible_future<>,
+               do_osd_ops_iertr::future<Ret>>;
+  do_osd_ops_iertr::future<pg_rep_op_fut_t<Ref<MOSDOpReply>>> do_osd_ops(
     Ref<MOSDOp> m,
     ObjectContextRef obc,
     const OpInfo &op_info);
@@ -582,7 +586,7 @@ private:
   using do_osd_ops_failure_func_t =
     std::function<do_osd_ops_iertr::future<>(const std::error_code&)>;
   struct do_osd_ops_params_t;
-  do_osd_ops_iertr::future<> do_osd_ops(
+  do_osd_ops_iertr::future<pg_rep_op_fut_t<>> do_osd_ops(
     ObjectContextRef obc,
     std::vector<OSDOp> ops,
     const OpInfo &op_info,
@@ -590,14 +594,15 @@ private:
     do_osd_ops_success_func_t success_func,
     do_osd_ops_failure_func_t failure_func);
   template <class Ret, class SuccessFunc, class FailureFunc>
-  do_osd_ops_iertr::future<Ret> do_osd_ops_execute(
+  do_osd_ops_iertr::future<pg_rep_op_fut_t<Ret>> do_osd_ops_execute(
     OpsExecuter&& ox,
     std::vector<OSDOp> ops,
     const OpInfo &op_info,
     SuccessFunc&& success_func,
     FailureFunc&& failure_func);
   interruptible_future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
-  interruptible_future<> submit_transaction(
+  std::tuple<interruptible_future<>, interruptible_future<>>
+  submit_transaction(
     const OpInfo& op_info,
     const std::vector<OSDOp>& ops,
     ObjectContextRef&& obc,
index ab87a6e7e3eef1839fcaf350fea3f7ccb550aac1..14546b83195b110cf9a0d410f419a370f7c48b92 100644 (file)
@@ -119,7 +119,7 @@ PGBackend::load_metadata(const hobject_t& oid)
       }));
 }
 
-PGBackend::interruptible_future<crimson::osd::acked_peers_t>
+PGBackend::rep_op_fut_t
 PGBackend::mutate_object(
   std::set<pg_shard_t> pg_shards,
   crimson::osd::ObjectContextRef &&obc,
index 7f5a527545519e36de02baf40d1c5dd5bf9d4e11..699bad16167dcc7070bced44f278fba729663953 100644 (file)
@@ -58,6 +58,9 @@ public:
   using interruptible_future =
     ::crimson::interruptible::interruptible_future<
       ::crimson::osd::IOInterruptCondition, T>;
+  using rep_op_fut_t =
+    std::tuple<interruptible_future<>,
+              interruptible_future<crimson::osd::acked_peers_t>>;
   PGBackend(shard_id_t shard, CollectionRef coll, crimson::os::FuturizedStore* store);
   virtual ~PGBackend() = default;
   static std::unique_ptr<PGBackend> create(pg_t pgid,
@@ -158,7 +161,7 @@ public:
     const OSDOp& osd_op,
     ceph::os::Transaction& trans,
     osd_op_params_t& osd_op_params);
-  interruptible_future<crimson::osd::acked_peers_t> mutate_object(
+  rep_op_fut_t mutate_object(
     std::set<pg_shard_t> pg_shards,
     crimson::osd::ObjectContextRef &&obc,
     ceph::os::Transaction&& txn,
@@ -279,7 +282,7 @@ private:
     uint32_t flags) = 0;
 
   bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn);
-  virtual interruptible_future<crimson::osd::acked_peers_t>
+  virtual rep_op_fut_t
   _submit_transaction(std::set<pg_shard_t>&& pg_shards,
                      const hobject_t& hoid,
                      ceph::os::Transaction&& txn,
index d49c03e33ddde6f36659c80138f975a6c033fecc..5a6e8e0141bd224180d00eabe8b158792fa8e37e 100644 (file)
@@ -104,8 +104,7 @@ RecoveryBackend::handle_backfill_progress(
     m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
     t);
   return shard_services.get_store().do_transaction(
-    pg.get_collection_ref(), std::move(t)
-  ).or_terminate();
+    pg.get_collection_ref(), std::move(t)).or_terminate();
 }
 
 RecoveryBackend::interruptible_future<>
@@ -153,8 +152,7 @@ RecoveryBackend::handle_backfill_remove(
              ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard));
   }
   return shard_services.get_store().do_transaction(
-    pg.get_collection_ref(), std::move(t)
-  ).or_terminate();
+    pg.get_collection_ref(), std::move(t)).or_terminate();
 }
 
 RecoveryBackend::interruptible_future<BackfillInterval>
index db4ac59c645e37f7e8e6ae20a05337d9548d0e67..66ebfd2ff4f4682c6a20f509cf4bd14fec0c5cc9 100644 (file)
@@ -39,7 +39,7 @@ ReplicatedBackend::_read(const hobject_t& hoid,
   return store->read(coll, ghobject_t{hoid}, off, len, flags);
 }
 
-ReplicatedBackend::interruptible_future<crimson::osd::acked_peers_t>
+ReplicatedBackend::rep_op_fut_t
 ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
                                        const hobject_t& hoid,
                                        ceph::os::Transaction&& txn,
@@ -60,49 +60,50 @@ ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
   bufferlist encoded_txn;
   encode(txn, encoded_txn);
 
-  return interruptor::parallel_for_each(std::move(pg_shards),
-    [=, encoded_txn=std::move(encoded_txn), txn=std::move(txn)]
-    (auto pg_shard) mutable {
-      if (pg_shard == whoami) {
-        return shard_services.get_store().do_transaction(coll,std::move(txn));
-      } else {
-        auto m = crimson::net::make_message<MOSDRepOp>(
-          osd_op_p.req_id,
-          whoami,
-          spg_t{pgid, pg_shard.shard},
-          hoid,
-          CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
-          map_epoch,
-          min_epoch,
-          tid,
-          osd_op_p.at_version);
-        m->set_data(encoded_txn);
-        pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
-       encode(log_entries, m->logbl);
-       m->pg_trim_to = osd_op_p.pg_trim_to;
-       m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk;
-       m->set_rollback_to(osd_op_p.at_version);
-        // TODO: set more stuff. e.g., pg_states
-        return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
-      }
-    }).then_interruptible([this, peers=pending_txn->second.weak_from_this()] {
-      if (!peers) {
-       // for now, only actingset_changed can cause peers
-       // to be nullptr
-       assert(peering);
-       throw crimson::common::actingset_changed(peering->is_primary);
-      }
-      if (--peers->pending == 0) {
-        peers->all_committed.set_value();
-       peers->all_committed = {};
-       return seastar::now();
-      }
-      return peers->all_committed.get_shared_future();
-    }).then_interruptible([pending_txn, this] {
-      auto acked_peers = std::move(pending_txn->second.acked_peers);
-      pending_trans.erase(pending_txn);
-      return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
-    });
+  auto all_completed = interruptor::make_interruptible(
+      shard_services.get_store().do_transaction(coll, std::move(txn)))
+  .then_interruptible([this, peers=pending_txn->second.weak_from_this()] {
+    if (!peers) {
+      // for now, only actingset_changed can cause peers
+      // to be nullptr
+      assert(peering);
+      throw crimson::common::actingset_changed(peering->is_primary);
+    }
+    if (--peers->pending == 0) {
+      peers->all_committed.set_value();
+      peers->all_committed = {};
+      return seastar::now();
+    }
+    return peers->all_committed.get_shared_future();
+  }).then_interruptible([pending_txn, this] {
+    auto acked_peers = std::move(pending_txn->second.acked_peers);
+    pending_trans.erase(pending_txn);
+    return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
+  });
+
+  for (auto pg_shard : pg_shards) {
+    if (pg_shard != whoami) {
+      auto m = crimson::net::make_message<MOSDRepOp>(
+       osd_op_p.req_id,
+       whoami,
+       spg_t{pgid, pg_shard.shard},
+       hoid,
+       CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+       map_epoch,
+       min_epoch,
+       tid,
+       osd_op_p.at_version);
+      m->set_data(encoded_txn);
+      pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
+      encode(log_entries, m->logbl);
+      m->pg_trim_to = osd_op_p.pg_trim_to;
+      m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk;
+      m->set_rollback_to(osd_op_p.at_version);
+      // TODO: set more stuff. e.g., pg_states
+      (void) shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
+    }
+  }
+  return {seastar::now(), std::move(all_completed)};
 }
 
 void ReplicatedBackend::on_actingset_changed(peering_info_t pi)
index 1e2ae752a594a2dd009ecda9da6268eff2c19094..b768fed717b7db1534320ec41e94d5e78e161538 100644 (file)
@@ -29,13 +29,12 @@ private:
   ll_read_ierrorator::future<ceph::bufferlist>
     _read(const hobject_t& hoid, uint64_t off,
          uint64_t len, uint32_t flags) override;
-  interruptible_future<crimson::osd::acked_peers_t>
-  _submit_transaction(std::set<pg_shard_t>&& pg_shards,
-                     const hobject_t& hoid,
-                     ceph::os::Transaction&& txn,
-                     osd_op_params_t&& osd_op_p,
-                     epoch_t min_epoch, epoch_t max_epoch,
-                     std::vector<pg_log_entry_t>&& log_entries) final;
+  rep_op_fut_t _submit_transaction(std::set<pg_shard_t>&& pg_shards,
+    const hobject_t& hoid,
+    ceph::os::Transaction&& txn,
+    osd_op_params_t&& osd_op_p,
+    epoch_t min_epoch, epoch_t max_epoch,
+    std::vector<pg_log_entry_t>&& log_entries) final;
   const pg_t pgid;
   const pg_shard_t whoami;
   crimson::osd::ShardServices& shard_services;