logger().debug("{} {} @{}",
__func__, recovery_info.soid, recovery_info.version);
return seastar::do_with(ObjectRecoveryProgress(progress),
- object_info_t(),
uint64_t(crimson::common::local_conf()
->osd_recovery_max_chunk),
eversion_t(),
PushOp(),
[this, &recovery_info, &progress, stat]
- (auto& new_progress, auto& oi, auto& available, auto& v, auto& pop) {
- return [this, &recovery_info, &progress, &new_progress, &oi, &v, pop=&pop] {
- v = recovery_info.version;
- if (progress.first) {
- return seastar::when_all_succeed(
- backend->omap_get_header(coll, ghobject_t(recovery_info.soid)).safe_then(
- [&pop](auto bl) {
- pop->omap_header.claim_append(bl);
- }, crimson::os::FuturizedStore::read_errorator::all_same_way(
- [] (const std::error_code& e) {
- return seastar::make_exception_future<>(e);
- })),
- store->get_attrs(coll, ghobject_t(recovery_info.soid)).safe_then(
- [&oi, pop, &v](auto attrs) mutable {
- for (auto& [key, val] : attrs) {
- pop->attrset[std::move(key)].push_back(std::move(val));
- }
- logger().debug("build_push_op: {}", pop->attrset[OI_ATTR]);
- oi.decode(pop->attrset[OI_ATTR]);
- if (v == eversion_t()) {
- v = oi.version;
- }
- }, crimson::os::FuturizedStore::read_errorator::all_same_way(
- [] (const std::error_code& e) {
- return seastar::make_exception_future<>(e);
- }))
- ).then_unpack([&new_progress] {
- new_progress.first = false;
- });
+ (auto& new_progress, auto& available, auto& v, auto& pop) {
+ v = recovery_info.version;
+ return read_metadata_for_push_op(recovery_info.soid,
+ progress, new_progress,
+ v, &pop).then([&](eversion_t local_ver) mutable {
+ // If requestor didn't know the version, use ours
+ if (v == eversion_t()) {
+ v = local_ver;
}
- return seastar::make_ready_future<>();
- }().then([this, &recovery_info, &progress, &new_progress, &available, &pop]() mutable {
return read_omap_for_push_op(recovery_info.soid,
progress,
new_progress,
});
}
+seastar::future<eversion_t>
+ReplicatedRecoveryBackend::read_metadata_for_push_op(
+ const hobject_t& oid,
+ const ObjectRecoveryProgress& progress,
+ ObjectRecoveryProgress& new_progress,
+ eversion_t ver,
+ PushOp* push_op)
+{
+ if (!progress.first) {
+ return seastar::make_ready_future<eversion_t>(ver);
+ }
+ return seastar::when_all_succeed(
+ backend->omap_get_header(coll, ghobject_t(oid)).handle_error(
+ crimson::os::FuturizedStore::read_errorator::all_same_way(
+ [] (const std::error_code& e) {
+ return seastar::make_ready_future<bufferlist>();
+ })),
+ store->get_attrs(coll, ghobject_t(oid)).handle_error(
+ crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
+ [] (const std::error_code& e) {
+ return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>();
+ }))
+ ).then_unpack([&new_progress, push_op](auto bl, auto attrs) {
+ if (bl.length() == 0) {
+ logger().error("read_metadata_for_push_op: fail to read omap header");
+ return eversion_t{};
+ } else if (attrs.empty()) {
+ logger().error("read_metadata_for_push_op: fail to read attrs");
+ return eversion_t{};
+ }
+ push_op->omap_header.claim_append(std::move(bl));
+ for (auto&& [key, val] : std::move(attrs)) {
+ push_op->attrset[key].push_back(val);
+ }
+ logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]);
+ object_info_t oi;
+ oi.decode(push_op->attrset[OI_ATTR]);
+ new_progress.first = false;
+ return oi.version;
+ });
+}
+
seastar::future<uint64_t>
ReplicatedRecoveryBackend::read_object_for_push_op(
const hobject_t& oid,