  LOG_PREFIX(PeeringEvent<T>::with_pg);
  using interruptor = typename T::interruptor;
  }).then_interruptible([this, pg, &shard_services] {
    return complete_rctx(shard_services, pg);
- }).then_interruptible([pg, &shard_services]()
-   -> typename T::template interruptible_future<> {
-   if (!pg->get_need_up_thru()) {
-     return seastar::now();
-   }
-   return shard_services.send_alive(pg->get_same_interval_since());
- }).then_interruptible([&shard_services] {
-   return shard_services.send_pg_temp();
  });
}, [this](std::exception_ptr ep) {
LOG_PREFIX(PeeringEvent<T>::complete_rctx);
DEBUGI("{}: submitting ctx", *this);
- return shard_services.dispatch_context(
- pg->get_collection_ref(),
- std::move(ctx));
+ return pg->complete_rctx(std::move(ctx));
}
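For orientation: after this hunk, `PeeringEvent<T>::complete_rctx` is a plain forward into the PG-level helper added at the end of this change. A sketch of the resulting method, assuming a signature pieced together from the `complete_rctx(shard_services, pg)` call and the `typename T::template interruptible_future<>` return type seen above — only the body lines are verbatim from the hunk:

template <class T>
typename T::template interruptible_future<>
PeeringEvent<T>::complete_rctx(ShardServices &shard_services, Ref<PG> pg)
{
  LOG_PREFIX(PeeringEvent<T>::complete_rctx);
  DEBUGI("{}: submitting ctx", *this);
  return pg->complete_rctx(std::move(ctx));
}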
shard_services.pg_created(pg->get_pgid(), pg);
logger().info("PGAdvanceMap::start new pg {}", *pg);
}
- return seastar::when_all_succeed(
- pg->get_need_up_thru()
- ? shard_services.send_alive(
- pg->get_same_interval_since())
- : seastar::now(),
- shard_services.dispatch_context(
- pg->get_collection_ref(),
- std::move(rctx)));
- }).then_unpack([this] {
- logger().debug("{}: sending pg temp", *this);
- return shard_services.send_pg_temp();
+ return pg->complete_rctx(std::move(rctx));
});
}).then([this] {
logger().debug("{}: complete", *this);
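(The context lines that follow are from the PG class itself — note the unqualified `get_need_up_thru()` and `get_collection_ref()` calls in the added block — which is where the consolidated helper lands. With it in place, both call sites above shrink to the same `pg->complete_rctx(...)` one-liner.)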
return std::size(snap_trimq);
}
+ // Consolidated completion of a PeeringCtx: update up_thru via send_alive
+ // when the peering state asks for it, submit the accumulated transaction
+ // and outgoing messages, and share pg_temp with the monitor.
+ seastar::future<> complete_rctx(PeeringCtx &&rctx) {
+   return seastar::when_all_succeed(
+     get_need_up_thru()
+       ? shard_services.send_alive(get_same_interval_since())
+       : seastar::now(),
+     shard_services.dispatch_context(
+       get_collection_ref(),
+       std::move(rctx)),
+     shard_services.send_pg_temp()
+   ).then([](auto) {});  // discard when_all_succeed's tuple result
+ }
+
void send_cluster_message(
int osd, MessageURef m,
epoch_t epoch, bool share_map_update=false) final {
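The new helper is a standard seastar fan-out: start several `future<>` steps with `seastar::when_all_succeed`, substitute an already-ready future for the conditional one, and collapse the combined result back to a bare `future<>`. A minimal self-contained sketch of the same pattern against plain seastar — `notify_alive`, `submit_ctx` and `share_pg_temp` are hypothetical stand-ins, not crimson APIs:

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/when_all.hh>

// Hypothetical stand-ins for send_alive, dispatch_context and send_pg_temp.
seastar::future<> notify_alive()  { return seastar::make_ready_future<>(); }
seastar::future<> submit_ctx()    { return seastar::make_ready_future<>(); }
seastar::future<> share_pg_temp() { return seastar::make_ready_future<>(); }

seastar::future<> complete_all(bool need_up_thru) {
  return seastar::when_all_succeed(
    // Conditional step: an already-resolved future<> (what seastar::now()
    // returns) stands in when no notification is needed.
    need_up_thru ? notify_alive() : seastar::make_ready_future<>(),
    submit_ctx(),
    share_pg_temp()
  ).discard_result();  // collapse when_all_succeed's result to future<>
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    return complete_all(true);
  });
}

`discard_result()` plays the same role here as the `.then([](auto) {})` in the hunk above: both throw away the value `when_all_succeed` resolves to so the function can return a plain `future<>`.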