git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd: finish shuffling ECCommon::ReadPipeline to ECCommon.cc
author	Radosław Zarzyński <rzarzyns@redhat.com>
Thu, 28 Sep 2023 16:22:41 +0000 (18:22 +0200)
committer	Radoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 10 Jan 2024 17:30:28 +0000 (17:30 +0000)
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
src/osd/ECBackend.cc
src/osd/ECCommon.cc

index afa5d5bba44e66e4f859b342a46101027858c820..e29d181ec0b66ccca1ca807f33ad3224497de7ed 100644 (file)
@@ -55,12 +55,6 @@ using ceph::Formatter;
 static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
   return pgb->get_parent()->gen_dbg_prefix(*_dout);
 }
-static ostream& _prefix(std::ostream *_dout, ECCommon::RMWPipeline *rmw_pipeline) {
-  return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout);
-}
-static ostream& _prefix(std::ostream *_dout, ECCommon::ReadPipeline *read_pipeline) {
-  return read_pipeline->get_parent()->gen_dbg_prefix(*_dout);
-}
 
 struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
   list<ECBackend::RecoveryOp> ops;
@@ -92,14 +86,6 @@ static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
   return lhs << "]";
 }
 
-static ostream &operator<<(
-  ostream &lhs,
-  const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
-{
-  return lhs << "(" << rhs.get<0>() << ", "
-            << rhs.get<1>() << ", " << rhs.get<2>() << ")";
-}
-
 ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
 {
   return lhs << "RecoveryOp("
@@ -1589,70 +1575,6 @@ void ECBackend::objects_read_async(
           on_complete)));
 }
 
-struct ClientReadCompleter : ECCommon::ReadCompleter {
-  ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline,
-                      ECCommon::ClientAsyncReadStatus *status)
-    : read_pipeline(read_pipeline),
-      status(status) {}
-
-  void finish_single_request(
-    const hobject_t &hoid,
-    ECCommon::read_result_t &res,
-    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
-  {
-    extent_map result;
-    if (res.r != 0)
-      goto out;
-    ceph_assert(res.returned.size() == to_read.size());
-    ceph_assert(res.errors.empty());
-    for (auto &&read: to_read) {
-      pair<uint64_t, uint64_t> adjusted =
-       read_pipeline.sinfo.offset_len_to_stripe_bounds(
-         make_pair(read.get<0>(), read.get<1>()));
-      ceph_assert(res.returned.front().get<0>() == adjusted.first);
-      ceph_assert(res.returned.front().get<1>() == adjusted.second);
-      map<int, bufferlist> to_decode;
-      bufferlist bl;
-      for (map<pg_shard_t, bufferlist>::iterator j =
-            res.returned.front().get<2>().begin();
-          j != res.returned.front().get<2>().end();
-          ++j) {
-       to_decode[j->first.shard] = std::move(j->second);
-      }
-      int r = ECUtil::decode(
-       read_pipeline.sinfo,
-       read_pipeline.ec_impl,
-       to_decode,
-       &bl);
-      if (r < 0) {
-        res.r = r;
-        goto out;
-      }
-      bufferlist trimmed;
-      trimmed.substr_of(
-       bl,
-       read.get<0>() - adjusted.first,
-       std::min(read.get<1>(),
-           bl.length() - (read.get<0>() - adjusted.first)));
-      result.insert(
-       read.get<0>(), trimmed.length(), std::move(trimmed));
-      res.returned.pop_front();
-    }
-out:
-    status->complete_object(hoid, res.r, std::move(result));
-    read_pipeline.kick_reads();
-  }
-
-  void finish(int priority) && override
-  {
-    // NOP
-  }
-
-  ECCommon::ReadPipeline &read_pipeline;
-  ECCommon::ClientAsyncReadStatus *status;
-};
-
-
 void ECBackend::objects_read_and_reconstruct(
   const map<hobject_t,
     std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
@@ -1664,111 +1586,6 @@ void ECBackend::objects_read_and_reconstruct(
     reads, fast_read, std::move(func));
 }
 
-void ECCommon::ReadPipeline::get_want_to_read_shards(
-  std::set<int> *want_to_read) const
-{
-  const std::vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
-  for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
-    int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
-    want_to_read->insert(chunk);
-  }
-}
-
-void ECCommon::ReadPipeline::objects_read_and_reconstruct(
-  const map<hobject_t,
-    std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
-  > &reads,
-  bool fast_read,
-  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
-{
-  in_progress_client_reads.emplace_back(
-    reads.size(), std::move(func));
-  if (!reads.size()) {
-    kick_reads();
-    return;
-  }
-
-  map<hobject_t, set<int>> obj_want_to_read;
-  set<int> want_to_read;
-  get_want_to_read_shards(&want_to_read);
-    
-  map<hobject_t, read_request_t> for_read_op;
-  for (auto &&to_read: reads) {
-    map<pg_shard_t, vector<pair<int, int>>> shards;
-    int r = get_min_avail_to_read_shards(
-      to_read.first,
-      want_to_read,
-      false,
-      fast_read,
-      &shards);
-    ceph_assert(r == 0);
-
-    for_read_op.insert(
-      make_pair(
-       to_read.first,
-       read_request_t(
-         to_read.second,
-         shards,
-         false)));
-    obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
-  }
-
-  start_read_op(
-    CEPH_MSG_PRIO_DEFAULT,
-    obj_want_to_read,
-    for_read_op,
-    OpRequestRef(),
-    fast_read,
-    false,
-    std::make_unique<ClientReadCompleter>(*this, &(in_progress_client_reads.back())));
-}
-
-
-int ECCommon::ReadPipeline::send_all_remaining_reads(
-  const hobject_t &hoid,
-  ReadOp &rop)
-{
-  set<int> already_read;
-  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
-  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
-    already_read.insert(i->shard);
-  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
-  map<pg_shard_t, vector<pair<int, int>>> shards;
-  int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
-                              rop.complete[hoid], &shards, rop.for_recovery);
-  if (r)
-    return r;
-
-  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
-    rop.to_read.find(hoid)->second.to_read;
-
-  // (Note cuixf) If we need to read attrs and we read failed, try to read again.
-  bool want_attrs =
-    rop.to_read.find(hoid)->second.want_attrs &&
-    (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
-  if (want_attrs) {
-    dout(10) << __func__ << " want attrs again" << dendl;
-  }
-
-  rop.to_read.erase(hoid);
-  rop.to_read.insert(make_pair(
-      hoid,
-      read_request_t(
-       offsets,
-       shards,
-       want_attrs)));
-  return 0;
-}
-
-void ECCommon::ReadPipeline::kick_reads()
-{
-  while (in_progress_client_reads.size() &&
-         in_progress_client_reads.front().is_complete()) {
-    in_progress_client_reads.front().run();
-    in_progress_client_reads.pop_front();
-  }
-}
-
 void ECBackend::kick_reads() {
   read_pipeline.kick_reads();
 }
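
For context on the ClientReadCompleter that moves out of this file: each client extent is first widened to stripe bounds before the per-shard reads are issued, and after ECUtil::decode() the result is trimmed back to the originally requested (offset, length), as the substr_of() call in finish_single_request() shows. Below is a minimal, self-contained sketch of that arithmetic; the stripe_width value, the helper name, and the std::string stand-in for bufferlist are illustrative assumptions, not Ceph APIs.

```cpp
// Sketch (hypothetical, not Ceph code) of the widen-then-trim arithmetic used
// by the relocated ClientReadCompleter::finish_single_request().
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Widen an (offset, len) request to stripe boundaries (assumed rounding).
static std::pair<uint64_t, uint64_t>
offset_len_to_stripe_bounds(uint64_t off, uint64_t len, uint64_t stripe_width) {
  uint64_t start = (off / stripe_width) * stripe_width;
  uint64_t end = ((off + len + stripe_width - 1) / stripe_width) * stripe_width;
  return {start, end - start};
}

int main() {
  const uint64_t stripe_width = 4096;
  const uint64_t off = 5000, len = 100;
  auto [astart, alen] = offset_len_to_stripe_bounds(off, len, stripe_width);

  // The decoded buffer covers [astart, astart + alen); trim it back to the
  // caller's original request, mirroring the substr_of() call above.
  std::string decoded(alen, 'x');                       // stand-in for the decoded bl
  uint64_t skip = off - astart;
  uint64_t take = std::min<uint64_t>(len, decoded.size() - skip);
  std::string trimmed = decoded.substr(skip, take);
  assert(trimmed.size() == len);
}
```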
index 7ed15a061f8105bf4118fbce06be89ec4955e392..e1bbfce7796ce82952038236b0d9c2c05cb3f481 100644 (file)
@@ -565,6 +565,175 @@ void ECCommon::ReadPipeline::do_read_op(ReadOp &op)
   dout(10) << __func__ << ": started " << op << dendl;
 }
 
+void ECCommon::ReadPipeline::get_want_to_read_shards(
+  std::set<int> *want_to_read) const
+{
+  const std::vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
+  for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
+    int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
+    want_to_read->insert(chunk);
+  }
+}
+
+struct ClientReadCompleter : ECCommon::ReadCompleter {
+  ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline,
+                      ECCommon::ClientAsyncReadStatus *status)
+    : read_pipeline(read_pipeline),
+      status(status) {}
+
+  void finish_single_request(
+    const hobject_t &hoid,
+    ECCommon::read_result_t &res,
+    list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
+  {
+    extent_map result;
+    if (res.r != 0)
+      goto out;
+    ceph_assert(res.returned.size() == to_read.size());
+    ceph_assert(res.errors.empty());
+    for (auto &&read: to_read) {
+      pair<uint64_t, uint64_t> adjusted =
+       read_pipeline.sinfo.offset_len_to_stripe_bounds(
+         make_pair(read.get<0>(), read.get<1>()));
+      ceph_assert(res.returned.front().get<0>() == adjusted.first);
+      ceph_assert(res.returned.front().get<1>() == adjusted.second);
+      map<int, bufferlist> to_decode;
+      bufferlist bl;
+      for (map<pg_shard_t, bufferlist>::iterator j =
+            res.returned.front().get<2>().begin();
+          j != res.returned.front().get<2>().end();
+          ++j) {
+       to_decode[j->first.shard] = std::move(j->second);
+      }
+      int r = ECUtil::decode(
+       read_pipeline.sinfo,
+       read_pipeline.ec_impl,
+       to_decode,
+       &bl);
+      if (r < 0) {
+        res.r = r;
+        goto out;
+      }
+      bufferlist trimmed;
+      trimmed.substr_of(
+       bl,
+       read.get<0>() - adjusted.first,
+       std::min(read.get<1>(),
+           bl.length() - (read.get<0>() - adjusted.first)));
+      result.insert(
+       read.get<0>(), trimmed.length(), std::move(trimmed));
+      res.returned.pop_front();
+    }
+out:
+    status->complete_object(hoid, res.r, std::move(result));
+    read_pipeline.kick_reads();
+  }
+
+  void finish(int priority) && override
+  {
+    // NOP
+  }
+
+  ECCommon::ReadPipeline &read_pipeline;
+  ECCommon::ClientAsyncReadStatus *status;
+};
+
+void ECCommon::ReadPipeline::objects_read_and_reconstruct(
+  const map<hobject_t,
+    std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+  > &reads,
+  bool fast_read,
+  GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
+{
+  in_progress_client_reads.emplace_back(
+    reads.size(), std::move(func));
+  if (!reads.size()) {
+    kick_reads();
+    return;
+  }
+
+  map<hobject_t, set<int>> obj_want_to_read;
+  set<int> want_to_read;
+  get_want_to_read_shards(&want_to_read);
+    
+  map<hobject_t, read_request_t> for_read_op;
+  for (auto &&to_read: reads) {
+    map<pg_shard_t, vector<pair<int, int>>> shards;
+    int r = get_min_avail_to_read_shards(
+      to_read.first,
+      want_to_read,
+      false,
+      fast_read,
+      &shards);
+    ceph_assert(r == 0);
+
+    for_read_op.insert(
+      make_pair(
+       to_read.first,
+       read_request_t(
+         to_read.second,
+         shards,
+         false)));
+    obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
+  }
+
+  start_read_op(
+    CEPH_MSG_PRIO_DEFAULT,
+    obj_want_to_read,
+    for_read_op,
+    OpRequestRef(),
+    fast_read,
+    false,
+    std::make_unique<ClientReadCompleter>(*this, &(in_progress_client_reads.back())));
+}
+
+
+int ECCommon::ReadPipeline::send_all_remaining_reads(
+  const hobject_t &hoid,
+  ReadOp &rop)
+{
+  set<int> already_read;
+  const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
+  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
+    already_read.insert(i->shard);
+  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
+  map<pg_shard_t, vector<pair<int, int>>> shards;
+  int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
+                              rop.complete[hoid], &shards, rop.for_recovery);
+  if (r)
+    return r;
+
+  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
+    rop.to_read.find(hoid)->second.to_read;
+
+  // (Note cuixf) If we need to read attrs and we read failed, try to read again.
+  bool want_attrs =
+    rop.to_read.find(hoid)->second.want_attrs &&
+    (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
+  if (want_attrs) {
+    dout(10) << __func__ << " want attrs again" << dendl;
+  }
+
+  rop.to_read.erase(hoid);
+  rop.to_read.insert(make_pair(
+      hoid,
+      read_request_t(
+       offsets,
+       shards,
+       want_attrs)));
+  return 0;
+}
+
+void ECCommon::ReadPipeline::kick_reads()
+{
+  while (in_progress_client_reads.size() &&
+         in_progress_client_reads.front().is_complete()) {
+    in_progress_client_reads.front().run();
+    in_progress_client_reads.pop_front();
+  }
+}
+
+
 void ECCommon::RMWPipeline::start_rmw(OpRef op)
 {
   ceph_assert(op);
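
The kick_reads() loop relocated above preserves the ordering guarantee for client reads: completions are delivered strictly in submission order, so a finished request only runs once everything queued ahead of it has also finished. A minimal, self-contained sketch of that pattern follows; the AsyncReadStatus and ReadQueue names are hypothetical stand-ins, not the Ceph classes.

```cpp
// Sketch (hypothetical types) of the FIFO completion pattern behind
// in_progress_client_reads / ECCommon::ReadPipeline::kick_reads().
#include <deque>
#include <functional>
#include <iostream>

struct AsyncReadStatus {
  bool complete = false;              // set once all sub-reads have finished
  std::function<void()> on_complete;  // user callback to run on completion
  bool is_complete() const { return complete; }
  void run() { on_complete(); }
};

struct ReadQueue {
  std::deque<AsyncReadStatus> in_progress;

  // Drain the front of the queue while the oldest request is done,
  // mirroring the kick_reads() loop in the hunk above.
  void kick_reads() {
    while (!in_progress.empty() && in_progress.front().is_complete()) {
      in_progress.front().run();
      in_progress.pop_front();
    }
  }
};

int main() {
  ReadQueue q;
  q.in_progress.push_back({false, [] { std::cout << "read A done\n"; }});
  q.in_progress.push_back({true,  [] { std::cout << "read B done\n"; }});
  q.kick_reads();                      // nothing runs: A is still pending
  q.in_progress.front().complete = true;
  q.kick_reads();                      // A then B run, preserving submission order
}
```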