}
void set_pushed(pg_shard_t shard) {
auto it = pushes.find(shard);
- if (it != pushes.end()) {
- auto &push_promise = it->second;
- push_promise.set_value();
- pushes.erase(it);
- }
+ ceph_assert(it != pushes.end());
+ it->second.set_value();
+ pushes.erase(it);
}
void set_pulled() {
if (pulled) {
msg->min_epoch = pg.get_last_peering_reset();
msg->pushes.push_back(std::move(push));
msg->set_priority(pg.get_recovery_op_priority());
+ seastar::future<> push_future = get_recovering(soid).wait_for_pushes(shard);
return interruptor::make_interruptible(
shard_services.send_to_osd(shard.osd,
std::move(msg),
pg.get_osdmap_epoch()))
.then_interruptible(
- [this, soid, shard] {
- return get_recovering(soid).wait_for_pushes(shard);
+ [push_future = std::move(push_future)]() mutable {
+ return std::move(push_future);
});
});
});
pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch);
msg->set_priority(pg.get_recovery_op_priority());
msg->objects.push_back(std::make_pair(soid, need));
+ seastar::future<> push_future = get_recovering(soid).wait_for_pushes(shard);
return interruptor::make_interruptible(
shard_services.send_to_osd(shard.osd, std::move(msg),
pg.get_osdmap_epoch())).then_interruptible(
- [this, soid, shard] {
- return get_recovering(soid).wait_for_pushes(shard);
+ [push_future = std::move(push_future)]() mutable {
+ return std::move(push_future);
});
}
return seastar::make_ready_future<>();