backend{backend} {}
virtual ~RecoveryBackend() {}
WaitForObjectRecovery& add_recovering(const hobject_t& soid) {
- auto [it, added] = recovering.emplace(soid, WaitForObjectRecovery{});
+ auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery{});
assert(added);
- return it->second;
+ return *(it->second);
}
WaitForObjectRecovery& get_recovering(const hobject_t& soid) {
assert(is_recovering(soid));
- return recovering.at(soid);
+ return *(recovering.at(soid));
}
void remove_recovering(const hobject_t& soid) {
recovering.erase(soid);
seastar::future<> stop() {
for (auto& [soid, recovery_waiter] : recovering) {
- recovery_waiter.stop();
+ recovery_waiter->stop();
}
return on_stop();
}
};
public:
- class WaitForObjectRecovery : public crimson::BlockerT<WaitForObjectRecovery> {
+ class WaitForObjectRecovery :
+ public boost::intrusive_ref_counter<
+ WaitForObjectRecovery, boost::thread_unsafe_counter>,
+ public crimson::BlockerT<WaitForObjectRecovery> {
seastar::shared_promise<> readable, recovered, pulled;
std::map<pg_shard_t, seastar::shared_promise<>> pushes;
public:
seastar::future<> wait_for_recovered() {
return recovered.get_shared_future();
}
+ template <typename T, typename F>
+ auto wait_track_blocking(T &trigger, F &&fut) {
+ WaitForObjectRecoveryRef ref = this;
+ return track_blocking(
+ trigger,
+ std::forward<F>(fut)
+ ).finally([ref] {});
+ }
seastar::future<> wait_for_recovered(BlockingEvent::TriggerI& trigger) {
- return trigger.maybe_record_blocking(recovered.get_shared_future(), *this);
+ WaitForObjectRecoveryRef ref = this;
+ return wait_track_blocking(trigger, recovered.get_shared_future());
}
seastar::future<> wait_for_pull() {
return pulled.get_shared_future();
};
using RecoveryBlockingEvent =
crimson::AggregateBlockingEvent<WaitForObjectRecovery::BlockingEvent>;
+ using WaitForObjectRecoveryRef = boost::intrusive_ptr<WaitForObjectRecovery>;
protected:
- std::map<hobject_t, WaitForObjectRecovery> recovering;
+ std::map<hobject_t, WaitForObjectRecoveryRef> recovering;
hobject_t get_temp_recovery_object(
const hobject_t& target,
eversion_t version) const;
return pg.with_head_obc<RWState::RWREAD>(soid,
[this, soid, need](auto obc) {
logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
- auto& recovery_waiter = recovering.at(soid);
+ auto& recovery_waiter = get_recovering(soid);
recovery_waiter.obc = obc;
recovery_waiter.obc->wait_recovery_read();
return maybe_push_shards(soid, need);
pg.get_osdmap_epoch()))
.then_interruptible(
[this, soid, shard] {
- return recovering.at(soid).wait_for_pushes(shard);
+ return get_recovering(soid).wait_for_pushes(shard);
});
});
}).then_interruptible([this, soid] {
- auto &recovery = recovering.at(soid);
+ auto &recovery = get_recovering(soid);
if (auto push_info = recovery.pushing.begin();
push_info != recovery.pushing.end()) {
pg.get_recovery_handler()->on_global_recover(soid,
}
return seastar::make_ready_future<>();
}).handle_exception_interruptible([this, soid](auto e) {
- auto &recovery = recovering.at(soid);
+ auto &recovery = get_recovering(soid);
if (recovery.obc) {
recovery.obc->drop_recovery_read();
}
return seastar::make_ready_future<>();
}
PullOp po;
- auto& recovery_waiter = recovering.at(soid);
+ auto& recovery_waiter = get_recovering(soid);
recovery_waiter.pi = std::make_optional<RecoveryBackend::PullInfo>();
auto& pi = *recovery_waiter.pi;
prepare_pull(po, pi, soid, need);
shard_services.send_to_osd(shard.osd, std::move(msg),
pg.get_osdmap_epoch())).then_interruptible(
[this, soid, shard] {
- return recovering.at(soid).wait_for_pushes(shard);
+ return get_recovering(soid).wait_for_pushes(shard);
});
}
return seastar::make_ready_future<>();
{
logger().debug("{}: {}, {}", __func__, soid, need);
- auto& recovery_waiter = recovering.at(soid);
+ auto& recovery_waiter = get_recovering(soid);
auto& obc = recovery_waiter.obc;
interval_set<uint64_t> data_subset;
if (obc->obs.oi.size) {
return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then_interruptible(
[this, soid, pg_shard](auto pop) {
- auto& recovery_waiter = recovering.at(soid);
+ auto& recovery_waiter = get_recovering(soid);
auto& pi = recovery_waiter.pushing[pg_shard];
pi.recovery_progress = pop.after_progress;
return pop;
pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included);
const hobject_t &hoid = pop.soid;
- auto& recovery_waiter = recovering.at(hoid);
+ auto& recovery_waiter = get_recovering(hoid);
auto& pi = *recovery_waiter.pi;
if (pi.recovery_info.size == (uint64_t(-1))) {
pi.recovery_info.size = pop.recovery_info.size;
if (complete) {
pi.stat.num_objects_recovered++;
pg.get_recovery_handler()->on_local_recover(
- pop.soid, recovering.at(pop.soid).pi->recovery_info,
+ pop.soid, get_recovering(pop.soid).pi->recovery_info,
false, *t);
return true;
} else {
}).then_interruptible([this, m, &response](bool complete) {
if (complete) {
auto& pop = m->pushes[0];
- recovering.at(pop.soid).set_pulled();
+ get_recovering(pop.soid).set_pulled();
return seastar::make_ready_future<>();
} else {
auto reply = crimson::make_message<MOSDPGPull>();
logger().debug("{}, soid {}, from {}", __func__, soid, peer);
auto recovering_iter = recovering.find(soid);
if (recovering_iter == recovering.end()
- || !recovering_iter->second.pushing.count(peer)) {
+ || !recovering_iter->second->pushing.count(peer)) {
logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
return seastar::make_ready_future<std::optional<PushOp>>();
} else {
- auto& pi = recovering_iter->second.pushing[peer];
+ auto& pi = recovering_iter->second->pushing[peer];
bool error = pi.recovery_progress.error;
if (!pi.recovery_progress.data_complete && !error) {
return build_push_op(pi.recovery_info, pi.recovery_progress,
return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
}).handle_exception_interruptible([recovering_iter, &pi, peer] (auto e) {
pi.recovery_progress.error = true;
- recovering_iter->second.set_push_failed(peer, e);
+ recovering_iter->second->set_push_failed(peer, e);
return seastar::make_ready_future<std::optional<PushOp>>();
});
}
if (!error) {
pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
}
- recovering_iter->second.set_pushed(peer);
+ recovering_iter->second->set_pushed(peer);
return seastar::make_ready_future<std::optional<PushOp>>();
}
}