* send
*
* Send a message over a connection that has completed its handshake.
- * May be invoked from any core.
+ *
+ * May be invoked from any core, but that requires to chain the returned
+ * future to preserve ordering.
*/
virtual seastar::future<> send(MessageURef msg) = 0;
* Send a keepalive message over a connection that has completed its
* handshake.
*
- * May be invoked from any core.
+ * May be invoked from any core, but that requires to chain the returned
+ * future to preserve ordering.
*/
virtual seastar::future<> send_keepalive() = 0;
seastar::future<> IOHandler::send(MessageFRef msg)
{
+ // sid may be changed on-the-fly during the submission
if (seastar::this_shard_id() == get_shard_id()) {
return do_send(std::move(msg));
} else {
+ logger().trace("{} send() is directed to {} -- {}",
+ conn, get_shard_id(), *msg);
return seastar::smp::submit_to(
get_shard_id(), [this, msg=std::move(msg)]() mutable {
- return do_send(std::move(msg));
+ return send_redirected(std::move(msg));
+ });
+ }
+}
+
+seastar::future<> IOHandler::send_redirected(MessageFRef msg)
+{
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send(std::move(msg));
+ } else {
+ logger().debug("{} send() is redirected to {} -- {}",
+ conn, get_shard_id(), *msg);
+ return seastar::smp::submit_to(
+ get_shard_id(), [this, msg=std::move(msg)]() mutable {
+ return send_redirected(std::move(msg));
});
}
}
seastar::future<> IOHandler::do_send(MessageFRef msg)
{
assert(seastar::this_shard_id() == get_shard_id());
+ logger().trace("{} do_send() got message -- {}", conn, *msg);
if (get_io_state() != io_state_t::drop) {
out_pending_msgs.push_back(std::move(msg));
notify_out_dispatch();
seastar::future<> IOHandler::send_keepalive()
{
+ // sid may be changed on-the-fly during the submission
+ if (seastar::this_shard_id() == get_shard_id()) {
+ return do_send_keepalive();
+ } else {
+ logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
+ return seastar::smp::submit_to(
+ get_shard_id(), [this] {
+ return send_keepalive_redirected();
+ });
+ }
+}
+
+seastar::future<> IOHandler::send_keepalive_redirected()
+{
+ // sid may be changed on-the-fly during the submission
if (seastar::this_shard_id() == get_shard_id()) {
return do_send_keepalive();
} else {
+ logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
return seastar::smp::submit_to(
get_shard_id(), [this] {
- return do_send_keepalive();
+ return send_keepalive_redirected();
});
}
}
seastar::future<> IOHandler::do_send_keepalive()
{
assert(seastar::this_shard_id() == get_shard_id());
+ logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive);
if (!need_keepalive) {
need_keepalive = true;
notify_out_dispatch();