});
}
return seastar::make_ready_future<>();
- }).then([this, &recovery_info, &progress, &available, &new_progress, pop] {
+ }).then([this, &recovery_info, &progress, &available, pop] {
logger().debug("build_push_op: available: {}, copy_subset: {}",
available, recovery_info.copy_subset);
- if (available > 0) {
- if (!recovery_info.copy_subset.empty()) {
- return seastar::do_with(interval_set<uint64_t>(recovery_info.copy_subset),
- [this, &recovery_info, &progress, &available, pop, &new_progress]
- (auto& copy_subset) {
- return backend->fiemap(coll, ghobject_t(recovery_info.soid),
- 0, copy_subset.range_end()).then(
- [&copy_subset](auto m) {
- interval_set<uint64_t> fiemap_included(std::move(m));
- copy_subset.intersection_of(fiemap_included);
- return seastar::make_ready_future<>();
- }).then([&recovery_info, &progress,
- &copy_subset, &available, pop, &new_progress] {
- pop->data_included.span_of(copy_subset, progress.data_recovered_to,
- available);
- if (pop->data_included.empty()) // zero filled section, skip to end!
- new_progress.data_recovered_to =
- recovery_info.copy_subset.range_end();
- else
- new_progress.data_recovered_to = pop->data_included.range_end();
- return seastar::make_ready_future<>();
- }).handle_exception([&copy_subset](auto e) {
- copy_subset.clear();
- return seastar::make_ready_future<>();
- });
- });
- } else {
- return seastar::now();
- }
- } else {
- pop->data_included.clear();
- return seastar::make_ready_future<>();
- }
- }).then([this, &oi, pop] {
- return store->readv(coll, ghobject_t{oi.soid}, pop->data_included, 0);
- }).safe_then([&recovery_info, &progress,
- &new_progress, stat, pop, &v]
- (auto bl) {
- pop->data.claim_append(bl);
+ return read_object_for_push_op(recovery_info.soid,
+ recovery_info.copy_subset,
+ progress.data_recovered_to,
+ available, pop);
+ }).then([this, &recovery_info, &v, &progress, &new_progress, stat, pop]
+ (uint64_t recovered_to) {
+ new_progress.data_recovered_to = recovered_to;
if (new_progress.is_complete(recovery_info)) {
new_progress.data_complete = true;
if (stat)
  stat->num_objects_recovered++;
}
if (stat) {
  stat->num_keys_recovered += pop->omap_entries.size();
  stat->num_bytes_recovered += pop->data.length();
}
pop->version = v;
pop->soid = recovery_info.soid;
pop->recovery_info = recovery_info;
pop->after_progress = new_progress;
pop->before_progress = progress;
logger().debug("build_push_op: pop version: {}, pop data length: {}",
               pop->version, pop->data.length());
return seastar::make_ready_future<ObjectRecoveryProgress>
(std::move(new_progress));
- }, PGBackend::read_errorator::all_same_way([](auto e) {
- logger().debug("build_push_op: read exception");
- return seastar::make_exception_future<ObjectRecoveryProgress>(e);
- })
- );
+ });
});
}
+seastar::future<uint64_t>
+ReplicatedRecoveryBackend::read_object_for_push_op(
+ const hobject_t& oid,
+ const interval_set<uint64_t>& copy_subset,
+ uint64_t offset,
+ uint64_t max_len,
+ PushOp* push_op)
+{
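+ // nothing to read: report no data and leave the recovery cursor at "offset"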
+ if (max_len == 0 || copy_subset.empty()) {
+ push_op->data_included.clear();
+ return seastar::make_ready_future<uint64_t>(offset);
+ }
+ // 1. get the extents in the interested range
+ return backend->fiemap(coll, ghobject_t{oid},
+ 0, copy_subset.range_end()).then_wrapped(
+ [=](auto&& fiemap_included) mutable {
+ interval_set<uint64_t> extents;
+ try {
+ extents.intersection_of(copy_subset, fiemap_included.get0());
+ } catch (std::exception &) {
+ // if fiemap() fails, we will read nothing, as the intersection of
+ // copy_subset and an empty interval_set would be empty anyway
+ extents.clear();
+ }
+ // 2. we can read up to "max_len" bytes from "offset", so truncate the
+ // extents down to this quota. no need to return the number of consumed
+ // bytes, as this is the last consumer of this quota
+ push_op->data_included.span_of(extents, offset, max_len);
+ // 3. read the truncated extents
+ // TODO: check if the returned extents are pruned
+ return store->readv(coll, ghobject_t{oid}, push_op->data_included, 0);
+ }).safe_then([push_op, range_end=copy_subset.range_end()](auto &&bl) {
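+ // 4. append the data just read to the push op and work out how far recovery got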
+ push_op->data.claim_append(std::move(bl));
+ uint64_t recovered_to = 0;
+ if (push_op->data_included.empty()) {
+ // zero filled section, skip to end!
+ recovered_to = range_end;
+ } else {
+ // note down the progress, we will start from there next time
+ recovered_to = push_op->data_included.range_end();
+ }
+ return seastar::make_ready_future<uint64_t>(recovered_to);
+ }, PGBackend::read_errorator::all_same_way([](auto e) {
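+ // surface read errors to the caller as an exceptional future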
+ logger().debug("build_push_op: read exception");
+ return seastar::make_exception_future<uint64_t>(e);
+ }));
+}
+
std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>
ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid)
{