git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: settle ReadPipeline and RMWPipeline within ECBackend
author: Radosław Zarzyński <rzarzyns@redhat.com>
Tue, 19 Dec 2023 17:13:56 +0000 (18:13 +0100)
committer: Radoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 10 Jan 2024 17:30:28 +0000 (17:30 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECBackend.cc
src/osd/ECBackend.h
src/osd/ECCommon.h
src/osd/OSDCap.h
src/osd/Session.h

index 4e8f0b4e019a4b67b92a72be5de5c57095c9e06d..b63ab0cca1cf4305a1ded9987a712229fac3ec00 100644 (file)
@@ -840,7 +840,7 @@ bool ECBackend::_handle_message(
     MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
       _op->get_nonconst_req());
     parent->maybe_preempt_replica_scrub(op->op.soid);
-    handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
+    handle_sub_write(op->op.from, _op, op->op, _op->pg_trace, *get_parent()->get_eclistener());
     return true;
   }
   case MSG_OSD_EC_WRITE_REPLY: {
@@ -958,7 +958,8 @@ void ECBackend::handle_sub_write(
   pg_shard_t from,
   OpRequestRef msg,
   ECSubWrite &op,
-  const ZTracer::Trace &trace)
+  const ZTracer::Trace &trace,
+  ECListener&)
 {
   if (msg) {
     msg->mark_event("sub_op_started");
@@ -1454,10 +1455,15 @@ void ECCommon::ReadPipeline::filter_read_op(
      *    the pull on the affected objects and pushes from in-memory buffers
      *    on any now complete unaffected objects.
      */
+#ifndef WITH_SEASTAR
     get_parent()->schedule_recovery_work(
       get_parent()->bless_unlocked_gencontext(
         new FinishReadOp(*this, op.tid)),
       1);
+#else
+    // TODO
+    ceph_abort_msg("not yet implemented");
+#endif
   }
 }
 
index df8cc6dc54aa1bc839da1814a3e2ccdadb5f64ac..739e35563bfd20b7310199d7a2ed9e60da5a43c1 100644 (file)
@@ -68,7 +68,8 @@ public:
     pg_shard_t from,
     OpRequestRef msg,
     ECSubWrite &op,
-    const ZTracer::Trace &trace
+    const ZTracer::Trace &trace,
+    ECListener& eclistener
     ) override;
   void handle_sub_read(
     pg_shard_t from,
index bdbc059d42bbf63a283c76703d157d4aeb63b7e3..a7585ea13dddb7573bd0e4d42178e9d9490f2a08 100644 (file)
 #include <boost/intrusive/set.hpp>
 #include <boost/intrusive/list.hpp>
 
-#include "OSD.h"
-#include "PGBackend.h"
 #include "erasure-code/ErasureCodeInterface.h"
 #include "ECUtil.h"
+#if WITH_SEASTAR
+#include "ExtentCache.h"
+#include "crimson/osd/object_context.h"
+#include "os/Transaction.h"
+#include "osd/OSDMap.h"
+#include "osd/osd_op_util.h"
+
+struct ECTransaction {
+  struct WritePlan {
+    bool invalidates_cache = false; // Yes, both are possible
+    std::map<hobject_t,extent_set> to_read;
+    std::map<hobject_t,extent_set> will_write; // superset of to_read
+
+    std::map<hobject_t,ECUtil::HashInfoRef> hash_infos;
+  };
+};
+
+typedef void* OpRequestRef;
+typedef crimson::osd::ObjectContextRef ObjectContextRef;
+#else
+#include "common/WorkQueue.h"
+#endif
+
 #include "ECTransaction.h"
 #include "ExtentCache.h"
 
 //forward declaration
 struct ECSubWrite;
-struct ECSubWriteReply;
-struct ECSubRead;
-struct ECSubReadReply;
+struct PGLog;
 
 // ECListener -- an interface decoupling the pipelines from
 // particular implementation of ECBackend (crimson vs classical).
@@ -46,9 +65,14 @@ struct ECListener {
   virtual void cancel_pull(
     const hobject_t &soid) = 0;
   // XXX
+#ifndef WITH_SEASTAR
+  virtual GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
+    GenContext<ThreadPool::TPHandle&> *c) = 0;
+
   virtual void schedule_recovery_work(
     GenContext<ThreadPool::TPHandle&> *c,
     uint64_t cost) = 0;
+#endif
 
   virtual epoch_t get_interval_start_epoch() const = 0;
   virtual const std::set<pg_shard_t> &get_acting_shards() const = 0;
@@ -95,6 +119,21 @@ struct ECListener {
   virtual void apply_stats(
      const hobject_t &soid,
      const object_stat_sum_t &delta_stats) = 0;
+
+  // new batch
+  virtual bool is_missing_object(const hobject_t& oid) const = 0;
+  virtual void add_local_next_event(const pg_log_entry_t& e) = 0;
+  virtual void log_operation(
+    std::vector<pg_log_entry_t>&& logv,
+    const std::optional<pg_hit_set_history_t> &hset_history,
+    const eversion_t &trim_to,
+    const eversion_t &roll_forward_to,
+    const eversion_t &min_last_complete_ondisk,
+    bool transaction_applied,
+    ObjectStore::Transaction &t,
+    bool async = false) = 0;
+  virtual void op_applied(
+    const eversion_t &applied_version) = 0;
 };
 
 struct ECCommon {
@@ -104,7 +143,8 @@ struct ECCommon {
     pg_shard_t from,
     OpRequestRef msg,
     ECSubWrite &op,
-    const ZTracer::Trace &trace
+    const ZTracer::Trace &trace,
+    ECListener& eclistener
     ) = 0;
 
   virtual void objects_read_and_reconstruct(
@@ -123,7 +163,7 @@ struct ECCommon {
       bool want_attrs)
       : to_read(to_read), need(need), want_attrs(want_attrs) {}
   };
-  friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
+  friend std::ostream &operator<<(std::ostream &lhs, const read_request_t &rhs);
   struct ReadOp;
   /**
    * Low level async read mechanism
@@ -328,7 +368,7 @@ struct ECCommon {
       std::map<shard_id_t, pg_shard_t> &shards,
       bool for_recovery);
 
-    friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+    friend std::ostream &operator<<(std::ostream &lhs, const ReadOp &rhs);
     friend struct FinishReadOp;
 
     void get_want_to_read_shards(std::set<int> *want_to_read) const;
@@ -424,13 +464,13 @@ struct ECCommon {
         pg_t pgid,
         const ECUtil::stripe_info_t &sinfo,
         std::map<hobject_t,extent_map> *written,
-        std::map<shard_id_t, ObjectStore::Transaction> *transactions,
+        std::map<shard_id_t, ceph::os::Transaction> *transactions,
         DoutPrefixProvider *dpp,
         const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
     };
     using OpRef = std::unique_ptr<Op>;
     using op_list = boost::intrusive::list<Op>;
-    friend ostream &operator<<(ostream &lhs, const Op &rhs);
+    friend std::ostream &operator<<(std::ostream &lhs, const Op &rhs);
 
     ExtentCache cache;
     std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
@@ -472,7 +512,7 @@ struct ECCommon {
       void clear() {
         pipeline_state = CACHE_VALID;
       }
-      friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
+      friend std::ostream &operator<<(std::ostream &lhs, const pipeline_state_t &rhs);
     } pipeline_state;
 
     op_list waiting_state;        /// writes waiting on pipe_state
@@ -520,7 +560,7 @@ struct ECCommon {
       ECSubWrite &op,
       const ZTracer::Trace &trace
     ) {
-      ec_backend.handle_sub_write(from, std::move(msg), op, trace);
+      ec_backend.handle_sub_write(from, std::move(msg), op, trace, *get_parent());
     }
     // end of iface
 
index caf6cd788d734350a2b479bb8f64a09819923bc9..8aed09adf9bb671172cdac387deac0f3c63e8a08 100644 (file)
 #include <ostream>
 using std::ostream;
 
-#include "include/types.h"
-#include "OpRequest.h"
-
 #include <list>
 #include <vector>
 #include <boost/optional.hpp>
 #include <boost/fusion/include/adapt_struct.hpp>
 
+#include "include/types.h"
+#include "osd/osd_op_util.h"
+
+
 static const __u8 OSD_CAP_R     = (1 << 1);      // read
 static const __u8 OSD_CAP_W     = (1 << 2);      // write
 static const __u8 OSD_CAP_CLS_R = (1 << 3);      // class read
index 3c3eae211cf511226632d5ff2906728afb23129f..9fa9c6554563da3c37de13ede984dac21cc81a21 100644 (file)
@@ -20,6 +20,7 @@
 #include "global/global_context.h"
 #include "include/spinlock.h"
 #include "OSDCap.h"
+#include "OpRequest.h"
 #include "Watch.h"
 #include "OSDMap.h"
 #include "PeeringState.h"