if (unlikely(!conn.update_rx_seq(msg->get_seq()))) {
// skip this message
- return;
+ return seastar::now();
}
logger().debug("{} <== #{} === {} ({})",
conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type());
- std::ignore = dispatcher->ms_dispatch(&conn, std::move(msg_ref));
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(&conn, std::move(msg_ref));
});
}
local_conf()->ms_die_on_old_message) {
ceph_assert(0 == "old msgs despite reconnect_seq feature");
}
- return;
+ return seastar::now();
} else if (message->get_seq() > cur_seq + 1) {
logger().error("{} missed message? skipped from seq {} to {}",
conn, cur_seq, message->get_seq());
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
- std::ignore = dispatcher->ms_dispatch(&conn, std::move(msg_ref));
+ // throttle the reading process by the returned future
+ return dispatchers.ms_dispatch(&conn, std::move(msg_ref));
});
}