]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: shuffle ECCommon::RecoveryBackend from ECBackend.cc to ECCommon.cc
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 9 May 2024 21:00:05 +0000 (21:00 +0000)
committerAlex Ainscow <aainscow@uk.ibm.com>
Wed, 17 Sep 2025 08:43:26 +0000 (09:43 +0100)
It's just code movement; there is no changes apart that.

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
(cherry picked from commit ef644c9d29b8adaef228a20fc96830724d1fc3f5)

src/osd/ECBackend.cc
src/osd/ECCommon.cc

index bb641d1eefd3e19eb8c252c1084d1f69f385485a..c5158d8757723ced8d31459ca27bd43a72f741d7 100644 (file)
@@ -67,17 +67,6 @@ struct ECBackend::ECRecoveryBackend::ECRecoveryHandle : public PGBackend::Recove
   list<ECCommon::RecoveryBackend::RecoveryOp> ops;
 };
 
-void ECCommon::RecoveryBackend::RecoveryOp::dump(Formatter *f) const {
-  f->dump_stream("hoid") << hoid;
-  f->dump_stream("v") << v;
-  f->dump_stream("missing_on") << missing_on;
-  f->dump_stream("missing_on_shards") << missing_on_shards;
-  f->dump_stream("recovery_info") << recovery_info;
-  f->dump_stream("recovery_progress") << recovery_progress;
-  f->dump_stream("state") << tostr(state);
-  f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
-}
-
 ECBackend::ECBackend(
   PGBackend::Listener *pg,
   CephContext *cct,
@@ -107,41 +96,10 @@ PGBackend::RecoveryHandle *ECBackend::open_recovery_op() {
     recovery_backend.open_recovery_op());
 }
 
-ECCommon::RecoveryBackend::RecoveryBackend(
-  CephContext *cct,
-  const coll_t &coll,
-  ceph::ErasureCodeInterfaceRef ec_impl,
-  const ECUtil::stripe_info_t &sinfo,
-  ReadPipeline &read_pipeline,
-  ECListener *parent)
-  : cct(cct),
-    coll(coll),
-    ec_impl(std::move(ec_impl)),
-    sinfo(sinfo),
-    read_pipeline(read_pipeline),
-    parent(parent) {}
-
 ECBackend::ECRecoveryBackend::ECRecoveryHandle *ECBackend::ECRecoveryBackend::open_recovery_op() {
   return new ECRecoveryHandle;
 }
 
-void ECCommon::RecoveryBackend::_failed_push(const hobject_t &hoid,
-                                             ECCommon::read_result_t &res) {
-  dout(10) << __func__ << ": Read error " << hoid << " r="
-          << res.r << " errors=" << res.errors << dendl;
-  dout(10) << __func__ << ": canceling recovery op for obj " << hoid
-          << dendl;
-  ceph_assert(recovery_ops.count(hoid));
-  eversion_t v = recovery_ops[hoid].v;
-  recovery_ops.erase(hoid);
-
-  set<pg_shard_t> fl;
-  for (auto &&i: res.errors) {
-    fl.insert(i.first);
-  }
-  get_parent()->on_failed_pull(fl, hoid, v);
-}
-
 void ECBackend::handle_recovery_push(
   const PushOp &op,
   RecoveryMessages *m,
@@ -176,218 +134,6 @@ void ECBackend::handle_recovery_push(
   }
 }
 
-void ECCommon::RecoveryBackend::handle_recovery_push(
-  const PushOp &op,
-  RecoveryMessages *m,
-  bool is_repair) {
-  if (get_parent()->check_failsafe_full()) {
-    dout(10) << __func__ << " Out of space (failsafe) processing push request."
-             << dendl;
-    ceph_abort();
-  }
-
-  bool oneshot = op.before_progress.first && op.after_progress.data_complete;
-  ghobject_t tobj;
-  if (oneshot) {
-    tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
-                      get_parent()->whoami_shard().shard);
-  } else {
-    tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
-                                                             op.version),
-                      ghobject_t::NO_GEN,
-                      get_parent()->whoami_shard().shard);
-    if (op.before_progress.first) {
-      dout(10) << __func__ << ": Adding oid "
-              << tobj.hobj << " in the temp collection" << dendl;
-      add_temp_obj(tobj.hobj);
-    }
-  }
-
-  if (op.before_progress.first) {
-    m->t.remove(coll, tobj);
-    m->t.touch(coll, tobj);
-  }
-
-  ceph_assert(op.data.length() == op.data_included.size());
-  uint64_t tobj_size = 0;
-
-  uint64_t cursor = 0;
-  for (auto [off, len] : op.data_included) {
-    bufferlist bl;
-    if (len != op.data.length()) {
-      bl.substr_of(op.data, cursor, len);
-    } else {
-      bl = op.data;
-    }
-    m->t.write(coll, tobj, off, len, bl);
-    tobj_size = off + len;
-    cursor += len;
-  }
-
-  if (op.before_progress.first) {
-    ceph_assert(op.attrset.contains(OI_ATTR));
-    m->t.setattrs(
-      coll,
-      tobj,
-      op.attrset);
-  }
-
-  if (op.after_progress.data_complete) {
-    uint64_t shard_size = sinfo.object_size_to_shard_size(op.recovery_info.size,
-      get_parent()->whoami_shard().shard);
-    ceph_assert(shard_size >= tobj_size);
-    if (shard_size != tobj_size) {
-      m->t.truncate( coll, tobj, shard_size);
-    }
-  }
-
-  if (op.after_progress.data_complete && !oneshot) {
-    dout(10) << __func__ << ": Removing oid "
-            << tobj.hobj << " from the temp collection" << dendl;
-    clear_temp_obj(tobj.hobj);
-    m->t.remove(coll, ghobject_t(
-                  op.soid, ghobject_t::NO_GEN,
-                  get_parent()->whoami_shard().shard));
-    m->t.collection_move_rename(
-      coll, tobj,
-      coll, ghobject_t(
-        op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
-  }
-  if (op.after_progress.data_complete) {
-    if ((get_parent()->pgb_is_primary())) {
-      ceph_assert(recovery_ops.count(op.soid));
-      ceph_assert(recovery_ops[op.soid].obc);
-      if (get_parent()->pg_is_repair() || is_repair)
-        get_parent()->inc_osd_stat_repaired();
-      get_parent()->on_local_recover(
-        op.soid,
-        op.recovery_info,
-        recovery_ops[op.soid].obc,
-        false,
-        &m->t);
-    } else {
-      // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
-      if (is_repair)
-        get_parent()->inc_osd_stat_repaired();
-      get_parent()->on_local_recover(
-        op.soid,
-        op.recovery_info,
-        ObjectContextRef(),
-        false,
-        &m->t);
-    }
-  }
-  m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
-  m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
-}
-
-void ECCommon::RecoveryBackend::handle_recovery_push_reply(
-    const PushReplyOp &op,
-    pg_shard_t from,
-    RecoveryMessages *m) {
-  if (!recovery_ops.count(op.soid))
-    return;
-  RecoveryOp &rop = recovery_ops[op.soid];
-  ceph_assert(rop.waiting_on_pushes.contains(from));
-  rop.waiting_on_pushes.erase(from);
-  continue_recovery_op(rop, m);
-}
-
-void ECCommon::RecoveryBackend::update_object_size_after_read(
-    uint64_t size,
-    read_result_t &res,
-    read_request_t &req) {
-  // We didn't know the size before, meaning the zero for decode calculations
-  // will be off. Recalculate them!
-  ECUtil::shard_extent_set_t zero_mask(sinfo.get_k_plus_m());
-  sinfo.ro_size_to_zero_mask(size, zero_mask);
-  ECUtil::shard_extent_set_t read_mask(sinfo.get_k_plus_m());
-  sinfo.ro_size_to_read_mask(size, read_mask);
-  extent_set superset = res.buffers_read.get_extent_superset();
-
-  for (auto &&[shard, eset] : zero_mask) {
-    eset.intersection_of(superset);
-    if (!eset.empty() &&
-        (res.zero_length_reads.contains(shard) ||
-          res.buffers_read.contains(shard))) {
-      req.zeros_for_decode[shard].insert(eset);
-    }
-  }
-
-  /* Correct the shard_want_to_read, to make sure everything is within scope
-   * of the newly found object size.
-   */
-  for (auto iter = req.shard_want_to_read.begin(); iter != req.shard_want_to_read.end();) {
-    auto &&[shard, eset] = *iter;
-    bool erase = false;
-
-    if (read_mask.contains(shard)) {
-      eset.intersection_of(read_mask.get(shard));
-      erase = eset.empty();
-    } else {
-      erase = true;
-    }
-
-    /* Some shards may be empty */
-    if (erase) {
-      iter = req.shard_want_to_read.erase(iter);
-    } else {
-      ++iter;
-    }
-  }
-
-  dout(20) << "Update want and zeros from read:size=" << size
-           << " res=" << res
-           << " req=" << req
-           << dendl;
-}
-
-void ECCommon::RecoveryBackend::handle_recovery_read_complete(
-    const hobject_t &hoid,
-    read_result_t &&res,
-    read_request_t &req,
-    RecoveryMessages *m) {
-  dout(10) << __func__ << ": returned " << hoid << " " << res << dendl;
-  ceph_assert(recovery_ops.contains(hoid));
-  RecoveryBackend::RecoveryOp &op = recovery_ops[hoid];
-
-  if (res.attrs) {
-    op.xattrs.swap(*(res.attrs));
-    const auto empty_obc = !op.obc;
-    maybe_load_obc(op.xattrs, op);
-#ifdef WITH_CRIMSON
-    ceph_assert(hoid == op.hoid);
-#endif
-    if (empty_obc) {
-      update_object_size_after_read(op.recovery_info.size, res, req);
-    }
-  }
-  ceph_assert(op.xattrs.size());
-  ceph_assert(op.obc);
-
-  op.returned_data.emplace(std::move(res.buffers_read));
-  uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size);
-
-  dout(20) << __func__ << " before decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
-         << op.returned_data->debug_string(2048, 0)
-         << dendl;
-
-  op.returned_data->add_zero_padding_for_decode(req.zeros_for_decode);
-  int r = op.returned_data->decode(ec_impl, req.shard_want_to_read, aligned_size, get_parent()->get_dpp(), true);
-  ceph_assert(r == 0);
-
-  // Finally, we don't want to write any padding, so truncate the buffer
-  // to remove it.
-  op.returned_data->erase_after_ro_offset(aligned_size);
-
-  dout(20) << __func__ << ": oid=" << op.hoid << dendl;
-  dout(20) << __func__ << " after decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
-           << op.returned_data->debug_string(2048, 0)
-           << dendl;
-
-  continue_recovery_op(op, m);
-}
-
 void ECBackend::ECRecoveryBackend::maybe_load_obc(
   const std::map<std::string, ceph::bufferlist, std::less<>>& raw_attrs,
   RecoveryOp &op)
@@ -447,34 +193,6 @@ struct SendPushReplies : public Context {
   }
 };
 
-struct RecoveryReadCompleter : ECCommon::ReadCompleter {
-  RecoveryReadCompleter(ECCommon::RecoveryBackend &backend)
-    : backend(backend) {}
-
-  void finish_single_request(
-      const hobject_t &hoid,
-      ECCommon::read_result_t &&res,
-      ECCommon::read_request_t &req) override {
-    if (!(res.r == 0 && res.errors.empty())) {
-      backend._failed_push(hoid, res);
-      return;
-    }
-    ceph_assert(req.to_read.size() == 0);
-    backend.handle_recovery_read_complete(
-      hoid,
-      std::move(res),
-      req,
-      &rm);
-  }
-
-  void finish(int priority) && override {
-    backend.dispatch_recovery_messages(rm, priority);
-  }
-
-  ECCommon::RecoveryBackend &backend;
-  RecoveryMessages rm;
-};
-
 void ECBackend::ECRecoveryBackend::commit_txn_send_replies(
   ceph::os::Transaction &&txn,
   std::map<int, MOSDPGPushReply*> replies) {
@@ -487,261 +205,6 @@ void ECBackend::ECRecoveryBackend::commit_txn_send_replies(
   get_parent()->queue_transaction(std::move(txn));
 }
 
-void ECCommon::RecoveryBackend::dispatch_recovery_messages(
-  RecoveryMessages &m, int priority) {
-  for (map<pg_shard_t, vector<PushOp>>::iterator i = m.pushes.begin();
-       i != m.pushes.end();
-       m.pushes.erase(i++)) {
-    MOSDPGPush *msg = new MOSDPGPush();
-    msg->set_priority(priority);
-    msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
-    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
-    msg->from = get_parent()->whoami_shard();
-    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
-    msg->pushes.swap(i->second);
-    msg->compute_cost(cct);
-    msg->is_repair = get_parent()->pg_is_repair();
-    get_parent()->send_message_osd_cluster(i->first.osd, msg, msg->map_epoch);
-  }
-  std::map<int, MOSDPGPushReply*> replies;
-  for (map<pg_shard_t, vector<PushReplyOp>>::iterator i =
-         m.push_replies.begin();
-       i != m.push_replies.end();
-       m.push_replies.erase(i++)) {
-    MOSDPGPushReply *msg = new MOSDPGPushReply();
-    msg->set_priority(priority);
-    msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
-    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
-    msg->from = get_parent()->whoami_shard();
-    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
-    msg->replies.swap(i->second);
-    msg->compute_cost(cct);
-    replies.insert(std::pair(i->first.osd, msg));
-  }
-
-  if (!replies.empty()) {
-    dout(20) << __func__ << " recovery_transactions=";
-    Formatter *f = Formatter::create("json");
-    f->open_object_section("t");
-    m.t.dump(f);
-    f->close_section();
-    f->flush(*_dout);
-    delete f;
-    *_dout << dendl;
-    commit_txn_send_replies(std::move(m.t), std::move(replies));
-  }
-
-  if (m.recovery_reads.empty())
-    return;
-  read_pipeline.start_read_op(
-    priority,
-    m.recovery_reads,
-    false,
-    true,
-    std::make_unique<RecoveryReadCompleter>(*this));
-}
-
-void ECCommon::RecoveryBackend::continue_recovery_op(
-  RecoveryBackend::RecoveryOp &op,
-  RecoveryMessages *m) {
-  dout(10) << __func__ << ": continuing " << op << dendl;
-  using RecoveryOp = RecoveryBackend::RecoveryOp;
-  while (1) {
-    switch (op.state) {
-    case RecoveryOp::IDLE: {
-      ceph_assert(!op.recovery_progress.data_complete);
-      ECUtil::shard_extent_set_t want(sinfo.get_k_plus_m());
-
-      op.state = RecoveryOp::READING;
-
-      /* When beginning recovery, the OI may not be known. As such the object
-       * size is not known. For the first read, attempt to read the default
-       * size.  If this is larger than the object sizes, then the OSD will
-       * return truncated reads.  If the object size is known, then attempt
-       * correctly sized reads.
-       */
-      uint64_t read_size = get_recovery_chunk_size();
-      if (op.obc) {
-        uint64_t read_to_end = ECUtil::align_next(op.obc->obs.oi.size) -
-          op.recovery_progress.data_recovered_to;
-
-        if (read_to_end < read_size) {
-          read_size = read_to_end;
-        }
-      }
-      sinfo.ro_range_to_shard_extent_set_with_parity(
-        op.recovery_progress.data_recovered_to, read_size, want);
-
-      op.recovery_progress.data_recovered_to += read_size;
-
-      // We only need to recover shards that are missing.
-      for (auto shard : shard_id_set::difference(sinfo.get_all_shards(), op.missing_on_shards)) {
-        want.erase(shard);
-      }
-
-      if (op.recovery_progress.first && op.obc) {
-        op.xattrs = op.obc->attr_cache;
-      }
-
-      read_request_t read_request(std::move(want),
-                                  op.recovery_progress.first && !op.obc,
-                                  op.obc
-                                    ? op.obc->obs.oi.size
-                                    : get_recovery_chunk_size());
-
-      int r = read_pipeline.get_min_avail_to_read_shards(
-        op.hoid, true, false, read_request);
-
-      if (r != 0) {
-        // we must have lost a recovery source
-        ceph_assert(!op.recovery_progress.first);
-        dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
-                 << dendl;
-        // in crimson
-        get_parent()->cancel_pull(op.hoid);
-        recovery_ops.erase(op.hoid);
-        return;
-      }
-      if (read_request.shard_reads.empty()) {
-        ceph_assert(op.obc);
-        /* This can happen for several reasons
-         * - A zero-sized object.
-         * - The missing shards have no data.
-         * - The previous recovery did not need the last data shard. In this
-         *   case, data_recovered_to may indicate that the last shard still
-         *   needs recovery, when it does not.
-         * We can just skip the read and fall through below.
-         */
-        dout(10) << __func__ << " No reads required " << op << dendl;
-        // Create an empty read result and fall through.
-        op.returned_data.emplace(&sinfo);
-      } else {
-        m->recovery_read(
-          op.hoid,
-          read_request);
-        dout(10) << __func__ << ": IDLE return " << op << dendl;
-        return;
-      }
-    }
-      [[fallthrough]];
-    case RecoveryOp::READING: {
-      // read completed, start write
-      ceph_assert(op.xattrs.size());
-      ceph_assert(op.returned_data);
-      dout(20) << __func__ << ": returned_data=" << op.returned_data << dendl;
-      op.state = RecoveryOp::WRITING;
-      ObjectRecoveryProgress after_progress = op.recovery_progress;
-      after_progress.first = false;
-      if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
-        after_progress.data_complete = true;
-      }
-
-      for (auto &&pg_shard: op.missing_on) {
-        m->pushes[pg_shard].push_back(PushOp());
-        PushOp &pop = m->pushes[pg_shard].back();
-        pop.soid = op.hoid;
-        pop.version = op.recovery_info.oi.get_version_for_shard(pg_shard.shard);
-
-        op.returned_data->get_sparse_buffer(pg_shard.shard, pop.data, pop.data_included);
-        ceph_assert(pop.data.length() == pop.data_included.size());
-
-        dout(10) << __func__ << ": pop shard=" << pg_shard
-                 << ", oid=" << pop.soid
-                 << ", before_progress=" << op.recovery_progress
-                << ", after_progress=" << after_progress
-                << ", pop.data.length()=" << pop.data.length()
-                 << ", pop.data_included=" << pop.data_included
-                << ", size=" << op.obc->obs.oi.size << dendl;
-
-        if (op.recovery_progress.first) {
-          if (sinfo.is_nonprimary_shard(pg_shard.shard)) {
-            if (pop.version == op.recovery_info.oi.version) {
-              dout(10) << __func__ << ": copy OI attr only" << dendl;
-              pop.attrset[OI_ATTR] = op.xattrs[OI_ATTR];
-            } else {
-              // We are recovering a partial write - make sure we push the correct
-              // version in the OI or a scrub error will occur.
-              object_info_t oi(op.recovery_info.oi);
-              oi.shard_versions.clear();
-              oi.version = pop.version;
-              dout(10) << __func__ << ": partial write OI attr: oi=" << oi << dendl;
-              bufferlist bl;
-              oi.encode(bl, get_osdmap()->get_features(
-                CEPH_ENTITY_TYPE_OSD, nullptr));
-              pop.attrset[OI_ATTR] = bl;
-            }
-          } else {
-            dout(10) << __func__ << ": push all attrs (not nonprimary)" << dendl;
-            pop.attrset = op.xattrs;
-          }
-
-          // Following an upgrade, or turning of overwrites, we can take this
-          // opportunity to clean up hinfo.
-          if (pop.attrset.contains(ECUtil::get_hinfo_key())) {
-            pop.attrset.erase(ECUtil::get_hinfo_key());
-          }
-        }
-        pop.recovery_info = op.recovery_info;
-        pop.before_progress = op.recovery_progress;
-        pop.after_progress = after_progress;
-        if (pg_shard != get_parent()->primary_shard()) {
-          // already in crimson -- junction point with PeeringState
-          get_parent()->begin_peer_recover(
-            pg_shard,
-            op.hoid);
-        }
-      }
-      op.returned_data.reset();
-      op.waiting_on_pushes = op.missing_on;
-      op.recovery_progress = after_progress;
-      dout(10) << __func__ << ": READING return " << op << dendl;
-      return;
-    }
-    case RecoveryOp::WRITING: {
-      if (op.waiting_on_pushes.empty()) {
-        if (op.recovery_progress.data_complete) {
-          op.state = RecoveryOp::COMPLETE;
-          for (set<pg_shard_t>::iterator i = op.missing_on.begin();
-               i != op.missing_on.end();
-               ++i) {
-            if (*i != get_parent()->primary_shard()) {
-              dout(10) << __func__ << ": on_peer_recover on " << *i
-                      << ", obj " << op.hoid << dendl;
-              get_parent()->on_peer_recover(
-                *i,
-                op.hoid,
-                op.recovery_info);
-            }
-          }
-          object_stat_sum_t stat;
-          stat.num_bytes_recovered = op.recovery_info.size;
-          stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
-          stat.num_objects_recovered = 1;
-          // TODO: not in crimson yet
-          if (get_parent()->pg_is_repair())
-            stat.num_objects_repaired = 1;
-          // pg_recovery.cc in crimson has it
-          get_parent()->on_global_recover(op.hoid, stat, false);
-          dout(10) << __func__ << ": WRITING return " << op << dendl;
-          recovery_ops.erase(op.hoid);
-          return;
-        } else {
-          op.state = RecoveryOp::IDLE;
-          dout(10) << __func__ << ": WRITING continue " << op << dendl;
-          continue;
-        }
-      }
-      return;
-    }
-    // should never be called once complete
-    case RecoveryOp::COMPLETE:
-    default: {
-      ceph_abort();
-    };
-    }
-  }
-}
-
 void ECBackend::run_recovery_op(
   PGBackend::RecoveryHandle *_h,
   int priority) {
@@ -778,48 +241,6 @@ int ECBackend::recover_object(
   return 0;
 }
 
-ECCommon::RecoveryBackend::RecoveryOp
-ECCommon::RecoveryBackend::recover_object(
-  const hobject_t &hoid,
-  eversion_t v,
-  ObjectContextRef head,
-  ObjectContextRef obc) {
-  RecoveryOp op;
-  op.v = v;
-  op.hoid = hoid;
-  op.obc = obc;
-  op.recovery_info.soid = hoid;
-  op.recovery_info.version = v;
-  if (obc) {
-    op.recovery_info.size = obc->obs.oi.size;
-    op.recovery_info.oi = obc->obs.oi;
-  }
-  if (hoid.is_snap()) {
-    if (obc) {
-      ceph_assert(obc->ssc);
-      op.recovery_info.ss = obc->ssc->snapset;
-    } else if (head) {
-      ceph_assert(head->ssc);
-      op.recovery_info.ss = head->ssc->snapset;
-    } else {
-      ceph_abort_msg("neither obc nor head set for a snap object");
-    }
-  }
-  op.recovery_progress.omap_complete = true;
-  for (set<pg_shard_t>::const_iterator i =
-         get_parent()->get_acting_recovery_backfill_shards().begin();
-       i != get_parent()->get_acting_recovery_backfill_shards().end();
-       ++i) {
-    dout(10) << "checking " << *i << dendl;
-    if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
-      op.missing_on.insert(*i);
-      op.missing_on_shards.insert(i->shard);
-    }
-  }
-  dout(10) << __func__ << ": built op " << op << dendl;
-  return op;
-}
-
 bool ECBackend::can_handle_while_inactive(
   OpRequestRef _op) {
   return false;
index b06c68c1659df93f2f9317e8a4c7d7098f5ed7aa..46e203fa25cb01300245cd22c6b54c29941d9f76 100644 (file)
@@ -86,6 +86,18 @@ void ECCommon::ReadOp::dump(Formatter *f) const {
   f->dump_stream("in_progress") << in_progress;
 }
 
+void ECCommon::RecoveryBackend::RecoveryOp::dump(Formatter *f) const {
+  f->dump_stream("hoid") << hoid;
+  f->dump_stream("v") << v;
+  f->dump_stream("missing_on") << missing_on;
+  f->dump_stream("missing_on_shards") << missing_on_shards;
+  f->dump_stream("recovery_info") << recovery_info;
+  f->dump_stream("recovery_progress") << recovery_progress;
+  f->dump_stream("state") << tostr(state);
+  f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
+}
+
+
 void ECCommon::ReadPipeline::complete_read_op(ReadOp &&rop) {
   dout(20) << __func__ << " completing " << rop << dendl;
   auto req_iter = rop.to_read.begin();
@@ -977,3 +989,574 @@ void ECCommon::RMWPipeline::call_write_ordered(std::function<void(void)> &&cb) {
   next_write_all_shards = true;
   extent_cache.add_on_write(std::move(cb));
 }
+
+ECCommon::RecoveryBackend::RecoveryBackend(
+  CephContext *cct,
+  const coll_t &coll,
+  ceph::ErasureCodeInterfaceRef ec_impl,
+  const ECUtil::stripe_info_t &sinfo,
+  ReadPipeline &read_pipeline,
+  ECListener *parent)
+  : cct(cct),
+    coll(coll),
+    ec_impl(std::move(ec_impl)),
+    sinfo(sinfo),
+    read_pipeline(read_pipeline),
+    parent(parent) {}
+
+void ECCommon::RecoveryBackend::_failed_push(const hobject_t &hoid,
+                                             ECCommon::read_result_t &res) {
+  dout(10) << __func__ << ": Read error " << hoid << " r="
+          << res.r << " errors=" << res.errors << dendl;
+  dout(10) << __func__ << ": canceling recovery op for obj " << hoid
+          << dendl;
+  ceph_assert(recovery_ops.count(hoid));
+  eversion_t v = recovery_ops[hoid].v;
+  recovery_ops.erase(hoid);
+
+  set<pg_shard_t> fl;
+  for (auto &&i: res.errors) {
+    fl.insert(i.first);
+  }
+  get_parent()->on_failed_pull(fl, hoid, v);
+}
+
+void ECCommon::RecoveryBackend::handle_recovery_push(
+  const PushOp &op,
+  RecoveryMessages *m,
+  bool is_repair) {
+  if (get_parent()->check_failsafe_full()) {
+    dout(10) << __func__ << " Out of space (failsafe) processing push request."
+             << dendl;
+    ceph_abort();
+  }
+
+  bool oneshot = op.before_progress.first && op.after_progress.data_complete;
+  ghobject_t tobj;
+  if (oneshot) {
+    tobj = ghobject_t(op.soid, ghobject_t::NO_GEN,
+                      get_parent()->whoami_shard().shard);
+  } else {
+    tobj = ghobject_t(get_parent()->get_temp_recovery_object(op.soid,
+                                                             op.version),
+                      ghobject_t::NO_GEN,
+                      get_parent()->whoami_shard().shard);
+    if (op.before_progress.first) {
+      dout(10) << __func__ << ": Adding oid "
+              << tobj.hobj << " in the temp collection" << dendl;
+      add_temp_obj(tobj.hobj);
+    }
+  }
+
+  if (op.before_progress.first) {
+    m->t.remove(coll, tobj);
+    m->t.touch(coll, tobj);
+  }
+
+  ceph_assert(op.data.length() == op.data_included.size());
+  uint64_t tobj_size = 0;
+
+  uint64_t cursor = 0;
+  for (auto [off, len] : op.data_included) {
+    bufferlist bl;
+    if (len != op.data.length()) {
+      bl.substr_of(op.data, cursor, len);
+    } else {
+      bl = op.data;
+    }
+    m->t.write(coll, tobj, off, len, bl);
+    tobj_size = off + len;
+    cursor += len;
+  }
+
+  if (op.before_progress.first) {
+    ceph_assert(op.attrset.contains(OI_ATTR));
+    m->t.setattrs(
+      coll,
+      tobj,
+      op.attrset);
+  }
+
+  if (op.after_progress.data_complete) {
+    uint64_t shard_size = sinfo.object_size_to_shard_size(op.recovery_info.size,
+      get_parent()->whoami_shard().shard);
+    ceph_assert(shard_size >= tobj_size);
+    if (shard_size != tobj_size) {
+      m->t.truncate( coll, tobj, shard_size);
+    }
+  }
+
+  if (op.after_progress.data_complete && !oneshot) {
+    dout(10) << __func__ << ": Removing oid "
+            << tobj.hobj << " from the temp collection" << dendl;
+    clear_temp_obj(tobj.hobj);
+    m->t.remove(coll, ghobject_t(
+                  op.soid, ghobject_t::NO_GEN,
+                  get_parent()->whoami_shard().shard));
+    m->t.collection_move_rename(
+      coll, tobj,
+      coll, ghobject_t(
+        op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+  }
+  if (op.after_progress.data_complete) {
+    if ((get_parent()->pgb_is_primary())) {
+      ceph_assert(recovery_ops.count(op.soid));
+      ceph_assert(recovery_ops[op.soid].obc);
+      if (get_parent()->pg_is_repair() || is_repair)
+        get_parent()->inc_osd_stat_repaired();
+      get_parent()->on_local_recover(
+        op.soid,
+        op.recovery_info,
+        recovery_ops[op.soid].obc,
+        false,
+        &m->t);
+    } else {
+      // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
+      if (is_repair)
+        get_parent()->inc_osd_stat_repaired();
+      get_parent()->on_local_recover(
+        op.soid,
+        op.recovery_info,
+        ObjectContextRef(),
+        false,
+        &m->t);
+    }
+  }
+  m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
+  m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
+}
+
+void ECCommon::RecoveryBackend::handle_recovery_push_reply(
+    const PushReplyOp &op,
+    pg_shard_t from,
+    RecoveryMessages *m) {
+  if (!recovery_ops.count(op.soid))
+    return;
+  RecoveryOp &rop = recovery_ops[op.soid];
+  ceph_assert(rop.waiting_on_pushes.contains(from));
+  rop.waiting_on_pushes.erase(from);
+  continue_recovery_op(rop, m);
+}
+
+void ECCommon::RecoveryBackend::update_object_size_after_read(
+    uint64_t size,
+    read_result_t &res,
+    read_request_t &req) {
+  // We didn't know the size before, meaning the zero for decode calculations
+  // will be off. Recalculate them!
+  ECUtil::shard_extent_set_t zero_mask(sinfo.get_k_plus_m());
+  sinfo.ro_size_to_zero_mask(size, zero_mask);
+  ECUtil::shard_extent_set_t read_mask(sinfo.get_k_plus_m());
+  sinfo.ro_size_to_read_mask(size, read_mask);
+  extent_set superset = res.buffers_read.get_extent_superset();
+
+  for (auto &&[shard, eset] : zero_mask) {
+    eset.intersection_of(superset);
+    if (!eset.empty() &&
+        (res.zero_length_reads.contains(shard) ||
+          res.buffers_read.contains(shard))) {
+      req.zeros_for_decode[shard].insert(eset);
+    }
+  }
+
+  /* Correct the shard_want_to_read, to make sure everything is within scope
+   * of the newly found object size.
+   */
+  for (auto iter = req.shard_want_to_read.begin(); iter != req.shard_want_to_read.end();) {
+    auto &&[shard, eset] = *iter;
+    bool erase = false;
+
+    if (read_mask.contains(shard)) {
+      eset.intersection_of(read_mask.get(shard));
+      erase = eset.empty();
+    } else {
+      erase = true;
+    }
+
+    /* Some shards may be empty */
+    if (erase) {
+      iter = req.shard_want_to_read.erase(iter);
+    } else {
+      ++iter;
+    }
+  }
+
+  dout(20) << "Update want and zeros from read:size=" << size
+           << " res=" << res
+           << " req=" << req
+           << dendl;
+}
+
+void ECCommon::RecoveryBackend::handle_recovery_read_complete(
+    const hobject_t &hoid,
+    read_result_t &&res,
+    read_request_t &req,
+    RecoveryMessages *m) {
+  dout(10) << __func__ << ": returned " << hoid << " " << res << dendl;
+  ceph_assert(recovery_ops.contains(hoid));
+  RecoveryBackend::RecoveryOp &op = recovery_ops[hoid];
+
+  if (res.attrs) {
+    op.xattrs.swap(*(res.attrs));
+    const auto empty_obc = !op.obc;
+    maybe_load_obc(op.xattrs, op);
+#ifdef WITH_CRIMSON
+    ceph_assert(hoid == op.hoid);
+#endif
+    if (empty_obc) {
+      update_object_size_after_read(op.recovery_info.size, res, req);
+    }
+  }
+  ceph_assert(op.xattrs.size());
+  ceph_assert(op.obc);
+
+  op.returned_data.emplace(std::move(res.buffers_read));
+  uint64_t aligned_size = ECUtil::align_next(op.obc->obs.oi.size);
+
+  dout(20) << __func__ << " before decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
+         << op.returned_data->debug_string(2048, 0)
+         << dendl;
+
+  op.returned_data->add_zero_padding_for_decode(req.zeros_for_decode);
+  int r = op.returned_data->decode(ec_impl, req.shard_want_to_read, aligned_size, get_parent()->get_dpp(), true);
+  ceph_assert(r == 0);
+
+  // Finally, we don't want to write any padding, so truncate the buffer
+  // to remove it.
+  op.returned_data->erase_after_ro_offset(aligned_size);
+
+  dout(20) << __func__ << ": oid=" << op.hoid << dendl;
+  dout(20) << __func__ << " after decode: oid=" << op.hoid << " EC_DEBUG_BUFFERS: "
+           << op.returned_data->debug_string(2048, 0)
+           << dendl;
+
+  continue_recovery_op(op, m);
+}
+
+
+struct RecoveryReadCompleter : ECCommon::ReadCompleter {
+  RecoveryReadCompleter(ECCommon::RecoveryBackend &backend)
+    : backend(backend) {}
+
+  void finish_single_request(
+      const hobject_t &hoid,
+      ECCommon::read_result_t &&res,
+      ECCommon::read_request_t &req) override {
+    if (!(res.r == 0 && res.errors.empty())) {
+      backend._failed_push(hoid, res);
+      return;
+    }
+    ceph_assert(req.to_read.size() == 0);
+    backend.handle_recovery_read_complete(
+      hoid,
+      std::move(res),
+      req,
+      &rm);
+  }
+
+  void finish(int priority) && override {
+    backend.dispatch_recovery_messages(rm, priority);
+  }
+
+  ECCommon::RecoveryBackend &backend;
+  RecoveryMessages rm;
+};
+
+void ECCommon::RecoveryBackend::dispatch_recovery_messages(
+  RecoveryMessages &m, int priority) {
+  for (map<pg_shard_t, vector<PushOp>>::iterator i = m.pushes.begin();
+       i != m.pushes.end();
+       m.pushes.erase(i++)) {
+    MOSDPGPush *msg = new MOSDPGPush();
+    msg->set_priority(priority);
+    msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
+    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
+    msg->from = get_parent()->whoami_shard();
+    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+    msg->pushes.swap(i->second);
+    msg->compute_cost(cct);
+    msg->is_repair = get_parent()->pg_is_repair();
+    get_parent()->send_message_osd_cluster(i->first.osd, msg, msg->map_epoch);
+  }
+  std::map<int, MOSDPGPushReply*> replies;
+  for (map<pg_shard_t, vector<PushReplyOp>>::iterator i =
+         m.push_replies.begin();
+       i != m.push_replies.end();
+       m.push_replies.erase(i++)) {
+    MOSDPGPushReply *msg = new MOSDPGPushReply();
+    msg->set_priority(priority);
+    msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
+    msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
+    msg->from = get_parent()->whoami_shard();
+    msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+    msg->replies.swap(i->second);
+    msg->compute_cost(cct);
+    replies.insert(std::pair(i->first.osd, msg));
+  }
+
+  if (!replies.empty()) {
+    dout(20) << __func__ << " recovery_transactions=";
+    Formatter *f = Formatter::create("json");
+    f->open_object_section("t");
+    m.t.dump(f);
+    f->close_section();
+    f->flush(*_dout);
+    delete f;
+    *_dout << dendl;
+    commit_txn_send_replies(std::move(m.t), std::move(replies));
+  }
+
+  if (m.recovery_reads.empty())
+    return;
+  read_pipeline.start_read_op(
+    priority,
+    m.recovery_reads,
+    false,
+    true,
+    std::make_unique<RecoveryReadCompleter>(*this));
+}
+
+void ECCommon::RecoveryBackend::continue_recovery_op(
+  RecoveryBackend::RecoveryOp &op,
+  RecoveryMessages *m) {
+  dout(10) << __func__ << ": continuing " << op << dendl;
+  using RecoveryOp = RecoveryBackend::RecoveryOp;
+  while (1) {
+    switch (op.state) {
+    case RecoveryOp::IDLE: {
+      ceph_assert(!op.recovery_progress.data_complete);
+      ECUtil::shard_extent_set_t want(sinfo.get_k_plus_m());
+
+      op.state = RecoveryOp::READING;
+
+      /* When beginning recovery, the OI may not be known. As such the object
+       * size is not known. For the first read, attempt to read the default
+       * size.  If this is larger than the object sizes, then the OSD will
+       * return truncated reads.  If the object size is known, then attempt
+       * correctly sized reads.
+       */
+      uint64_t read_size = get_recovery_chunk_size();
+      if (op.obc) {
+        uint64_t read_to_end = ECUtil::align_next(op.obc->obs.oi.size) -
+          op.recovery_progress.data_recovered_to;
+
+        if (read_to_end < read_size) {
+          read_size = read_to_end;
+        }
+      }
+      sinfo.ro_range_to_shard_extent_set_with_parity(
+        op.recovery_progress.data_recovered_to, read_size, want);
+
+      op.recovery_progress.data_recovered_to += read_size;
+
+      // We only need to recover shards that are missing.
+      for (auto shard : shard_id_set::difference(sinfo.get_all_shards(), op.missing_on_shards)) {
+        want.erase(shard);
+      }
+
+      if (op.recovery_progress.first && op.obc) {
+        op.xattrs = op.obc->attr_cache;
+      }
+
+      read_request_t read_request(std::move(want),
+                                  op.recovery_progress.first && !op.obc,
+                                  op.obc
+                                    ? op.obc->obs.oi.size
+                                    : get_recovery_chunk_size());
+
+      int r = read_pipeline.get_min_avail_to_read_shards(
+        op.hoid, true, false, read_request);
+
+      if (r != 0) {
+        // we must have lost a recovery source
+        ceph_assert(!op.recovery_progress.first);
+        dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
+                 << dendl;
+        // in crimson
+        get_parent()->cancel_pull(op.hoid);
+        recovery_ops.erase(op.hoid);
+        return;
+      }
+      if (read_request.shard_reads.empty()) {
+        ceph_assert(op.obc);
+        /* This can happen for several reasons
+         * - A zero-sized object.
+         * - The missing shards have no data.
+         * - The previous recovery did not need the last data shard. In this
+         *   case, data_recovered_to may indicate that the last shard still
+         *   needs recovery, when it does not.
+         * We can just skip the read and fall through below.
+         */
+        dout(10) << __func__ << " No reads required " << op << dendl;
+        // Create an empty read result and fall through.
+        op.returned_data.emplace(&sinfo);
+      } else {
+        m->recovery_read(
+          op.hoid,
+          read_request);
+        dout(10) << __func__ << ": IDLE return " << op << dendl;
+        return;
+      }
+    }
+      [[fallthrough]];
+    case RecoveryOp::READING: {
+      // read completed, start write
+      ceph_assert(op.xattrs.size());
+      ceph_assert(op.returned_data);
+      dout(20) << __func__ << ": returned_data=" << op.returned_data << dendl;
+      op.state = RecoveryOp::WRITING;
+      ObjectRecoveryProgress after_progress = op.recovery_progress;
+      after_progress.first = false;
+      if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
+        after_progress.data_complete = true;
+      }
+
+      for (auto &&pg_shard: op.missing_on) {
+        m->pushes[pg_shard].push_back(PushOp());
+        PushOp &pop = m->pushes[pg_shard].back();
+        pop.soid = op.hoid;
+        pop.version = op.recovery_info.oi.get_version_for_shard(pg_shard.shard);
+
+        op.returned_data->get_sparse_buffer(pg_shard.shard, pop.data, pop.data_included);
+        ceph_assert(pop.data.length() == pop.data_included.size());
+
+        dout(10) << __func__ << ": pop shard=" << pg_shard
+                 << ", oid=" << pop.soid
+                 << ", before_progress=" << op.recovery_progress
+                << ", after_progress=" << after_progress
+                << ", pop.data.length()=" << pop.data.length()
+                 << ", pop.data_included=" << pop.data_included
+                << ", size=" << op.obc->obs.oi.size << dendl;
+
+        if (op.recovery_progress.first) {
+          if (sinfo.is_nonprimary_shard(pg_shard.shard)) {
+            if (pop.version == op.recovery_info.oi.version) {
+              dout(10) << __func__ << ": copy OI attr only" << dendl;
+              pop.attrset[OI_ATTR] = op.xattrs[OI_ATTR];
+            } else {
+              // We are recovering a partial write - make sure we push the correct
+              // version in the OI or a scrub error will occur.
+              object_info_t oi(op.recovery_info.oi);
+              oi.shard_versions.clear();
+              oi.version = pop.version;
+              dout(10) << __func__ << ": partial write OI attr: oi=" << oi << dendl;
+              bufferlist bl;
+              oi.encode(bl, get_osdmap()->get_features(
+                CEPH_ENTITY_TYPE_OSD, nullptr));
+              pop.attrset[OI_ATTR] = bl;
+            }
+          } else {
+            dout(10) << __func__ << ": push all attrs (not nonprimary)" << dendl;
+            pop.attrset = op.xattrs;
+          }
+
+          // Following an upgrade, or turning of overwrites, we can take this
+          // opportunity to clean up hinfo.
+          if (pop.attrset.contains(ECUtil::get_hinfo_key())) {
+            pop.attrset.erase(ECUtil::get_hinfo_key());
+          }
+        }
+        pop.recovery_info = op.recovery_info;
+        pop.before_progress = op.recovery_progress;
+        pop.after_progress = after_progress;
+        if (pg_shard != get_parent()->primary_shard()) {
+          // already in crimson -- junction point with PeeringState
+          get_parent()->begin_peer_recover(
+            pg_shard,
+            op.hoid);
+        }
+      }
+      op.returned_data.reset();
+      op.waiting_on_pushes = op.missing_on;
+      op.recovery_progress = after_progress;
+      dout(10) << __func__ << ": READING return " << op << dendl;
+      return;
+    }
+    case RecoveryOp::WRITING: {
+      if (op.waiting_on_pushes.empty()) {
+        if (op.recovery_progress.data_complete) {
+          op.state = RecoveryOp::COMPLETE;
+          for (set<pg_shard_t>::iterator i = op.missing_on.begin();
+               i != op.missing_on.end();
+               ++i) {
+            if (*i != get_parent()->primary_shard()) {
+              dout(10) << __func__ << ": on_peer_recover on " << *i
+                      << ", obj " << op.hoid << dendl;
+              get_parent()->on_peer_recover(
+                *i,
+                op.hoid,
+                op.recovery_info);
+            }
+          }
+          object_stat_sum_t stat;
+          stat.num_bytes_recovered = op.recovery_info.size;
+          stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
+          stat.num_objects_recovered = 1;
+          // TODO: not in crimson yet
+          if (get_parent()->pg_is_repair())
+            stat.num_objects_repaired = 1;
+          // pg_recovery.cc in crimson has it
+          get_parent()->on_global_recover(op.hoid, stat, false);
+          dout(10) << __func__ << ": WRITING return " << op << dendl;
+          recovery_ops.erase(op.hoid);
+          return;
+        } else {
+          op.state = RecoveryOp::IDLE;
+          dout(10) << __func__ << ": WRITING continue " << op << dendl;
+          continue;
+        }
+      }
+      return;
+    }
+    // should never be called once complete
+    case RecoveryOp::COMPLETE:
+    default: {
+      ceph_abort();
+    };
+    }
+  }
+}
+
+ECCommon::RecoveryBackend::RecoveryOp
+ECCommon::RecoveryBackend::recover_object(
+  const hobject_t &hoid,
+  eversion_t v,
+  ObjectContextRef head,
+  ObjectContextRef obc) {
+  RecoveryOp op;
+  op.v = v;
+  op.hoid = hoid;
+  op.obc = obc;
+  op.recovery_info.soid = hoid;
+  op.recovery_info.version = v;
+  if (obc) {
+    op.recovery_info.size = obc->obs.oi.size;
+    op.recovery_info.oi = obc->obs.oi;
+  }
+  if (hoid.is_snap()) {
+    if (obc) {
+      ceph_assert(obc->ssc);
+      op.recovery_info.ss = obc->ssc->snapset;
+    } else if (head) {
+      ceph_assert(head->ssc);
+      op.recovery_info.ss = head->ssc->snapset;
+    } else {
+      ceph_abort_msg("neither obc nor head set for a snap object");
+    }
+  }
+  op.recovery_progress.omap_complete = true;
+  for (set<pg_shard_t>::const_iterator i =
+         get_parent()->get_acting_recovery_backfill_shards().begin();
+       i != get_parent()->get_acting_recovery_backfill_shards().end();
+       ++i) {
+    dout(10) << "checking " << *i << dendl;
+    if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
+      op.missing_on.insert(*i);
+      op.missing_on_shards.insert(i->shard);
+    }
+  }
+  dout(10) << __func__ << ": built op " << op << dendl;
+  return op;
+}
+
+END_IGNORE_DEPRECATED