From 98741f3a077cb50b309a2104696e284aedad3ab2 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Wed, 7 Nov 2018 17:16:40 +0000 Subject: [PATCH] msg/async: msgr2: fault handling Signed-off-by: Ricardo Dias --- src/msg/async/ProtocolV2.cc | 148 +++++++++++++++++++++++++++++++++--- src/msg/async/ProtocolV2.h | 16 ++-- 2 files changed, 149 insertions(+), 15 deletions(-) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 8cdd9658e49..9323e71f2c8 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -56,6 +56,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) cookie(0), message_seq(0), can_write(false), + connect_seq(0), bannerExchangeCallback(nullptr), next_frame_len(0), keepalive(false) { @@ -112,12 +113,66 @@ void ProtocolV2::stop() { state = CLOSED; } -void ProtocolV2::fault() { +void ProtocolV2::fault() { _fault(); } + +void ProtocolV2::requeue_sent() { + if (sent.empty()) { + return; + } + + list > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; + out_seq -= sent.size(); + while (!sent.empty()) { + Message *m = sent.back(); + sent.pop_back(); + ldout(cct, 10) << __func__ << " " << *m << " for resend " + << " (" << m->get_seq() << ")" << dendl; + rq.push_front(make_pair(bufferlist(), m)); + } +} + +void ProtocolV2::reset_recv_state() { + // clean read and write callbacks + connection->pendingReadLen.reset(); + connection->writeCallback.reset(); + + uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + + current_header.data_len; + + if (state > THROTTLE_MESSAGE && state <= READ_MESSAGE_FRONT && + connection->policy.throttler_messages) { + ldout(cct, 10) << __func__ << " releasing " << 1 + << " message to policy throttler " + << connection->policy.throttler_messages->get_current() + << "/" << connection->policy.throttler_messages->get_max() + << dendl; + connection->policy.throttler_messages->put(); + } + if (state > THROTTLE_BYTES && state <= READ_MESSAGE_FRONT) { + if (connection->policy.throttler_bytes) { + ldout(cct, 10) << __func__ << " releasing " << cur_msg_size + << " bytes to policy throttler " + << connection->policy.throttler_bytes->get_current() << "/" + << connection->policy.throttler_bytes->get_max() << dendl; + connection->policy.throttler_bytes->put(cur_msg_size); + } + } + if (state > THROTTLE_DISPATCH_QUEUE && state <= READ_MESSAGE_FRONT) { + ldout(cct, 10) + << __func__ << " releasing " << cur_msg_size + << " bytes to dispatch_queue throttler " + << connection->dispatch_queue->dispatch_throttler.get_current() << "/" + << connection->dispatch_queue->dispatch_throttler.get_max() << dendl; + connection->dispatch_queue->dispatch_throttle_release(cur_msg_size); + } +} + +CtPtr ProtocolV2::_fault() { ldout(cct, 10) << __func__ << dendl; if (state == CLOSED || state == NONE) { ldout(cct, 10) << __func__ << " connection is already closed" << dendl; - return; + return nullptr; } if (connection->policy.lossy && state != START_CONNECT && @@ -125,8 +180,71 @@ void ProtocolV2::fault() { ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl; stop(); connection->dispatch_queue->queue_reset(connection); - return; + return nullptr; } + + connection->write_lock.lock(); + + can_write = false; + // requeue sent items + requeue_sent(); + + if (out_queue.empty() && state >= START_ACCEPT && + state <= ACCEPTED_CLIENT_IDENT) { + ldout(cct, 10) << __func__ << " with nothing to send and in the half " + << " accept state just closed" << dendl; + stop(); + connection->dispatch_queue->queue_reset(connection); + connection->write_lock.unlock(); + return nullptr; + } + + connection->fault(); + reset_recv_state(); + + if (connection->policy.standby && out_queue.empty() && state != WAIT) { + ldout(cct, 10) << __func__ << " with nothing to send, going to standby" + << dendl; + state = STANDBY; + connection->write_lock.unlock(); + return nullptr; + } + + connection->write_lock.unlock(); + + if (state != START_CONNECT && state != CONNECTING && state != WAIT) { + // policy maybe empty when state is in accept + if (connection->policy.server) { + ldout(cct, 0) << __func__ << " server, going to standby" << dendl; + state = STANDBY; + } else { + ldout(cct, 0) << __func__ << " initiating reconnect" << dendl; + connect_seq++; + state = START_CONNECT; + connection->state = AsyncConnection::STATE_CONNECTING; + } + backoff = utime_t(); + connection->center->dispatch_event_external(connection->read_handler); + } else { + if (state == WAIT) { + backoff.set_from_double(cct->_conf->ms_max_backoff); + } else if (backoff == utime_t()) { + backoff.set_from_double(cct->_conf->ms_initial_backoff); + } else { + backoff += backoff; + if (backoff > cct->_conf->ms_max_backoff) + backoff.set_from_double(cct->_conf->ms_max_backoff); + } + + state = START_CONNECT; + connection->state = AsyncConnection::STATE_CONNECTING; + ldout(cct, 10) << __func__ << " waiting " << backoff << dendl; + // woke up again; + connection->register_time_events.insert( + connection->center->create_time_event(backoff.to_nsec() / 1000, + connection->wakeup_handler)); + } + return nullptr; } void ProtocolV2::prepare_send_message(uint64_t features, Message *m, @@ -400,12 +518,11 @@ void ProtocolV2::write_event() { connection->write_lock.unlock(); connection->lock.lock(); connection->write_lock.lock(); - // if (state == STANDBY && !connection->policy.server && is_queued()) { - // ldout(cct, 10) << __func__ << " policy.server is false" << dendl; - // connection->_connect(); - // } else - if (connection->cs && state != NONE && state != CLOSED && - state != START_CONNECT) { + if (state == STANDBY && !connection->policy.server && is_queued()) { + ldout(cct, 10) << __func__ << " policy.server is false" << dendl; + connection->_connect(); + } else if (connection->cs && state != NONE && state != CLOSED && + state != START_CONNECT) { r = connection->_try_send(); if (r < 0) { ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl; @@ -530,6 +647,15 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); if (connection->get_peer_type() == -1) { connection->set_peer_type(peer_type); + + ceph_assert(state == ACCEPTING); + connection->policy = messenger->get_policy(peer_type); + ldout(cct, 10) << __func__ << " accept of host_type " << peer_type + << ", policy.lossy=" << connection->policy.lossy + << " policy.server=" << connection->policy.server + << " policy.standby=" << connection->policy.standby + << " policy.resetcheck=" << connection->policy.resetcheck + << dendl; } else { if (connection->get_peer_type() != peer_type) { ldout(cct, 1) << __func__ << " connection peer type does not match what" @@ -1518,6 +1644,8 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { return WRITE(bl, handle_ident_missing_features_write); } + state = ACCEPTED_CLIENT_IDENT; + // if everything is OK reply with server identification connection->peer_global_id = client_ident.gid; cookie = client_ident.cookie; @@ -1555,7 +1683,7 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { connection->inject_delay(); return _fault(); } - if (state != ACCEPTING) { + if (state != ACCEPTED_CLIENT_IDENT) { ldout(cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down" << dendl; diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 7de850756d0..616239d4fd9 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -14,12 +14,15 @@ private: CONNECTING, START_ACCEPT, ACCEPTING, + ACCEPTED_CLIENT_IDENT, READY, THROTTLE_MESSAGE, THROTTLE_BYTES, THROTTLE_DISPATCH_QUEUE, READ_MESSAGE_FRONT, READ_MESSAGE_COMPLETE, + STANDBY, + WAIT, CLOSED }; @@ -29,16 +32,20 @@ private: "CONNECTING", "START_ACCEPT", "ACCEPTING", + "ACCEPTED_CLIENT_IDENT", "READY", "THROTTLE_MESSAGE", "THROTTLE_BYTES", "THROTTLE_DISPATCH_QUEUE", "READ_MESSAGE_FRONT", "READ_MESSAGE_COMPLETE", + "STANDBY", + "WAIT", "CLOSED"}; return statenames[state]; } +public: enum class Tag : uint32_t { AUTH_REQUEST, AUTH_BAD_METHOD, @@ -315,6 +322,7 @@ private: bool can_write; std::map>> out_queue; std::list sent; + __u32 connect_seq; std::atomic out_seq{0}; std::atomic in_seq{0}; std::atomic ack_left{0}; @@ -342,11 +350,9 @@ private: Ct *write(CONTINUATION_PARAM(next, ProtocolV2, int), bufferlist &bl); - inline Ct *_fault() { - fault(); - return nullptr; - } - + void requeue_sent(); + void reset_recv_state(); + Ct *_fault(); void discard_out_queue(); void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); Message *_get_next_outgoing(bufferlist *bl); -- 2.39.5