osd: rework the callback infrastructure around read_pipeline
author    Radosław Zarzyński <rzarzyns@redhat.com>
          Mon, 11 Sep 2023 12:00:16 +0000 (14:00 +0200)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
          Wed, 10 Jan 2024 17:23:13 +0000 (17:23 +0000)
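Previously each read_request_t carried its own heap-allocated
GenContext<read_result_t&> callback (OnRecoveryReadComplete for
recovery reads, CallClientContexts for client reads), while ReadOp
held a separate raw GenContext<int> *on_complete, leaving
ReadPipeline to delete the per-request callbacks by hand in
filter_read_op() and on_change().

Replace both with a single ReadCompleter interface owned by ReadOp
through std::unique_ptr: finish_single_request() runs once per
object read, and the rvalue-qualified finish() runs exactly once
when the whole ReadOp completes, consuming the completer.
RecoveryMessages stops inheriting from GenContext<int> and no longer
needs a back-pointer to ECBackend nor the next_recovery_messages
chaining; recovery and client reads now go through
RecoveryReadCompleter and ClientReadCompleter respectively. Since
ReadOp now owns a unique_ptr, its copy constructor is deleted.

For illustration only, a minimal standalone sketch of the resulting
completion contract (C++17; the toy Object/Result types stand in for
hobject_t/read_result_t and are not part of this change):

  #include <iostream>
  #include <map>
  #include <memory>
  #include <string>
  #include <utility>

  using Object = std::string;    // stand-in for hobject_t
  struct Result { int r = 0; };  // stand-in for read_result_t

  // The new contract: one callback per finished object read, then a
  // single rvalue-qualified finish() for the whole ReadOp, which
  // lets the completer consume its own state (e.g. RecoveryMessages).
  struct ReadCompleter {
    virtual void finish_single_request(const Object &hoid, Result &res) = 0;
    virtual void finish(int priority) && = 0;
    virtual ~ReadCompleter() = default;
  };

  struct PrintingCompleter : ReadCompleter {
    void finish_single_request(const Object &hoid, Result &res) override {
      std::cout << "object " << hoid << " done, r=" << res.r << "\n";
    }
    void finish(int priority) && override {
      std::cout << "whole read op done at priority " << priority << "\n";
    }
  };

  // Analogue of ReadOp: owns its completer instead of raw callbacks.
  struct ReadOp {
    int priority;
    std::map<Object, Result> complete;
    std::unique_ptr<ReadCompleter> on_complete;
  };

  // Analogue of ReadPipeline::complete_read_op(): per-object
  // callbacks first, then the one-shot finish, then drop the completer.
  void complete_read_op(ReadOp &rop) {
    for (auto &[hoid, res] : rop.complete)
      rop.on_complete->finish_single_request(hoid, res);
    std::move(*rop.on_complete).finish(rop.priority);
    rop.on_complete = nullptr;
  }

  int main() {
    ReadOp rop{10, {{"obj1", {}}, {"obj2", {}}},
               std::make_unique<PrintingCompleter>()};
    complete_read_op(rop);
  }
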
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECBackend.cc
src/osd/ECBackend.h

diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc
index 6c7364f5f835e46eb1e4329019088971ecd08a4a..386c1eeabe32a037d50c2bf10f97fe768e4b2648 100644
--- a/src/osd/ECBackend.cc
+++ b/src/osd/ECBackend.cc
@@ -254,48 +254,17 @@ void ECBackend::_failed_push(const hobject_t &hoid, ECBackend::read_result_t &re
   get_parent()->on_failed_pull(fl, hoid, v);
 }
 
-struct OnRecoveryReadComplete :
-  public GenContext<ECBackend::read_result_t&> {
-  struct RecoveryMessages* rm;
-  ECBackend *backend;
-  hobject_t hoid;
-
-  OnRecoveryReadComplete() = delete;
-  OnRecoveryReadComplete(RecoveryMessages* rm, ECBackend *backend, const hobject_t &hoid)
-    : rm(rm), backend(backend), hoid(hoid) {}
-  void finish(ECBackend::read_result_t &res) override {
-    if (!(res.r == 0 && res.errors.empty())) {
-        backend->_failed_push(hoid, res);
-        return;
-    }
-    ceph_assert(res.returned.size() == 1);
-    backend->handle_recovery_read_complete(
-      hoid,
-      res.returned.back(),
-      res.attrs,
-      rm);
-  }
-};
-
-struct RecoveryMessages : GenContext<int> {
-  ECBackend *ec;
+struct RecoveryMessages {
   map<hobject_t,
       ECBackend::read_request_t> recovery_reads;
-  RecoveryMessages* next_recovery_messages = nullptr;
   map<hobject_t, set<int>> want_to_read;
 
-  RecoveryMessages(ECBackend* ec) : ec(ec) {}
-
   void recovery_read(
-    ECBackend *ec,
     const hobject_t &hoid, uint64_t off, uint64_t len,
     set<int> &&_want_to_read,
     const map<pg_shard_t, vector<pair<int, int>>> &need,
     bool attrs)
   {
-    if (!next_recovery_messages) {
-      next_recovery_messages = new RecoveryMessages{ec};
-    }
     list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
     to_read.push_back(boost::make_tuple(off, len, 0));
     ceph_assert(!recovery_reads.count(hoid));
@@ -306,22 +275,12 @@ struct RecoveryMessages : GenContext<int> {
        ECBackend::read_request_t(
          to_read,
          need,
-         attrs,
-         new OnRecoveryReadComplete(
-           next_recovery_messages,
-           ec,
-           hoid))));
+         attrs)));
   }
 
   map<pg_shard_t, vector<PushOp> > pushes;
   map<pg_shard_t, vector<PushReplyOp> > push_replies;
   ObjectStore::Transaction t;
-  RecoveryMessages() = delete;
-  ~RecoveryMessages() {}
-
-  void finish(int priority) override {
-    ec->dispatch_recovery_messages(*this, priority);
-  }
 };
 
 void ECBackend::handle_recovery_push(
@@ -556,6 +515,36 @@ struct SendPushReplies : public Context {
   }
 };
 
+struct RecoveryReadCompleter : ECBackend::ReadCompleter {
+  RecoveryReadCompleter(ECBackend& backend)
+    : backend(backend) {}
+
+  void finish_single_request(
+    const hobject_t &hoid,
+    ECBackend::read_result_t &res,
+    list<boost::tuple<uint64_t, uint64_t, uint32_t> >) override
+  {
+    if (!(res.r == 0 && res.errors.empty())) {
+      backend._failed_push(hoid, res);
+      return;
+    }
+    ceph_assert(res.returned.size() == 1);
+    backend.handle_recovery_read_complete(
+      hoid,
+      res.returned.back(),
+      res.attrs,
+      &rm);
+  }
+
+  void finish(int priority) && override
+  {
+    backend.dispatch_recovery_messages(rm, priority);
+  }
+
+  ECBackend& backend;
+  RecoveryMessages rm;
+};
+
 void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
 {
   for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
@@ -607,8 +596,9 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
     m.want_to_read,
     m.recovery_reads,
     OpRequestRef(),
-    false, true, m.next_recovery_messages);
-  m.next_recovery_messages = nullptr;
+    false,
+    true,
+    std::make_unique<RecoveryReadCompleter>(*this));
 }
 
 void ECBackend::continue_recovery_op(
@@ -656,7 +646,6 @@ void ECBackend::continue_recovery_op(
        return;
       }
       m->recovery_read(
-       this,
        op.hoid,
        op.recovery_progress.data_recovered_to,
        amount,
@@ -773,7 +762,7 @@ void ECBackend::run_recovery_op(
   int priority)
 {
   ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
-  RecoveryMessages m{this};
+  RecoveryMessages m;
   for (list<RecoveryOp>::iterator i = h->ops.begin();
        i != h->ops.end();
        ++i) {
@@ -884,7 +873,7 @@ bool ECBackend::_handle_message(
   }
   case MSG_OSD_PG_PUSH: {
     auto op = _op->get_req<MOSDPGPush>();
-    RecoveryMessages rm{this};
+    RecoveryMessages rm;
     for (vector<PushOp>::const_iterator i = op->pushes.begin();
         i != op->pushes.end();
         ++i) {
@@ -896,7 +885,7 @@ bool ECBackend::_handle_message(
   case MSG_OSD_PG_PUSH_REPLY: {
     const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
       _op->get_req());
-    RecoveryMessages rm{this};
+    RecoveryMessages rm;
     for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
         i != op->replies.end();
         ++i) {
@@ -1366,11 +1355,14 @@ void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop)
     rop.complete.begin();
   ceph_assert(rop.to_read.size() == rop.complete.size());
   for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
-    if (reqiter->second.cb) {
-      reqiter->second.cb->complete(resiter->second);
-      reqiter->second.cb = nullptr;
-    }
-  }
+    rop.on_complete->finish_single_request(
+      reqiter->first,
+      resiter->second,
+      reqiter->second.to_read);
+  }
+  ceph_assert(rop.on_complete);
+  std::move(*rop.on_complete).finish(rop.priority);
+  rop.on_complete = nullptr;
   // if the read op is over. clean all the data of this tid.
   for (set<pg_shard_t>::iterator iter = rop.in_progress.begin();
     iter != rop.in_progress.end();
@@ -1379,10 +1371,6 @@ void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop)
   }
   rop.in_progress.clear();
   tid_to_read_map.erase(rop.tid);
-  if (rop.on_complete) {
-    rop.on_complete->complete(rop.priority);
-    rop.on_complete = nullptr;
-  }
 }
 
 struct FinishReadOp : public GenContext<ThreadPool::TPHandle&>  {
@@ -1445,10 +1433,6 @@ void ECBackend::ReadPipeline::filter_read_op(
     read_request_t &req = op.to_read.find(*i)->second;
     dout(10) << __func__ << ": canceling " << req
             << "  for obj " << *i << dendl;
-    ceph_assert(req.cb);
-    delete req.cb;
-    req.cb = nullptr;
-
     op.to_read.erase(*i);
     op.complete.erase(*i);
     on_erase(*i);
@@ -1516,13 +1500,6 @@ void ECBackend::ReadPipeline::on_change()
        i != tid_to_read_map.end();
        ++i) {
     dout(10) << __func__ << ": cancelling " << i->second << dendl;
-    for (map<hobject_t, read_request_t>::iterator j =
-          i->second.to_read.begin();
-        j != i->second.to_read.end();
-        ++j) {
-      delete j->second.cb;
-      j->second.cb = nullptr;
-    }
   }
   tid_to_read_map.clear();
   shard_to_read_map.clear();
@@ -1843,7 +1820,7 @@ void ECBackend::ReadPipeline::start_read_op(
   OpRequestRef _op,
   bool do_redundant_reads,
   bool for_recovery,
-  GenContext<int> *on_complete)
+  std::unique_ptr<ECBackend::ReadCompleter> on_complete)
 {
   ceph_tid_t tid = get_parent()->get_tid();
   ceph_assert(!tid_to_read_map.count(tid));
@@ -1854,7 +1831,7 @@ void ECBackend::ReadPipeline::start_read_op(
       tid,
       do_redundant_reads,
       for_recovery,
-      on_complete,
+      std::move(on_complete),
       _op,
       std::move(want_to_read),
       std::move(to_read))).first->second;
@@ -2429,19 +2406,17 @@ void ECBackend::objects_read_async(
           on_complete)));
 }
 
-struct CallClientContexts :
-  public GenContext<ECBackend::read_result_t&> {
-  hobject_t hoid;
-  ECBackend::ReadPipeline &read_pipeline;
-  ECBackend::ClientAsyncReadStatus *status;
-  list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
-  CallClientContexts(
-    hobject_t hoid,
-    ECBackend::ReadPipeline &read_pipeline,
-    ECBackend::ClientAsyncReadStatus *status,
-    const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
-      : hoid(hoid), read_pipeline(read_pipeline), status(status), to_read(to_read) {}
-  void finish(ECBackend::read_result_t &res) override {
+struct ClientReadCompleter : ECBackend::ReadCompleter {
+  ClientReadCompleter(ECBackend::ReadPipeline &read_pipeline,
+                      ECBackend::ClientAsyncReadStatus *status)
+    : read_pipeline(read_pipeline),
+      status(status) {}
+
+  void finish_single_request(
+    const hobject_t &hoid,
+    ECBackend::read_result_t &res,
+    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
+  {
     extent_map result;
     if (res.r != 0)
       goto out;
@@ -2484,8 +2459,17 @@ out:
     status->complete_object(hoid, res.r, std::move(result));
     read_pipeline.kick_reads();
   }
+
+  void finish(int priority) && override
+  {
+    // NOP
+  }
+
+  ECBackend::ReadPipeline &read_pipeline;
+  ECBackend::ClientAsyncReadStatus *status;
 };
 
+
 void ECBackend::objects_read_and_reconstruct(
   const map<hobject_t,
     std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
@@ -2541,19 +2525,13 @@ void ECBackend::ReadPipeline::objects_read_and_reconstruct(
       &shards);
     ceph_assert(r == 0);
 
-    CallClientContexts *c = new CallClientContexts(
-      to_read.first,
-      *this,
-      &(in_progress_client_reads.back()),
-      to_read.second);
     for_read_op.insert(
       make_pair(
        to_read.first,
        read_request_t(
          to_read.second,
          shards,
-         false,
-         c)));
+         false)));
     obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
   }
 
@@ -2562,7 +2540,9 @@ void ECBackend::ReadPipeline::objects_read_and_reconstruct(
     obj_want_to_read,
     for_read_op,
     OpRequestRef(),
-    fast_read, false, nullptr);
+    fast_read,
+    false,
+    std::make_unique<ClientReadCompleter>(*this, &(in_progress_client_reads.back())));
   }();
   return;
 }
@@ -2585,7 +2565,6 @@ int ECBackend::ReadPipeline::send_all_remaining_reads(
 
   list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
     rop.to_read.find(hoid)->second.to_read;
-  GenContext<read_result_t&> *c = rop.to_read.find(hoid)->second.cb;
 
   // (Note cuixf) If we need to read attrs and we read failed, try to read again.
   bool want_attrs =
@@ -2601,8 +2580,7 @@ int ECBackend::ReadPipeline::send_all_remaining_reads(
       read_request_t(
        offsets,
        shards,
-       want_attrs,
-       c)));
+       want_attrs)));
   return 0;
 }
 
diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h
index 94267144ba4cd62999cbeeaf8bd13257af9d2fb0..b859414160bd7209a96b0bc9285a387910941adc 100644
--- a/src/osd/ECBackend.h
+++ b/src/osd/ECBackend.h
@@ -272,6 +272,7 @@ private:
   friend struct RecoveryMessages;
   void dispatch_recovery_messages(RecoveryMessages &m, int priority);
   friend struct OnRecoveryReadComplete;
+  friend struct RecoveryReadCompleter;
   void handle_recovery_read_complete(
     const hobject_t &hoid,
     boost::tuple<uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > &to_read,
@@ -321,17 +322,25 @@ public:
     const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
     std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
     bool want_attrs;
-    GenContext<read_result_t&> *cb;
     read_request_t(
       const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
       const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
-      bool want_attrs,
-      GenContext<read_result_t&> *cb)
-      : to_read(to_read), need(need), want_attrs(want_attrs),
-       cb(cb) {}
+      bool want_attrs)
+      : to_read(to_read), need(need), want_attrs(want_attrs) {}
   };
   friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
 
+  struct ReadCompleter {
+    virtual void finish_single_request(
+      const hobject_t &hoid,
+      ECBackend::read_result_t &res,
+      std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
+
+    virtual void finish(int priority) && = 0;
+
+    virtual ~ReadCompleter() = default;
+  };
+
   struct ReadOp {
     int priority;
     ceph_tid_t tid;
@@ -343,7 +352,7 @@ public:
     // True if reading for recovery which could possibly reading only a subset
     // of the available shards.
     bool for_recovery;
-    GenContext<int> *on_complete;
+    std::unique_ptr<ReadCompleter> on_complete;
 
     ZTracer::Trace trace;
 
@@ -363,7 +372,7 @@ public:
       ceph_tid_t tid,
       bool do_redundant_reads,
       bool for_recovery,
-      GenContext<int> *on_complete,
+      std::unique_ptr<ReadCompleter> _on_complete,
       OpRequestRef op,
       std::map<hobject_t, std::set<int>> &&_want_to_read,
       std::map<hobject_t, read_request_t> &&_to_read)
@@ -372,7 +381,7 @@ public:
         op(op),
         do_redundant_reads(do_redundant_reads),
         for_recovery(for_recovery),
-        on_complete(on_complete),
+        on_complete(std::move(_on_complete)),
         want_to_read(std::move(_want_to_read)),
        to_read(std::move(_to_read)) {
       for (auto &&hpair: to_read) {
@@ -387,7 +396,7 @@ public:
       }
     }
     ReadOp() = delete;
-    ReadOp(const ReadOp &) = default;
+    ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
     ReadOp(ReadOp &&) = default;
   };
 
@@ -417,7 +426,7 @@ public:
       OpRequestRef op,
       bool do_redundant_reads,
       bool for_recovery,
-      GenContext<int> *on_complete);
+      std::unique_ptr<ReadCompleter> on_complete);
 
     void do_read_op(ReadOp &rop);