From d75c9e884a9f00d6578053bc03b4a420ee2c04b4 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 8 Aug 2019 16:56:30 +0800 Subject: [PATCH] crimson/net: WAIT state and backoff for client Client goes to WAIT state when it is delayed to reconnect, or wants to be replaced by a newly established socket. Signed-off-by: Yingxin Cheng --- src/crimson/net/Config.h | 4 +-- src/crimson/net/ProtocolV1.cc | 16 +++++----- src/crimson/net/ProtocolV2.cc | 56 +++++++++++++++++++++++++++++------ src/crimson/net/ProtocolV2.h | 26 ++++++++++++++-- 4 files changed, 79 insertions(+), 23 deletions(-) diff --git a/src/crimson/net/Config.h b/src/crimson/net/Config.h index 90929bdedb251..6b040a1981272 100644 --- a/src/crimson/net/Config.h +++ b/src/crimson/net/Config.h @@ -18,8 +18,8 @@ constexpr struct simple_md_config_t { bool cephx_service_require_signatures = false; bool ms_die_on_old_message = true; bool ms_die_on_skipped_message = true; - std::chrono::milliseconds ms_initial_backoff = 200ms; - std::chrono::milliseconds ms_max_backoff = 15000ms; + double ms_initial_backoff = .2; + double ms_max_backoff = 15.0; std::chrono::milliseconds threadpool_empty_queue_max_wait = 100ms; size_t osd_client_message_size_cap = 500ULL << 20; } conf; diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 9e2ba07c9b5ec..7aca87fd0f36a 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -958,15 +958,13 @@ seastar::future<> ProtocolV1::fault() messenger.unregister_conn(seastar::static_pointer_cast( conn.shared_from_this())); } - if (h.backoff.count()) { - h.backoff += h.backoff; - } else { - h.backoff = conf.ms_initial_backoff; - } - if (h.backoff > conf.ms_max_backoff) { - h.backoff = conf.ms_max_backoff; - } - return seastar::sleep(h.backoff); + // XXX: we decided not to support lossless connection in v1. as the + // client's default policy is + // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is + // lossy. And by the time of crimson-osd's GA, the in-cluster communication + // will all be performed using v2 protocol. + ceph_abort("lossless policy not supported for v1"); + return seastar::now(); } } // namespace ceph::net diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index a39df77489a1b..9474562e1387c 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -127,10 +127,28 @@ inline ostream& operator<<( namespace ceph::net { +seastar::future<> ProtocolV2::Timer::backoff(double seconds) +{ + logger().warn("{} waiting {} seconds ...", conn, seconds); + cancel(); + last_dur_ = seconds; + as = seastar::abort_source(); + auto dur = std::chrono::duration_cast( + std::chrono::duration(seconds)); + return seastar::sleep_abortable(dur, *as + ).handle_exception_type([this] (const seastar::sleep_aborted& e) { + logger().warn("{} wait aborted", conn); + abort_protocol(); + }); +} + ProtocolV2::ProtocolV2(Dispatcher& dispatcher, SocketConnection& conn, SocketMessenger& messenger) - : Protocol(proto_t::v2, dispatcher, conn), messenger{messenger} {} + : Protocol(proto_t::v2, dispatcher, conn), + messenger{messenger}, + protocol_timer{conn} +{} ProtocolV2::~ProtocolV2() {} @@ -741,7 +759,6 @@ ProtocolV2::client_connect() connect_seq = 0; server_cookie = 0; } - // TODO: backoff = utime_t(); return dispatcher.ms_handle_connect( seastar::static_pointer_cast(conn.shared_from_this())) @@ -817,8 +834,6 @@ ProtocolV2::client_reconnect() logger().debug("{} GOT ReconnectOkFrame: msg_seq={}", conn, reconnect_ok.msg_seq()); requeue_up_to(reconnect_ok.msg_seq()); - // TODO - // backoff = utime_t(); return dispatcher.ms_handle_connect( seastar::static_pointer_cast( conn.shared_from_this())) @@ -932,7 +947,7 @@ void ProtocolV2::execute_connecting() break; } case next_step_t::wait: { - execute_wait(); + execute_wait(true); break; } default: { @@ -1684,6 +1699,7 @@ void ProtocolV2::execute_ready() { trigger_state(state_t::READY, write_state_t::open, false); execution_done = seastar::with_gate(pending_dispatch, [this] { + protocol_timer.cancel(); return seastar::keep_doing([this] { return read_main_preamble() .then([this] (Tag tag) { @@ -1774,11 +1790,31 @@ void ProtocolV2::notify_write() // WAIT state -void ProtocolV2::execute_wait() +void ProtocolV2::execute_wait(bool max_backoff) { - // TODO not implemented - // trigger_state(state_t::WAIT, write_state_t::delay, false); - ceph_assert(false); + trigger_state(state_t::WAIT, write_state_t::delay, true); + if (socket) { + socket->shutdown(); + } + execution_done = seastar::with_gate(pending_dispatch, + [this, max_backoff] { + double backoff = protocol_timer.last_dur(); + if (max_backoff) { + backoff = conf.ms_max_backoff; + } else if (backoff > 0) { + backoff = std::min(conf.ms_max_backoff, 2 * backoff); + } else { + backoff = conf.ms_initial_backoff; + } + return protocol_timer.backoff(backoff).then([this] { + execute_connecting(); + }).handle_exception([this] (std::exception_ptr eptr) { + logger().debug("{} execute_wait(): got exception {} at state {}, abort", + conn, eptr, get_state_name(state)); + assert(state == state_t::REPLACING || + state == state_t::CLOSING); + }); + }); } // SERVER_WAIT state @@ -1815,6 +1851,8 @@ void ProtocolV2::trigger_close() ceph_assert(false); } + protocol_timer.cancel(); + if (!socket) { ceph_assert(state == state_t::CONNECTING); } diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 509b18c77de86..179299f7298d9 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -3,6 +3,8 @@ #pragma once +#include + #include "Protocol.h" #include "msg/async/frames_v2.h" #include "msg/async/crypto_onwire.h" @@ -44,7 +46,7 @@ class ProtocolV2 final : public Protocol { CONNECTING, READY, STANDBY, - WAIT, // ? CLIENT_WAIT + WAIT, REPLACING, // ? CLOSING }; @@ -57,7 +59,7 @@ class ProtocolV2 final : public Protocol { "CONNECTING", "READY", "STANDBY", - "WAIT", // ? CLIENT_WAIT + "WAIT", "REPLACING", // ? "CLOSING"}; return statenames[static_cast(state)]; @@ -76,6 +78,24 @@ class ProtocolV2 final : public Protocol { seastar::future<> execution_done = seastar::now(); + class Timer { + double last_dur_ = 0.0; + const SocketConnection& conn; + std::optional as; + public: + Timer(SocketConnection& conn) : conn(conn) {} + double last_dur() const { return last_dur_; } + seastar::future<> backoff(double seconds); + void cancel() { + last_dur_ = 0.0; + if (as) { + as->request_abort(); + as = std::nullopt; + } + } + }; + Timer protocol_timer; + // TODO: Frame related implementations, probably to a separate class. private: bool record_io = false; @@ -160,7 +180,7 @@ class ProtocolV2 final : public Protocol { void execute_standby(); // WAIT - void execute_wait(); + void execute_wait(bool max_backoff); // SERVER_WAIT void execute_server_wait(); -- 2.39.5