}
void RecoveryBackend::clean_up(ceph::os::Transaction& t,
- std::string_view why)
+ interrupt_cause_t why)
{
for (auto& soid : temp_contents) {
t.remove(pg.get_collection_ref()->get_cid(),
recovering.clear();
}
+void RecoveryBackend::WaitForObjectRecovery::interrupt(interrupt_cause_t why) {
+ switch(why) {
+ case interrupt_cause_t::INTERVAL_CHANGE:
+ if (readable) {
+ readable->set_exception(
+ crimson::common::actingset_changed(pg.is_primary()));
+ readable.reset();
+ }
+ if (recovered) {
+ recovered->set_exception(
+ crimson::common::actingset_changed(pg.is_primary()));
+ recovered.reset();
+ }
+ if (pulled) {
+ pulled->set_exception(
+ crimson::common::actingset_changed(pg.is_primary()));
+ pulled.reset();
+ }
+ for (auto& [pg_shard, pr] : pushes) {
+ pr.set_exception(
+ crimson::common::actingset_changed(pg.is_primary()));
+ }
+ pushes.clear();
+ break;
+ default:
+ ceph_abort("impossible");
+ break;
+ }
+}
+
void RecoveryBackend::WaitForObjectRecovery::stop() {
if (readable) {
readable->set_exception(
backend{backend} {}
virtual ~RecoveryBackend() {}
std::pair<WaitForObjectRecovery&, bool> add_recovering(const hobject_t& soid) {
- auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery{});
+ auto [it, added] = recovering.emplace(soid, new WaitForObjectRecovery(pg));
assert(it->second);
return {*(it->second), added};
}
std::int64_t min,
std::int64_t max);
+ enum interrupt_cause_t : uint8_t {
+ INTERVAL_CHANGE,
+ MAX
+ };
void on_peering_interval_change(ceph::os::Transaction& t) {
- clean_up(t, "new peering interval");
+ clean_up(t, interrupt_cause_t::INTERVAL_CHANGE);
}
seastar::future<> stop() {
public boost::intrusive_ref_counter<
WaitForObjectRecovery, boost::thread_unsafe_counter>,
public crimson::BlockerT<WaitForObjectRecovery> {
+ crimson::osd::PG &pg;
std::optional<seastar::shared_promise<>> readable, recovered, pulled;
std::map<pg_shard_t, seastar::shared_promise<>> pushes;
public:
static constexpr const char* type_name = "WaitForObjectRecovery";
+ WaitForObjectRecovery(crimson::osd::PG &pg) : pg(pg) {}
+
crimson::osd::ObjectContextRef obc;
std::optional<pull_info_t> pull_info;
std::map<pg_shard_t, push_info_t> pushing;
pushes.erase(it);
}
}
- void interrupt(std::string_view why) {
- if (readable) {
- readable->set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- readable.reset();
- }
- if (recovered) {
- recovered->set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- recovered.reset();
- }
- if (pulled) {
- pulled->set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- pulled.reset();
- }
- for (auto& [pg_shard, pr] : pushes) {
- pr.set_exception(std::system_error(
- std::make_error_code(std::errc::interrupted), why.data()));
- }
- pushes.clear();
- }
+ void interrupt(interrupt_cause_t why);
void stop();
void dump_detail(Formatter* f) const {
}
void add_temp_obj(const hobject_t &oid);
void clear_temp_obj(const hobject_t &oid);
- void clean_up(ceph::os::Transaction& t, std::string_view why);
+ void clean_up(ceph::os::Transaction& t, interrupt_cause_t why);
virtual seastar::future<> on_stop() = 0;
private:
void handle_backfill_finish(