std::optional<seastar::future<>>
OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
{
- assert(seastar::this_shard_id() == PRIMARY_CORE);
+ if (get_pg_shard_manager().is_stopping()) {
+ return seastar::now();
+ }
bool dispatched = true;
gate.dispatch_in_background(__func__, *this, [this, conn=std::move(conn),
m=std::move(m), &dispatched]() mutable {
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
{
return conn.get_foreign().then([this, m = std::move(m)](auto f_conn) {
- return shard_dispatchers.invoke_on(PRIMARY_CORE,
- [f_conn = std::move(f_conn), m = std::move(m)]
- (auto &local_dispatcher) mutable ->seastar::future<>{
- return local_dispatcher.ms_dispatch(std::move(f_conn), std::move(m));
- });
+ return shard_dispatchers.local().ms_dispatch(std::move(f_conn), std::move(m));
});
}
default:
crimson::net::ConnectionFRef f_conn,
MessageRef m)
{
- crimson::net::ConnectionRef conn = make_local_shared_foreign(std::move(f_conn));
- if (pg_shard_manager.is_stopping()) {
- return seastar::now();
+ if (seastar::this_shard_id() != PRIMARY_CORE) {
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_MAP:
+ case MSG_COMMAND:
+ case MSG_OSD_MARK_ME_DOWN:
+ return container().invoke_on(PRIMARY_CORE,
+ [f_conn = std::move(f_conn), m = std::move(m)]
+ (auto& local_dispatcher) mutable {
+ return local_dispatcher.ms_dispatch(std::move(f_conn), std::move(m));
+ });
+ }
}
+ crimson::net::ConnectionRef conn = make_local_shared_foreign(std::move(f_conn));
+
switch (m->get_type()) {
case CEPH_MSG_OSD_MAP:
return handle_osd_map(boost::static_pointer_cast<MOSDMap>(m));
}
auto &get_pg_shard_manager() {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_dispatchers.local().get_pg_shard_manager();
}
auto &get_pg_shard_manager() const {
- ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return shard_dispatchers.local().get_pg_shard_manager();
}