void drop_read() {
return put_lock_type(RWState::RWREAD);
}
- bool get_recovery_read() {
- return rwstate.get_recovery_read();
+ seastar::future<bool> get_recovery_read(bool can_wait = false) {
+ if (!can_wait) {
+ return seastar::make_ready_future<bool>(rwstate.get_recovery_read());
+ }
+ return with_queue([this] {
+ return rwstate.get_recovery_read();
+ }).then([] {return seastar::make_ready_future<bool>(true); });
}
void drop_recovery_read() {
ceph_assert(rwstate.recovery_read_marker);
});
}
+seastar::future<bool> UrgentRecovery::do_recovery()
+{
+ if (!pg->has_reset_since(epoch_started)) {
+ auto futopt = pg->get_recovery_handler()->recover_missing(soid, need);
+ assert(futopt);
+ return with_blocking_future(std::move(*futopt)).then([] {
+ return seastar::make_ready_future<bool>(false);
+ });
+ }
+ return seastar::make_ready_future<bool>(false);
+}
+
+void UrgentRecovery::print(std::ostream &lhs) const
+{
+ lhs << "UrgentRecovery(" << pg->get_pgid() << ", "
+ << soid << ", v" << need << ")";
+}
+
+void UrgentRecovery::dump_detail(Formatter *f) const
+{
+ f->dump_stream("pgid") << pg->get_pgid();
+ f->open_object_section("recovery_detail");
+ {
+ f->dump_stream("oid") << soid;
+ f->dump_stream("version") << need;
+ }
+ f->close_section();
+}
+
}
class PG;
class ShardServices;
-class BackgroundRecovery final : public OperationT<BackgroundRecovery> {
+class BackgroundRecovery : public OperationT<BackgroundRecovery> {
public:
static constexpr OperationTypeCode type = OperationTypeCode::background_recovery;
epoch_t epoch_started,
crimson::osd::scheduler::scheduler_class_t scheduler_class);
- void print(std::ostream &) const final;
- void dump_detail(Formatter *f) const final;
+ virtual void print(std::ostream &) const;
+ virtual void dump_detail(Formatter *f) const;
seastar::future<> start();
-private:
+protected:
Ref<PG> pg;
ShardServices &ss;
epoch_t epoch_started;
crimson::osd::scheduler::scheduler_class_t scheduler_class;
-
- auto get_scheduler_params() const {
+ auto get_scheduler_params(crimson::osd::scheduler::cost_t cost = 1,
+ crimson::osd::scheduler::client_t owner = 0) const {
return crimson::osd::scheduler::params_t{
- 1, // cost
- 0, // owner
+ cost, // cost
+ owner, // owner
scheduler_class
};
}
+ virtual seastar::future<bool> do_recovery();
+};
- seastar::future<bool> do_recovery();
+class UrgentRecovery final : public BackgroundRecovery {
+public:
+ UrgentRecovery(
+ const hobject_t& soid,
+ const eversion_t& need,
+ Ref<PG> pg,
+ ShardServices& ss,
+ epoch_t epoch_started,
+ crimson::osd::scheduler::scheduler_class_t scheduler_class)
+ : BackgroundRecovery{pg, ss, epoch_started, scheduler_class},
+ soid{soid}, need(need) {}
+ void print(std::ostream&) const final;
+ void dump_detail(Formatter* f) const final;
+private:
+ const hobject_t soid;
+ const eversion_t need;
+ seastar::future<bool> do_recovery() override;
};
}
return with_blocking_future(osd.wait_for_pg(m->get_spg()));
}).then([this, opref=std::move(opref)](Ref<PG> pgref) {
return seastar::do_with(
- std::move(pgref), std::move(opref), [this](auto pgref, auto opref) {
+ std::move(pgref), std::move(opref), [this](auto& pgref, auto& opref) {
PG &pg = *pgref;
return with_blocking_future(
handle.enter(pp(pg).await_map)
handle.enter(pp(pg).wait_for_active));
}).then([this, &pg]() mutable {
return with_blocking_future(pg.wait_for_active_blocker.wait());
- }).then([this, &pg]() mutable {
+ }).then([this, &pgref]() mutable {
if (m->finish_decode()) {
m->clear_payload();
}
if (is_pg_op()) {
- return process_pg_op(pg);
+ return process_pg_op(pgref);
} else {
- return process_op(pg);
+ return process_op(pgref);
}
});
});
}
seastar::future<> ClientRequest::process_pg_op(
- PG &pg)
+ Ref<PG> &pg)
{
- return pg.do_pg_ops(m)
+ return pg->do_pg_ops(m)
.then([this](Ref<MOSDOpReply> reply) {
return conn->send(reply);
});
}
seastar::future<> ClientRequest::process_op(
- PG &pg)
+ Ref<PG> &pgref)
{
+ PG& pg = *pgref;
return with_blocking_future(
- handle.enter(pp(pg).get_obc)
- ).then([this, &pg]() {
+ handle.enter(pp(pg).recover_missing)
+ ).then([this, &pg, pgref=std::move(pgref)] {
+ eversion_t ver;
+ const hobject_t& soid = m->get_hobj();
+ if (pg.is_unreadable_object(soid, &ver)) {
+ auto [op, fut] = osd.get_shard_services().start_operation<UrgentRecovery>(
+ soid, ver, pgref, osd.get_shard_services(), m->get_min_epoch(),
+ crimson::osd::scheduler::scheduler_class_t::immediate);
+ return std::move(fut);
+ }
+ return seastar::now();
+ }).then([this, &pg] {
+ return with_blocking_future(handle.enter(pp(pg).get_obc));
+ }).then([this, &pg]() {
op_info.set_from_op(&*m, *pg.get_osdmap());
return pg.with_locked_obc(
m,
return seastar::now();
}));
}
+
}
OrderedPipelinePhase wait_for_active = {
"ClientRequest::PGPipeline::wait_for_active"
};
+ OrderedPipelinePhase recover_missing = {
+ "ClientRequest::PGPipeline::recover_missing"
+ };
OrderedPipelinePhase get_obc = {
"ClientRequest::PGPipeline::get_obc"
};
private:
seastar::future<> process_pg_op(
- PG &pg);
+ Ref<PG> &pg);
seastar::future<> process_op(
- PG &pg);
+ Ref<PG> &pg);
bool is_pg_op() const;
ConnectionPipeline &cp();
bool is_missing_object(const hobject_t& soid) const {
return peering_state.get_pg_log().get_missing().get_items().count(soid);
}
- bool is_unreadable_object(const hobject_t &oid) const final {
+ bool is_unreadable_object(const hobject_t &oid,
+ eversion_t* v = 0) const final {
return is_missing_object(oid) ||
!peering_state.get_missing_loc().readable_with_acting(
- oid, get_actingset());
+ oid, get_actingset(), v);
}
const set<pg_shard_t> &get_actingset() const {
return peering_state.get_actingset();
obc->obs.oi = recovery_info.oi;
// obc is loaded the excl lock
obc->put_lock_type(RWState::RWEXCL);
- assert(obc->get_recovery_read());
+ assert(obc->get_recovery_read().get0());
}
if (!pg->is_unreadable_object(soid)) {
pg->get_recovery_backend()->get_recovering(soid).set_readable();
void _committed_pushed_object(epoch_t epoch,
eversion_t last_complete);
friend class ReplicatedRecoveryBackend;
+ friend class crimson::osd::UrgentRecovery;
seastar::future<> handle_pull(Ref<MOSDPGPull> m);
seastar::future<> handle_push(Ref<MOSDPGPush> m);
seastar::future<> handle_push_reply(Ref<MOSDPGPushReply> m);
virtual const pg_shard_t& get_pg_whoami() const = 0;
virtual const spg_t& get_pgid() const = 0;
virtual RecoveryBackend* get_recovery_backend() = 0;
- virtual bool is_unreadable_object(const hobject_t&) const = 0;
+ virtual bool is_unreadable_object(const hobject_t&, eversion_t* v = 0) const = 0;
virtual bool has_reset_since(epoch_t) const = 0;
virtual std::vector<pg_shard_t> get_replica_recovery_order() const = 0;
virtual seastar::future<> stop() = 0;
std::move(msg),
pg.get_osdmap_epoch()).then(
[&recovery_waiter] {
- return recovery_waiter.wait_for_pull();
+ return recovery_waiter.wait_for_pull().then([] {
+ return seastar::make_ready_future<bool>(true);
+ });
});
} else {
- return seastar::make_ready_future<>();
+ return seastar::make_ready_future<bool>(false);
}
- }().then([this, &pops, &shards, soid, need, &recovery_waiter]() mutable {
- return [this, &recovery_waiter, soid] {
+ }().then([this, &pops, &shards, soid, need, &recovery_waiter](bool pulled) mutable {
+ return [this, &recovery_waiter, soid, pulled] {
if (!recovery_waiter.obc) {
return pg.get_or_load_head_obc(soid).safe_then(
- [&recovery_waiter](auto p) {
+ [&recovery_waiter, pulled](auto p) {
auto& [obc, existed] = p;
logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
recovery_waiter.obc = obc;
// obc is loaded with excl lock
recovery_waiter.obc->put_lock_type(RWState::RWEXCL);
}
- assert(recovery_waiter.obc->get_recovery_read());
+ bool got = recovery_waiter.obc->get_recovery_read().get0();
+ assert(pulled ? got : 1);
+ if (!got) {
+ return recovery_waiter.obc->get_recovery_read(true)
+ .then([](bool) { return seastar::now(); });
+ }
return seastar::make_ready_future<>();
}, crimson::osd::PG::load_obc_ertr::all_same_way(
[this, &recovery_waiter, soid](const std::error_code& e) {
recovery_waiter.obc = obc;
// obc is loaded with excl lock
recovery_waiter.obc->put_lock_type(RWState::RWEXCL);
- assert(recovery_waiter.obc->get_recovery_read());
+ assert(recovery_waiter.obc->get_recovery_read().get0());
return seastar::make_ready_future<>();
})
);
bool MissingLoc::readable_with_acting(
const hobject_t &hoid,
- const set<pg_shard_t> &acting) const {
- if (!needs_recovery(hoid))
+ const set<pg_shard_t> &acting,
+ eversion_t* v) const {
+ if (!needs_recovery(hoid, v))
return true;
if (is_deleted(hoid))
return false;
}
bool readable_with_acting(
const hobject_t &hoid,
- const std::set<pg_shard_t> &acting) const;
+ const std::set<pg_shard_t> &acting,
+ eversion_t* v = 0) const;
uint64_t num_unfound() const {
uint64_t ret = 0;
for (std::map<hobject_t, pg_missing_item>::const_iterator i =