IOHandler::~IOHandler()
{
+ // close_io() must be finished
ceph_assert(gate.is_closed());
assert(!out_exit_dispatching);
+ assert(!conn_ref);
}
ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
seastar::future<> IOHandler::send(MessageFRef msg)
{
- ceph_assert_always(seastar::this_shard_id() == sid);
+ if (seastar::this_shard_id() == sid) {
+ return do_send(std::move(msg));
+ } else {
+ return seastar::smp::submit_to(
+ sid, [this, msg=std::move(msg)]() mutable {
+ return do_send(std::move(msg));
+ });
+ }
+}
+
+seastar::future<> IOHandler::do_send(MessageFRef msg)
+{
+ assert(seastar::this_shard_id() == sid);
if (io_state != io_state_t::drop) {
out_pending_msgs.push_back(std::move(msg));
notify_out_dispatch();
seastar::future<> IOHandler::send_keepalive()
{
- ceph_assert_always(seastar::this_shard_id() == sid);
+ if (seastar::this_shard_id() == sid) {
+ return do_send_keepalive();
+ } else {
+ return seastar::smp::submit_to(
+ sid, [this] {
+ return do_send_keepalive();
+ });
+ }
+}
+
+seastar::future<> IOHandler::do_send_keepalive()
+{
+ assert(seastar::this_shard_id() == sid);
if (!need_keepalive) {
need_keepalive = true;
notify_out_dispatch();
FrameAssemblerV2Ref fa,
bool set_notify_out)
{
+ auto prv_state = io_state;
ceph_assert_always(!(
- (new_state == io_state_t::none && io_state != io_state_t::none) ||
- (new_state == io_state_t::open && io_state == io_state_t::open) ||
- (new_state != io_state_t::drop && io_state == io_state_t::drop)
+ (new_state == io_state_t::none && prv_state != io_state_t::none) ||
+ (new_state == io_state_t::open && prv_state == io_state_t::open) ||
+ (new_state != io_state_t::drop && prv_state == io_state_t::drop)
));
bool dispatch_in = false;
conn.get_local_shared_foreign_from_this());
}
#endif
- } else if (io_state == io_state_t::open) {
+ } else if (prv_state == io_state_t::open) {
// from open
ceph_assert_always(protocol_is_connected == true);
protocol_is_connected = false;
need_notify_out = false;
}
- if (io_state != new_state) {
+ // FIXME: simplify and drop the prv_state == new_state case
+ if (prv_state != new_state) {
io_state = new_state;
io_state_changed.set_value();
io_state_changed = seastar::promise<>();
}
}()
).discard_result().then([this] {
+ ceph_assert_always(frame_assembler != nullptr);
+ ceph_assert_always(!frame_assembler->is_socket_valid());
return exit_dispatching_ret{
std::move(frame_assembler),
get_states()};
// protocol_is_connected can be from true to true here if the replacing is
// happening to a connected connection.
protocol_is_connected = true;
+ ceph_assert_always(conn_ref);
dispatchers.ms_handle_accept(conn_ref);
}
}
ceph_assert_always(protocol_is_connected == false);
protocol_is_connected = true;
+ ceph_assert_always(conn_ref);
dispatchers.ms_handle_connect(conn_ref);
}
return;
}
need_dispatch_reset = false;
+ ceph_assert_always(conn_ref);
dispatchers.ms_handle_reset(conn_ref, is_replace);
}
if (io_state == io_state_t::drop) {
return;
}
+ ceph_assert_always(conn_ref);
dispatchers.ms_handle_remote_reset(conn_ref);
}
}
}
-seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
- assert(!is_out_queued());
- return frame_assembler->flush<false>(
- ).then([this] {
- if (!is_out_queued()) {
- // still nothing pending to send after flush,
- // the dispatching can ONLY stop now
- ceph_assert(out_dispatching);
- out_dispatching = false;
- if (unlikely(out_exit_dispatching.has_value())) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: nothing queued at {},"
- " set out_exit_dispatching",
- conn, io_state);
- }
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- } else {
- // something is pending to send during flushing
- return seastar::make_ready_future<stop_t>(stop_t::no);
- }
- });
-}
-
seastar::future<> IOHandler::do_out_dispatch()
{
return seastar::repeat([this] {
switch (io_state) {
case io_state_t::open: {
- bool still_queued = is_out_queued();
- if (unlikely(!still_queued)) {
- return try_exit_out_dispatch();
+ if (unlikely(!is_out_queued())) {
+ // try exit open dispatching
+ return frame_assembler->flush<false>(
+ ).then([this] {
+ if (io_state != io_state_t::open || is_out_queued()) {
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ // still nothing pending to send after flush,
+ // open dispatching can ONLY stop now
+ ceph_assert(out_dispatching);
+ out_dispatching = false;
+ if (unlikely(out_exit_dispatching.has_value())) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: nothing queued at {},"
+ " set out_exit_dispatching",
+ conn, io_state);
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ });
}
+
auto to_ack = ack_left;
assert(to_ack == 0 || in_seq > 0);
return frame_assembler->write<false>(
sweep_out_pending_msgs_to_sent(
need_keepalive, next_keepalive_ack, to_ack > 0)
).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
+ if (io_state != io_state_t::open) {
+ return frame_assembler->flush<false>(
+ ).then([] {
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ });
+ }
+
need_keepalive = false;
if (next_keepalive_ack == prv_keepalive_ack) {
next_keepalive_ack = std::nullopt;
}
assert(ack_left >= to_ack);
ack_left -= to_ack;
- if (!is_out_queued()) {
- return try_exit_out_dispatch();
- } else {
- // messages were enqueued during socket write
- return seastar::make_ready_future<stop_t>(stop_t::no);
- }
+
+ // FIXME: may leak a flush if state is changed after return and before
+ // the next repeat body.
+ return seastar::make_ready_future<stop_t>(stop_t::no);
});
}
case io_state_t::delay:
}
return seastar::make_ready_future<stop_t>(stop_t::yes);
default:
- ceph_assert(false);
+ ceph_abort("impossible");
}
- }).handle_exception_type([this] (const std::system_error& e) {
+ }).handle_exception_type([this](const std::system_error& e) {
if (e.code() != std::errc::broken_pipe &&
e.code() != std::errc::connection_reset &&
e.code() != error::negotiation_failure) {
void IOHandler::notify_out_dispatch()
{
+ assert(is_out_queued());
if (need_notify_out) {
handshake_listener->notify_out();
}
// already dispatching
return;
}
- out_dispatching = true;
+
switch (io_state) {
- case io_state_t::open:
- [[fallthrough]];
- case io_state_t::delay:
+ case io_state_t::open:
+ [[fallthrough]];
+ case io_state_t::delay:
+ out_dispatching = true;
assert(!gate.is_closed());
gate.dispatch_in_background("do_out_dispatch", conn, [this] {
return do_out_dispatch();
});
return;
- case io_state_t::drop:
- out_dispatching = false;
+ case io_state_t::drop:
+ // do not dispatch out
return;
- default:
- ceph_assert(false);
+ default:
+ ceph_abort("impossible");
}
}
seastar::future<>
-IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
+IOHandler::read_message(
+ utime_t throttle_stamp,
+ std::size_t msg_size)
{
return frame_assembler->read_frame_payload<false>(
).then([this, throttle_stamp, msg_size](auto payload) {
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
assert(io_state == io_state_t::open);
+ ceph_assert_always(conn_ref);
// throttle the reading process by the returned future
return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
});
{
ceph_assert_always(!in_exit_dispatching.has_value());
in_exit_dispatching = seastar::promise<>();
- gate.dispatch_in_background("do_in_dispatch", conn, [this] {
+ gate.dispatch_in_background(
+ "do_in_dispatch", conn, [this] {
return seastar::keep_doing([this] {
return frame_assembler->read_main_preamble<false>(
).then([this](auto ret) {
return seastar::now();
}
// TODO: message throttler
- ceph_assert(false);
+ ceph_abort("TODO");
return seastar::now();
}).then([this, msg_size] {
// throttle_bytes() logic
});
}
+seastar::future<>
+IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
+{
+ ceph_assert_always(io_state == io_state_t::drop);
+
+ if (is_dispatch_reset) {
+ dispatch_reset(is_replace);
+ }
+
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+
+ assert(!gate.is_closed());
+ return gate.close();
+}
+
} // namespace crimson::net