osd: decrease the coupling between client and recovery reads in ECBackend
author     Radoslaw Zarzynski <rzarzyns@redhat.com>
           Thu, 29 Jun 2023 19:01:19 +0000 (19:01 +0000)
committer  Radoslaw Zarzynski <rzarzyns@redhat.com>
           Tue, 9 Jan 2024 15:09:53 +0000 (15:09 +0000)
`complete_read_op()` being aware of `RecoveryMessages` was too much coupling.

TODO:
  * rename or rework `RecoveryMessages`, since it now also serves as
    the callback for `ReadOp::on_complete`. I don't like that.
  * drop the `pair<RecoveryMessages*, read_request_t&>`.

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
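
The gist of the change, as a minimal standalone sketch (GenContext, ReadOp and
ECBackend below are simplified stand-ins, not the real Ceph declarations; only
the shape of the interaction matches the patch): RecoveryMessages derives from
GenContext<int> and is handed to the read op as its on_complete callback, so
complete_read_op() only fires a generic callback and no longer needs to know
anything about recovery. Client reads simply pass a null on_complete.

// Simplified stand-ins; the real types live in src/osd/ECBackend.{h,cc}.
#include <iostream>

template <typename T>
struct GenContext {               // stand-in for Ceph's GenContext<T>
  virtual ~GenContext() = default;
  virtual void finish(T t) = 0;
  void complete(T t) {            // run the callback, then self-destruct
    finish(t);
    delete this;
  }
};

struct ECBackend;

// RecoveryMessages doubles as the read op's completion callback: when the
// recovery reads finish, it dispatches itself at the op's priority.
struct RecoveryMessages : GenContext<int> {
  ECBackend *ec;
  explicit RecoveryMessages(ECBackend *ec) : ec(ec) {}
  void finish(int priority) override;
};

struct ReadOp {
  int priority = 0;
  GenContext<int> *on_complete = nullptr;  // nullptr for plain client reads
};

struct ECBackend {
  void dispatch_recovery_messages(RecoveryMessages &m, int priority) {
    std::cout << "dispatching recovery messages at priority " << priority << "\n";
  }
  // complete_read_op() no longer takes a RecoveryMessages*; it only fires
  // whatever generic callback the read op was started with.
  void complete_read_op(ReadOp &rop) {
    if (rop.on_complete) {
      rop.on_complete->complete(rop.priority);
      rop.on_complete = nullptr;
    }
  }
};

void RecoveryMessages::finish(int priority) {
  ec->dispatch_recovery_messages(*this, priority);
}

int main() {
  ECBackend ec;
  ReadOp recovery_read{3, new RecoveryMessages(&ec)};  // recovery read path
  ec.complete_read_op(recovery_read);                  // dispatches via callback

  ReadOp client_read{3, nullptr};                      // client read path
  ec.complete_read_op(client_read);                    // no recovery involvement
}
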
src/osd/ECBackend.cc
src/osd/ECBackend.h

index d99c989f6783b7898d5128c6823067b21aa70801..5f91d85ce904e1e59061ea8eca88992358c118ff 100644 (file)
@@ -254,10 +254,12 @@ void ECBackend::_failed_push(const hobject_t &hoid,
 
 struct OnRecoveryReadComplete :
   public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
+  struct RecoveryMessages* rm;
   ECBackend *backend;
   hobject_t hoid;
-  OnRecoveryReadComplete(ECBackend *backend, const hobject_t &hoid)
-    : backend(backend), hoid(hoid) {}
+
+  OnRecoveryReadComplete(RecoveryMessages* rm, ECBackend *backend, const hobject_t &hoid)
+    : rm(rm), backend(backend), hoid(hoid) {}
   void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
     ECBackend::read_result_t &res = in.second;
     if (!(res.r == 0 && res.errors.empty())) {
@@ -269,14 +271,18 @@ struct OnRecoveryReadComplete :
       hoid,
       res.returned.back(),
       res.attrs,
-      in.first);
+      rm);
   }
 };
 
-struct RecoveryMessages {
+struct RecoveryMessages : GenContext<int> {
+  ECBackend *ec;
   map<hobject_t,
       ECBackend::read_request_t> recovery_reads;
   map<hobject_t, set<int>> want_to_read;
+
+  RecoveryMessages(ECBackend* ec) : ec(ec) {}
+
   void recovery_read(
     ECBackend *ec,
     const hobject_t &hoid, uint64_t off, uint64_t len,
@@ -296,6 +302,7 @@ struct RecoveryMessages {
          need,
          attrs,
          new OnRecoveryReadComplete(
+           this,
            ec,
            hoid))));
   }
@@ -305,6 +312,10 @@ struct RecoveryMessages {
   ObjectStore::Transaction t;
   RecoveryMessages() {}
   ~RecoveryMessages() {}
+
+  void finish(int priority) override {
+    ec->dispatch_recovery_messages(*this, priority);
+  }
 };
 
 void ECBackend::handle_recovery_push(
@@ -590,7 +601,7 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
     m.want_to_read,
     m.recovery_reads,
     OpRequestRef(),
-    false, true);
+    false, true, new RecoveryMessages{});
 }
 
 void ECBackend::continue_recovery_op(
@@ -755,7 +766,7 @@ void ECBackend::run_recovery_op(
   int priority)
 {
   ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
-  RecoveryMessages m;
+  RecoveryMessages m{this};
   for (list<RecoveryOp>::iterator i = h->ops.begin();
        i != h->ops.end();
        ++i) {
@@ -859,14 +870,14 @@ bool ECBackend::_handle_message(
     // buffers.  It does not conflict with ECSubReadReply operator<<.
     MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
       _op->get_nonconst_req());
-    RecoveryMessages rm;
-    handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
-    dispatch_recovery_messages(rm, priority);
+    handle_sub_read_reply(op->op.from, op->op, _op->pg_trace);
+    // dispatch_recovery_messages() in the case of recovery_reads
+    // is called via the `on_complete` callback
     return true;
   }
   case MSG_OSD_PG_PUSH: {
     auto op = _op->get_req<MOSDPGPush>();
-    RecoveryMessages rm;
+    RecoveryMessages rm{this};
     for (vector<PushOp>::const_iterator i = op->pushes.begin();
         i != op->pushes.end();
         ++i) {
@@ -878,7 +889,7 @@ bool ECBackend::_handle_message(
   case MSG_OSD_PG_PUSH_REPLY: {
     const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
       _op->get_req());
-    RecoveryMessages rm;
+    RecoveryMessages rm{this};
     for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
         i != op->replies.end();
         ++i) {
@@ -1196,7 +1207,6 @@ void ECBackend::handle_sub_write_reply(
 void ECBackend::handle_sub_read_reply(
   pg_shard_t from,
   ECSubReadReply &op,
-  RecoveryMessages *m,
   const ZTracer::Trace &trace)
 {
   trace.event("ec sub read reply");
@@ -1335,13 +1345,13 @@ void ECBackend::handle_sub_read_reply(
              is_complete == rop.complete.size()) {
     dout(20) << __func__ << " Complete: " << rop << dendl;
     rop.trace.event("ec read complete");
-    complete_read_op(rop, m);
+    complete_read_op(rop);
   } else {
     dout(10) << __func__ << " readop not complete: " << rop << dendl;
   }
 }
 
-void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
+void ECBackend::complete_read_op(ReadOp &rop)
 {
   map<hobject_t, read_request_t>::iterator reqiter =
     rop.to_read.begin();
@@ -1364,6 +1374,10 @@ void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
   }
   rop.in_progress.clear();
   tid_to_read_map.erase(rop.tid);
+  if (rop.on_complete) {
+    rop.on_complete->complete(rop.priority);
+    rop.on_complete = nullptr;
+  }
 }
 
 struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
@@ -1373,10 +1387,7 @@ struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
   void finish(ThreadPool::TPHandle &handle) override {
     auto ropiter = ec->tid_to_read_map.find(tid);
     ceph_assert(ropiter != ec->tid_to_read_map.end());
-    int priority = ropiter->second.priority;
-    RecoveryMessages rm;
-    ec->complete_read_op(ropiter->second, &rm);
-    ec->dispatch_recovery_messages(rm, priority);
+    ec->complete_read_op(ropiter->second);
   }
 };
 
@@ -1808,7 +1819,8 @@ void ECBackend::start_read_op(
   map<hobject_t, read_request_t> &to_read,
   OpRequestRef _op,
   bool do_redundant_reads,
-  bool for_recovery)
+  bool for_recovery,
+  GenContext<int> *on_complete)
 {
   ceph_tid_t tid = get_parent()->get_tid();
   ceph_assert(!tid_to_read_map.count(tid));
@@ -1819,6 +1831,7 @@ void ECBackend::start_read_op(
       tid,
       do_redundant_reads,
       for_recovery,
+      on_complete,
       _op,
       std::move(want_to_read),
       std::move(to_read))).first->second;
@@ -2501,7 +2514,7 @@ void ECBackend::objects_read_and_reconstruct(
     obj_want_to_read,
     for_read_op,
     OpRequestRef(),
-    fast_read, false);
+    fast_read, false, nullptr);
   return;
 }
 
index d879f429443572b7db079325972f28c09c112621..0483fa191294ac7ad0937a0e9a0fb14e90d06b8d 100644 (file)
@@ -82,7 +82,6 @@ public:
   void handle_sub_read_reply(
     pg_shard_t from,
     ECSubReadReply &op,
-    RecoveryMessages *m,
     const ZTracer::Trace &trace
     );
 
@@ -277,6 +276,7 @@ private:
   void continue_recovery_op(
     RecoveryOp &op,
     RecoveryMessages *m);
+  friend struct RecoveryMessages;
   void dispatch_recovery_messages(RecoveryMessages &m, int priority);
   friend struct OnRecoveryReadComplete;
   void handle_recovery_read_complete(
@@ -356,6 +356,7 @@ public:
     // True if reading for recovery which could possibly reading only a subset
     // of the available shards.
     bool for_recovery;
+    GenContext<int> *on_complete;
 
     ZTracer::Trace trace;
 
@@ -375,11 +376,17 @@ public:
       ceph_tid_t tid,
       bool do_redundant_reads,
       bool for_recovery,
+      GenContext<int> *on_complete,
       OpRequestRef op,
       std::map<hobject_t, std::set<int>> &&_want_to_read,
       std::map<hobject_t, read_request_t> &&_to_read)
-      : priority(priority), tid(tid), op(op), do_redundant_reads(do_redundant_reads),
-       for_recovery(for_recovery), want_to_read(std::move(_want_to_read)),
+      : priority(priority),
+        tid(tid),
+        op(op),
+        do_redundant_reads(do_redundant_reads),
+        for_recovery(for_recovery),
+        on_complete(on_complete),
+        want_to_read(std::move(_want_to_read)),
        to_read(std::move(_to_read)) {
       for (auto &&hpair: to_read) {
        auto &returned = complete[hpair.first].returned;
@@ -400,8 +407,9 @@ public:
   void filter_read_op(
     const OSDMapRef& osdmap,
     ReadOp &op);
-  void complete_read_op(ReadOp &rop, RecoveryMessages *m);
+  void complete_read_op(ReadOp &rop);
   friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+
   std::map<ceph_tid_t, ReadOp> tid_to_read_map;
   std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
   void start_read_op(
@@ -409,7 +417,9 @@ public:
     std::map<hobject_t, std::set<int>> &want_to_read,
     std::map<hobject_t, read_request_t> &to_read,
     OpRequestRef op,
-    bool do_redundant_reads, bool for_recovery);
+    bool do_redundant_reads,
+    bool for_recovery,
+    GenContext<int> *on_complete);
 
   void do_read_op(ReadOp &rop);
   int send_all_remaining_reads(