#include "messages/MOSDOp.h"
#include "messages/MOSDPeeringOp.h"
#include "messages/MOSDPGCreate2.h"
+#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGUpdateLogMissing.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
#include "messages/MOSDRepOpReply.h"
[[fallthrough]];
case MSG_OSD_PG_LOG:
return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
+ case MSG_OSD_PG_REMOVE:
+ return handle_pg_remove(conn, boost::static_pointer_cast<MOSDPGRemove>(m));
case MSG_OSD_REPOP:
return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
case MSG_OSD_REPOPREPLY:
std::move(*evt)).second;
}
+seastar::future<> OSD::handle_pg_remove(
+ crimson::net::ConnectionRef conn,
+ Ref<MOSDPGRemove> m)
+{
+ LOG_PREFIX(OSD::handle_pg_remove);
+ const int from = m->get_source().num();
+ std::vector<seastar::future<>> futs;
+ for (auto &pg : m->pg_list) {
+ DEBUG("{} from {}", pg, from);
+ futs.emplace_back(
+ pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
+ conn,
+ pg_shard_t{from, pg.shard},
+ pg,
+ m->get_epoch(),
+ m->get_epoch(),
+ PeeringState::DeleteStart()).second);
+ }
+ return seastar::when_all_succeed(std::move(futs));
+}
+
seastar::future<> OSD::check_osdmap_features()
{
LOG_PREFIX(OSD::check_osdmap_features);
Ref<MOSDRepOpReply> m);
seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
Ref<MOSDPeeringOp> m);
+ seastar::future<> handle_pg_remove(crimson::net::ConnectionRef conn,
+ Ref<MOSDPGRemove> m);
seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
Ref<MOSDFastDispatchOp> m);
seastar::future<> handle_scrub_command(crimson::net::ConnectionRef conn,
{
recovery_handler->on_pg_clean();
scrubber.on_primary_active_clean();
- return nullptr;
+ recovery_finisher = new C_PG_FinishRecovery(*this);
+ return recovery_finisher;
}
seastar::future<> PG::clear_temp_objects()
pglog_based_recovery_op->cancel();
reset_pglog_based_recovery_op();
}
+
+void PG::C_PG_FinishRecovery::finish(int r) {
+ LOG_PREFIX(PG::C_PG_FinishRecovery::finish);
+ auto &peering_state = pg.get_peering_state();
+ if (peering_state.is_deleting() || !peering_state.is_clean()) {
+ DEBUGDPP("raced with delete or repair", pg);
+ return;
+ }
+ if (this == pg.recovery_finisher) {
+ peering_state.purge_strays();
+ pg.recovery_finisher = nullptr;
+ } else {
+ DEBUGDPP("stale recovery finsher", pg);
+ }
+}
}
}
void check_blocklisted_watchers() final;
void clear_primary_state() final {
- // Not needed yet
+ recovery_finisher = nullptr;
}
void queue_check_readable(epoch_t last_peering_reset,
void on_replica_activate() final;
void on_activate_complete() final;
void on_new_interval() final {
- // Not needed yet
+ recovery_finisher = nullptr;
}
Context *on_clean() final;
void on_activate_committed() final {
}
seastar::future<> stop();
private:
+ class C_PG_FinishRecovery : public Context {
+ public:
+ explicit C_PG_FinishRecovery(PG &pg) : pg(pg) {}
+ void finish(int r) override;
+ private:
+ PG& pg;
+ };
std::unique_ptr<PGBackend> backend;
std::unique_ptr<RecoveryBackend> recovery_backend;
std::unique_ptr<PGRecovery> recovery_handler;
+ C_PG_FinishRecovery *recovery_finisher;
PeeringState peering_state;
eversion_t projected_last_update;