IOHandler::IOHandler(ChainedDispatchers &dispatchers,
SocketConnection &conn)
- : sid(seastar::this_shard_id()),
+ : shard_states(shard_states_t::create(
+ seastar::this_shard_id(), io_state_t::none)),
dispatchers(dispatchers),
conn(conn),
conn_ref(conn.get_local_shared_foreign_from_this())
IOHandler::~IOHandler()
{
// close_io() must be finished
- ceph_assert(gate.is_closed());
- assert(!out_exit_dispatching);
+ ceph_assert_always(shard_states->assert_closed_and_exit());
assert(!conn_ref);
}
seastar::future<> IOHandler::send(MessageFRef msg)
{
- if (seastar::this_shard_id() == sid) {
+ if (seastar::this_shard_id() == get_shard_id()) {
return do_send(std::move(msg));
} else {
return seastar::smp::submit_to(
- sid, [this, msg=std::move(msg)]() mutable {
+ get_shard_id(), [this, msg=std::move(msg)]() mutable {
return do_send(std::move(msg));
});
}
seastar::future<> IOHandler::do_send(MessageFRef msg)
{
- assert(seastar::this_shard_id() == sid);
- if (io_state != io_state_t::drop) {
+ assert(seastar::this_shard_id() == get_shard_id());
+ if (get_io_state() != io_state_t::drop) {
out_pending_msgs.push_back(std::move(msg));
notify_out_dispatch();
}
seastar::future<> IOHandler::send_keepalive()
{
- if (seastar::this_shard_id() == sid) {
+ if (seastar::this_shard_id() == get_shard_id()) {
return do_send_keepalive();
} else {
return seastar::smp::submit_to(
- sid, [this] {
+ get_shard_id(), [this] {
return do_send_keepalive();
});
}
seastar::future<> IOHandler::do_send_keepalive()
{
- assert(seastar::this_shard_id() == sid);
+ assert(seastar::this_shard_id() == get_shard_id());
if (!need_keepalive) {
need_keepalive = true;
notify_out_dispatch();
void IOHandler::mark_down()
{
- ceph_assert_always(seastar::this_shard_id() == sid);
- ceph_assert_always(io_state != io_state_t::none);
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(get_io_state() != io_state_t::none);
need_dispatch_reset = false;
- if (io_state == io_state_t::drop) {
+ if (get_io_state() == io_state_t::drop) {
return;
}
void IOHandler::print_io_stat(std::ostream &out) const
{
+ assert(seastar::this_shard_id() == get_shard_id());
out << "io_stat("
- << "io_state=" << fmt::format("{}", io_state)
+ << "io_state=" << fmt::format("{}", get_io_state())
<< ", in_seq=" << in_seq
<< ", out_seq=" << out_seq
<< ", out_pending_msgs_size=" << out_pending_msgs.size()
FrameAssemblerV2Ref fa,
bool set_notify_out)
{
- auto prv_state = io_state;
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ auto prv_state = get_io_state();
ceph_assert_always(!(
(new_state == io_state_t::none && prv_state != io_state_t::none) ||
(new_state == io_state_t::open && prv_state == io_state_t::open) ||
assert(fa == nullptr);
ceph_assert_always(frame_assembler->is_socket_valid());
frame_assembler->shutdown_socket<false>(nullptr);
- if (out_dispatching) {
- ceph_assert_always(!out_exit_dispatching.has_value());
- out_exit_dispatching = seastar::promise<>();
- }
} else {
assert(fa == nullptr);
}
// FIXME: simplify and drop the prv_state == new_state case
if (prv_state != new_state) {
- io_state = new_state;
- io_state_changed.set_value();
- io_state_changed = seastar::promise<>();
+ shard_states->set_io_state(new_state);
}
/*
seastar::future<IOHandler::exit_dispatching_ret>
IOHandler::wait_io_exit_dispatching()
{
- ceph_assert_always(io_state != io_state_t::open);
+ ceph_assert_always(get_io_state() != io_state_t::open);
ceph_assert_always(frame_assembler != nullptr);
ceph_assert_always(!frame_assembler->is_socket_valid());
- return seastar::when_all(
- [this] {
- if (out_exit_dispatching) {
- return out_exit_dispatching->get_future();
- } else {
- return seastar::now();
- }
- }(),
- [this] {
- if (in_exit_dispatching) {
- return in_exit_dispatching->get_future();
- } else {
- return seastar::now();
- }
- }()
- ).discard_result().then([this] {
+ return shard_states->wait_io_exit_dispatching(
+ ).then([this] {
ceph_assert_always(frame_assembler != nullptr);
ceph_assert_always(!frame_assembler->is_socket_valid());
return exit_dispatching_ret{
void IOHandler::reset_session(bool full)
{
- assert(io_state != io_state_t::open);
+ assert(get_io_state() != io_state_t::open);
reset_in();
if (full) {
reset_out();
void IOHandler::reset_peer_state()
{
- assert(io_state != io_state_t::open);
+ assert(get_io_state() != io_state_t::open);
reset_in();
requeue_out_sent_up_to(0);
discard_out_sent();
void IOHandler::requeue_out_sent()
{
- assert(io_state != io_state_t::open);
+ assert(get_io_state() != io_state_t::open);
if (out_sent_msgs.empty()) {
return;
}
void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
{
- assert(io_state != io_state_t::open);
+ assert(get_io_state() != io_state_t::open);
if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
conn, out_seq, seq);
void IOHandler::reset_in()
{
- assert(io_state != io_state_t::open);
+ assert(get_io_state() != io_state_t::open);
in_seq = 0;
}
void IOHandler::reset_out()
{
- assert(io_state != io_state_t::open);
+ assert(get_io_state() != io_state_t::open);
discard_out_sent();
out_pending_msgs.clear();
need_keepalive = false;
void IOHandler::discard_out_sent()
{
- assert(io_state != io_state_t::open);
+ assert(get_io_state() != io_state_t::open);
out_seq = 0;
out_sent_msgs.clear();
}
void IOHandler::dispatch_accept()
{
- if (io_state == io_state_t::drop) {
+ if (get_io_state() == io_state_t::drop) {
return;
}
// protocol_is_connected can be from true to true here if the replacing is
void IOHandler::dispatch_connect()
{
- if (io_state == io_state_t::drop) {
+ if (get_io_state() == io_state_t::drop) {
return;
}
ceph_assert_always(protocol_is_connected == false);
void IOHandler::dispatch_reset(bool is_replace)
{
- ceph_assert_always(io_state == io_state_t::drop);
+ ceph_assert_always(get_io_state() == io_state_t::drop);
if (!need_dispatch_reset) {
return;
}
void IOHandler::dispatch_remote_reset()
{
- if (io_state == io_state_t::drop) {
+ if (get_io_state() == io_state_t::drop) {
return;
}
ceph_assert_always(conn_ref);
seastar::future<> IOHandler::do_out_dispatch()
{
return seastar::repeat([this] {
- switch (io_state) {
+ switch (get_io_state()) {
case io_state_t::open: {
if (unlikely(!is_out_queued())) {
// try exit open dispatching
return frame_assembler->flush<false>(
).then([this] {
- if (io_state != io_state_t::open || is_out_queued()) {
+ if (get_io_state() != io_state_t::open || is_out_queued()) {
return seastar::make_ready_future<stop_t>(stop_t::no);
}
// still nothing pending to send after flush,
// open dispatching can ONLY stop now
- ceph_assert(out_dispatching);
- out_dispatching = false;
- if (unlikely(out_exit_dispatching.has_value())) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: nothing queued at {},"
- " set out_exit_dispatching",
- conn, io_state);
- }
+ shard_states->exit_out_dispatching("exit-open", conn);
return seastar::make_ready_future<stop_t>(stop_t::yes);
});
}
sweep_out_pending_msgs_to_sent(
need_keepalive, next_keepalive_ack, to_ack > 0)
).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
- if (io_state != io_state_t::open) {
+ if (get_io_state() != io_state_t::open) {
return frame_assembler->flush<false>(
).then([] {
return seastar::make_ready_future<stop_t>(stop_t::no);
}
case io_state_t::delay:
// delay out dispatching until open
- if (out_exit_dispatching) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
- } else {
- logger().info("{} do_out_dispatch: delay ...", conn);
- }
- return io_state_changed.get_future(
+ shard_states->notify_out_dispatching_stopped("delay...", conn);
+ return shard_states->wait_state_change(
).then([] { return stop_t::no; });
case io_state_t::drop:
- ceph_assert(out_dispatching);
- out_dispatching = false;
- if (out_exit_dispatching) {
- out_exit_dispatching->set_value();
- out_exit_dispatching = std::nullopt;
- logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
- } else {
- logger().info("{} do_out_dispatch: dropped", conn);
- }
+ shard_states->exit_out_dispatching("dropped", conn);
return seastar::make_ready_future<stop_t>(stop_t::yes);
default:
ceph_abort("impossible");
}
}).handle_exception_type([this](const std::system_error& e) {
+ auto io_state = get_io_state();
if (e.code() != std::errc::broken_pipe &&
e.code() != std::errc::connection_reset &&
e.code() != error::negotiation_failure) {
if (need_notify_out) {
handshake_listener->notify_out();
}
- if (out_dispatching) {
- // already dispatching
- return;
- }
-
- switch (io_state) {
- case io_state_t::open:
- [[fallthrough]];
- case io_state_t::delay:
- out_dispatching = true;
- assert(!gate.is_closed());
- gate.dispatch_in_background("do_out_dispatch", conn, [this] {
+ if (shard_states->try_enter_out_dispatching()) {
+ shard_states->dispatch_in_background(
+ "do_out_dispatch", conn, [this] {
return do_out_dispatch();
});
- return;
- case io_state_t::drop:
- // do not dispatch out
- return;
- default:
- ceph_abort("impossible");
}
}
{
return frame_assembler->read_frame_payload<false>(
).then([this, throttle_stamp, msg_size](auto payload) {
- if (unlikely(io_state != io_state_t::open)) {
+ if (unlikely(get_io_state() != io_state_t::open)) {
logger().debug("{} triggered {} during read_message()",
- conn, io_state);
+ conn, get_io_state());
abort_protocol();
}
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
- assert(io_state == io_state_t::open);
+ assert(get_io_state() == io_state_t::open);
ceph_assert_always(conn_ref);
// throttle the reading process by the returned future
return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
void IOHandler::do_in_dispatch()
{
- ceph_assert_always(!in_exit_dispatching.has_value());
- in_exit_dispatching = seastar::promise<>();
- gate.dispatch_in_background(
+ shard_states->enter_in_dispatching();
+ shard_states->dispatch_in_background(
"do_in_dispatch", conn, [this] {
return seastar::keep_doing([this] {
return frame_assembler->read_main_preamble<false>(
e_what = e.what();
}
+ auto io_state = get_io_state();
if (io_state == io_state_t::open) {
logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
conn, io_state, e_what);
conn, io_state, e_what);
}
}).finally([this] {
- ceph_assert_always(in_exit_dispatching.has_value());
- in_exit_dispatching->set_value();
- in_exit_dispatching = std::nullopt;
+ shard_states->exit_in_dispatching();
});
});
}
seastar::future<>
IOHandler::close_io(bool is_dispatch_reset, bool is_replace)
{
- ceph_assert_always(io_state == io_state_t::drop);
+ ceph_assert_always(get_io_state() == io_state_t::drop);
if (is_dispatch_reset) {
dispatch_reset(is_replace);
ceph_assert_always(conn_ref);
conn_ref.reset();
+ return shard_states->close(
+ ).then([this] {
+ assert(shard_states->assert_closed_and_exit());
+ });
+}
+
+/*
+ * IOHandler::shard_states_t
+ */
+
+void
+IOHandler::shard_states_t::notify_out_dispatching_stopped(
+ const char *what, SocketConnection &conn)
+{
+ assert(seastar::this_shard_id() == sid);
+ if (unlikely(out_exit_dispatching.has_value())) {
+ out_exit_dispatching->set_value();
+ out_exit_dispatching = std::nullopt;
+ logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching",
+ conn, what, io_state);
+ } else {
+ if (unlikely(io_state != io_state_t::open)) {
+ logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching",
+ conn, what, io_state);
+ }
+ }
+}
+
+seastar::future<>
+IOHandler::shard_states_t::wait_io_exit_dispatching()
+{
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != io_state_t::open);
assert(!gate.is_closed());
- return gate.close();
+ return seastar::when_all(
+ [this] {
+ if (out_exit_dispatching) {
+ return out_exit_dispatching->get_future();
+ } else {
+ return seastar::now();
+ }
+ }(),
+ [this] {
+ if (in_exit_dispatching) {
+ return in_exit_dispatching->get_future();
+ } else {
+ return seastar::now();
+ }
+ }()
+ ).discard_result();
}
} // namespace crimson::net
*/
private:
seastar::shard_id get_shard_id() const final {
- return sid;
+ return shard_states->get_shard_id();
}
bool is_connected() const final {
- ceph_assert_always(seastar::this_shard_id() == sid);
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
return protocol_is_connected;
}
seastar::future<> send_keepalive() final;
clock_t::time_point get_last_keepalive() const final {
- ceph_assert_always(seastar::this_shard_id() == sid);
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
return last_keepalive;
}
clock_t::time_point get_last_keepalive_ack() const final {
- ceph_assert_always(seastar::this_shard_id() == sid);
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
return last_keepalive_ack;
}
void set_last_keepalive_ack(clock_t::time_point when) final {
- ceph_assert_always(seastar::this_shard_id() == sid);
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
last_keepalive_ack = when;
}
void dispatch_connect();
private:
+ class shard_states_t;
+ using shard_states_ref_t = std::unique_ptr<shard_states_t>;
+
+ class shard_states_t {
+ public:
+ shard_states_t(seastar::shard_id _sid, io_state_t state)
+ : sid{_sid}, io_state{state} {}
+
+ seastar::shard_id get_shard_id() const {
+ return sid;
+ }
+
+ io_state_t get_io_state() const {
+ assert(seastar::this_shard_id() == sid);
+ return io_state;
+ }
+
+ void set_io_state(io_state_t new_state) {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != new_state);
+ pr_io_state_changed.set_value();
+ pr_io_state_changed = seastar::promise<>();
+ if (io_state == io_state_t::open) {
+ // from open
+ if (out_dispatching) {
+ ceph_assert_always(!out_exit_dispatching.has_value());
+ out_exit_dispatching = seastar::promise<>();
+ }
+ }
+ io_state = new_state;
+ }
+
+ seastar::future<> wait_state_change() {
+ assert(seastar::this_shard_id() == sid);
+ return pr_io_state_changed.get_future();
+ }
+
+ template <typename Func>
+ void dispatch_in_background(
+ const char *what, SocketConnection &who, Func &&func) {
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert_always(!gate.is_closed());
+ gate.dispatch_in_background(what, who, std::move(func));
+ }
+
+ void enter_in_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state == io_state_t::open);
+ ceph_assert_always(!in_exit_dispatching.has_value());
+ in_exit_dispatching = seastar::promise<>();
+ }
+
+ void exit_in_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ assert(io_state != io_state_t::open);
+ ceph_assert_always(in_exit_dispatching.has_value());
+ in_exit_dispatching->set_value();
+ in_exit_dispatching = std::nullopt;
+ }
+
+ bool try_enter_out_dispatching() {
+ assert(seastar::this_shard_id() == sid);
+ if (out_dispatching) {
+ // already dispatching out
+ return false;
+ }
+ switch (io_state) {
+ case io_state_t::open:
+ [[fallthrough]];
+ case io_state_t::delay:
+ out_dispatching = true;
+ return true;
+ case io_state_t::drop:
+ // do not dispatch out
+ return false;
+ default:
+ ceph_abort("impossible");
+ }
+ }
+
+ void notify_out_dispatching_stopped(
+ const char *what, SocketConnection &conn);
+
+ void exit_out_dispatching(
+ const char *what, SocketConnection &conn) {
+ assert(seastar::this_shard_id() == sid);
+ ceph_assert_always(out_dispatching);
+ out_dispatching = false;
+ notify_out_dispatching_stopped(what, conn);
+ }
+
+ seastar::future<> wait_io_exit_dispatching();
+
+ seastar::future<> close() {
+ assert(seastar::this_shard_id() == sid);
+ assert(!gate.is_closed());
+ return gate.close();
+ }
+
+ bool assert_closed_and_exit() const {
+ assert(seastar::this_shard_id() == sid);
+ if (gate.is_closed()) {
+ ceph_assert_always(io_state == io_state_t::drop);
+ ceph_assert_always(!out_dispatching);
+ ceph_assert_always(!out_exit_dispatching);
+ ceph_assert_always(!in_exit_dispatching);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ static shard_states_ref_t create(
+ seastar::shard_id sid, io_state_t state) {
+ return std::make_unique<shard_states_t>(sid, state);
+ }
+
+ private:
+ const seastar::shard_id sid;
+ io_state_t io_state;
+
+ crimson::common::Gated gate;
+ seastar::promise<> pr_io_state_changed;
+ bool out_dispatching = false;
+ std::optional<seastar::promise<>> out_exit_dispatching;
+ std::optional<seastar::promise<>> in_exit_dispatching;
+ };
+
+ io_state_t get_io_state() const {
+ return shard_states->get_io_state();
+ }
+
seastar::future<> do_send(MessageFRef msg);
seastar::future<> do_send_keepalive();
void do_in_dispatch();
private:
- seastar::shard_id sid;
+ shard_states_ref_t shard_states;
ChainedDispatchers &dispatchers;
HandshakeListener *handshake_listener = nullptr;
- crimson::common::Gated gate;
-
FrameAssemblerV2Ref frame_assembler;
bool protocol_is_connected = false;
bool need_dispatch_reset = true;
- io_state_t io_state = io_state_t::none;
-
- // wait until current io_state changed
- seastar::promise<> io_state_changed;
-
/*
* out states for writing
*/
- bool out_dispatching = false;
-
- std::optional<seastar::promise<>> out_exit_dispatching;
-
/// the seq num of the last transmitted message
seq_num_t out_seq = 0;
* in states for reading
*/
- std::optional<seastar::promise<>> in_exit_dispatching;
-
/// the seq num of the last received message
seq_num_t in_seq = 0;