}
}
-seastar::future<> IOHandler::do_out_dispatch()
+seastar::future<>
+IOHandler::do_out_dispatch(shard_states_t &ctx)
{
- return seastar::repeat([this] {
- switch (get_io_state()) {
+ return seastar::repeat([this, &ctx] {
+ switch (ctx.get_io_state()) {
case io_state_t::open: {
if (unlikely(!is_out_queued())) {
// try exit open dispatching
return frame_assembler->flush<false>(
- ).then([this] {
- if (get_io_state() != io_state_t::open || is_out_queued()) {
+ ).then([this, &ctx] {
+ if (ctx.get_io_state() != io_state_t::open || is_out_queued()) {
return seastar::make_ready_future<stop_t>(stop_t::no);
}
// still nothing pending to send after flush,
// open dispatching can ONLY stop now
- shard_states->exit_out_dispatching("exit-open", conn);
+ ctx.exit_out_dispatching("exit-open", conn);
return seastar::make_ready_future<stop_t>(stop_t::yes);
});
}
return frame_assembler->write<false>(
sweep_out_pending_msgs_to_sent(
need_keepalive, next_keepalive_ack, to_ack > 0)
- ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
- if (get_io_state() != io_state_t::open) {
+ ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack, &ctx] {
+ if (ctx.get_io_state() != io_state_t::open) {
return frame_assembler->flush<false>(
).then([] {
return seastar::make_ready_future<stop_t>(stop_t::no);
}
case io_state_t::delay:
// delay out dispatching until open
- shard_states->notify_out_dispatching_stopped("delay...", conn);
- return shard_states->wait_state_change(
+ ctx.notify_out_dispatching_stopped("delay...", conn);
+ return ctx.wait_state_change(
).then([] { return stop_t::no; });
case io_state_t::drop:
- shard_states->exit_out_dispatching("dropped", conn);
+ ctx.exit_out_dispatching("dropped", conn);
return seastar::make_ready_future<stop_t>(stop_t::yes);
default:
ceph_abort("impossible");
}
- }).handle_exception_type([this](const std::system_error& e) {
- auto io_state = get_io_state();
+ }).handle_exception_type([this, &ctx](const std::system_error& e) {
+ auto io_state = ctx.get_io_state();
if (e.code() != std::errc::broken_pipe &&
e.code() != std::errc::connection_reset &&
e.code() != error::negotiation_failure) {
conn, io_state, e.what());
}
- return do_out_dispatch();
+ return do_out_dispatch(ctx);
});
}
if (shard_states->try_enter_out_dispatching()) {
shard_states->dispatch_in_background(
"do_out_dispatch", conn, [this] {
- return do_out_dispatch();
+ return do_out_dispatch(*shard_states);
});
}
}
seastar::future<>
IOHandler::read_message(
+ shard_states_t &ctx,
utime_t throttle_stamp,
std::size_t msg_size)
{
return frame_assembler->read_frame_payload<false>(
- ).then([this, throttle_stamp, msg_size](auto payload) {
- if (unlikely(get_io_state() != io_state_t::open)) {
+ ).then([this, throttle_stamp, msg_size, &ctx](auto payload) {
+ if (unlikely(ctx.get_io_state() != io_state_t::open)) {
logger().debug("{} triggered {} during read_message()",
- conn, get_io_state());
+ conn, ctx.get_io_state());
abort_protocol();
}
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
+ assert(ctx.get_io_state() == io_state_t::open);
assert(get_io_state() == io_state_t::open);
ceph_assert_always(conn_ref);
// throttle the reading process by the returned future
{
shard_states->enter_in_dispatching();
shard_states->dispatch_in_background(
- "do_in_dispatch", conn, [this] {
- return seastar::keep_doing([this] {
+ "do_in_dispatch", conn, [this, &ctx=*shard_states] {
+ return seastar::keep_doing([this, &ctx] {
return frame_assembler->read_main_preamble<false>(
- ).then([this](auto ret) {
+ ).then([this, &ctx](auto ret) {
switch (ret.tag) {
case Tag::MESSAGE: {
size_t msg_size = get_msg_size(*ret.rx_frame_asm);
conn.policy.throttler_bytes->get_current(),
conn.policy.throttler_bytes->get_max());
return conn.policy.throttler_bytes->get(msg_size);
- }).then([this, msg_size] {
+ }).then([this, msg_size, &ctx] {
// TODO: throttle_dispatch_queue() logic
utime_t throttle_stamp{seastar::lowres_system_clock::now()};
- return read_message(throttle_stamp, msg_size);
+ return read_message(ctx, throttle_stamp, msg_size);
});
}
case Tag::ACK:
conn, keepalive_frame.timestamp());
// notify keepalive ack
next_keepalive_ack = keepalive_frame.timestamp();
- notify_out_dispatch();
+ if (seastar::this_shard_id() == get_shard_id()) {
+ notify_out_dispatch();
+ }
last_keepalive = seastar::lowres_system_clock::now();
});
}
}
});
- }).handle_exception([this](std::exception_ptr eptr) {
+ }).handle_exception([this, &ctx](std::exception_ptr eptr) {
const char *e_what;
try {
std::rethrow_exception(eptr);
e_what = e.what();
}
- auto io_state = get_io_state();
+ auto io_state = ctx.get_io_state();
if (io_state == io_state_t::open) {
logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
conn, io_state, e_what);
logger().info("{} do_in_dispatch(): fault at {} -- {}",
conn, io_state, e_what);
}
- }).finally([this] {
- shard_states->exit_in_dispatching();
+ }).finally([&ctx] {
+ ctx.exit_in_dispatching();
});
});
}