public:
static constexpr bool is_trackable = true;
+ // Whether this operation requires its target PG to already exist before
+ // it can be processed; defaults to true, overridden per operation type.
+ virtual bool requires_pg() const {
+ return true;
+ }
};
template <class T>
unsigned instance_id = 0;
public:
+ // Epoch the message was sent at (its map epoch); compared against the
+ // local osdmap epoch to decide whether an incremental map update must
+ // be sent back to the peer when the PG is no longer here.
+ epoch_t get_epoch_sent_at() const {
+ return m->get_map_epoch();
+ }
+
/**
* instance_handle_t
*
}
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return req->get_min_epoch(); }
+ // Epoch at which this request was sent (the message's map epoch).
+ epoch_t get_epoch_sent_at() const {
+ return req->get_map_epoch();
+ }
ConnectionPipeline &get_connection_pipeline();
}
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return req->get_min_epoch(); }
+ // Epoch at which this request was sent (the message's map epoch).
+ epoch_t get_epoch_sent_at() const {
+ return req->get_map_epoch();
+ }
ConnectionPipeline &get_connection_pipeline();
float delay = 0;
PGPeeringEvent evt;
+ // Epoch at which the peering event was sent, as recorded in the event
+ // itself rather than in a message envelope.
+ epoch_t get_epoch_sent_at() const {
+ return evt.get_epoch_sent();
+ }
+
const pg_shard_t get_from() const {
return from;
}
evt(std::forward<Args>(args)...)
{}
+ // Peering events carry their own requires_pg flag; forward it so the
+ // PG-wait path can discard events whose PG is no longer on this OSD.
+ bool requires_pg() const final {
+ return evt.requires_pg;
+ }
+
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
seastar::future<> with_pg(
PGPeeringPipeline::Process::BlockingEvent
> tracking_events;
+ // NOTE(review): 'to' appears to be the target epoch of this map advance,
+ // used here as the "sent at" epoch — confirm this matches the epoch the
+ // triggering event was generated at.
+ epoch_t get_epoch_sent_at() const {
+ return to;
+ }
+
private:
PGPeeringPipeline &peering_pp(PG &pg);
};
}
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return m->get_min_epoch(); }
+ // Epoch at which the message was sent (its map epoch).
+ epoch_t get_epoch_sent_at() const {
+ return m->get_map_epoch();
+ }
ConnectionPipeline &get_connection_pipeline();
}
PipelineHandle &get_handle() { return handle; }
epoch_t get_epoch() const { return req->get_min_epoch(); }
+ // Epoch at which this request was sent (the message's map epoch).
+ epoch_t get_epoch_sent_at() const {
+ return req->get_map_epoch();
+ }
ConnectionPipeline &get_connection_pipeline();
crimson::net::ConnectionRef l_conn;
crimson::net::ConnectionXcoreRef r_conn;
- epoch_t epoch;
spg_t pgid;
protected:
using interruptor = InterruptibleOperation::interruptor;
+ epoch_t epoch;
template <typename U=void>
using ifut = InterruptibleOperation::interruptible_future<U>;
public:
RemoteScrubEventBaseT(
crimson::net::ConnectionRef conn, epoch_t epoch, spg_t pgid)
- : l_conn(std::move(conn)), epoch(epoch), pgid(pgid) {}
+ : l_conn(std::move(conn)), pgid(pgid), epoch(epoch) {}
PGPeeringPipeline &get_peering_pipeline(PG &pg);
: RemoteScrubEventBaseT<ScrubRequested>(std::forward<Args>(base_args)...),
deep(deep) {}
+ // Epoch captured when the remote scrub event was constructed
+ // (see the RemoteScrubEventBaseT constructor).
+ epoch_t get_epoch_sent_at() const {
+ return epoch;
+ }
+
void print(std::ostream &out) const final {
out << "(deep=" << deep << ")";
}
ceph_assert(scrub::PGScrubber::is_scrub_message(*m));
}
+ // Epoch captured when the remote scrub event was constructed
+ // (see the RemoteScrubEventBaseT constructor).
+ epoch_t get_epoch_sent_at() const {
+ return epoch;
+ }
+
void print(std::ostream &out) const final {
out << "(m=" << *m << ")";
}
auto &opref = *op;
return opref.template with_blocking_event<
PGMap::PGCreationBlockingEvent
- >([&target_shard_services, &opref](auto &&trigger) {
- return target_shard_services.wait_for_pg(
- std::move(trigger), opref.get_pgid());
- }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
- logger.debug("{}: have_pg", opref);
- return opref.with_pg(target_shard_services, pgref);
- }).handle_error(
- crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
- logger.debug("{}: pg creation canceled, dropping", opref);
- return seastar::now();
- })
- ).then([op=std::move(op)] {});
+ >([&target_shard_services, &opref, &logger](auto &&trigger) mutable {
+ auto pg = target_shard_services.get_pg(opref.get_pgid());
+ auto fut = ShardServices::wait_for_pg_ertr::make_ready_future<Ref<PG>>(pg);
+ if (!pg) {
+ if (opref.requires_pg()) {
+ auto osdmap = target_shard_services.get_map();
+ if (!osdmap->is_up_acting_osd_shard(
+ opref.get_pgid(), target_shard_services.local_state.whoami)) {
+ logger.debug(
+ "pg {} for {} is no longer here, discarding",
+ opref.get_pgid(), opref);
+ opref.get_handle().exit();
+ auto _fut = seastar::now();
+ if (osdmap->get_epoch() > opref.get_epoch_sent_at()) {
+ _fut = target_shard_services.send_incremental_map(
+ std::ref(opref.get_foreign_connection()),
+ opref.get_epoch_sent_at() + 1);
+ }
+ return _fut;
+ }
+ }
+ fut = target_shard_services.wait_for_pg(
+ std::move(trigger), opref.get_pgid());
+ }
+ return fut.safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) {
+ logger.debug("{}: have_pg", opref);
+ return opref.with_pg(target_shard_services, pgref);
+ }).handle_error(
+ crimson::ct_error::ecanceled::handle([&logger, &opref](auto) {
+ logger.debug("{}: pg creation canceled, dropping", opref);
+ return seastar::now();
+ })
+ );
+ }).then([op=std::move(op)] {});
}
seastar::future<> load_pgs(crimson::os::FuturizedStore& store);
co_return;
}
+// Direct, non-waiting lookup of a PG on this shard. Returns a null Ref
+// when the PG is not instantiated locally; callers (e.g. the PG-wait
+// path) check for null and fall back to wait_for_pg() or discard.
+Ref<PG> ShardServices::get_pg(spg_t pgid)
+{
+ return local_state.get_pg(pgid);
+}
+
seastar::future<> ShardServices::dispatch_context_messages(
BufferedRecoveryMessages &&ctx)
{
return pg_to_shard_mapping.remove_pg_mapping(pgid);
}
+ // Direct lookup of a locally-instantiated PG; may return null (no waiting).
+ Ref<PG> get_pg(spg_t pgid);
+
crimson::common::CephContext *get_cct() {
return &(local_state.cct);
}