From e366e736ff7318e86b018545271f2bee01b64de6 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Thu, 28 Feb 2019 16:40:24 +0800 Subject: [PATCH] crimson/net: skeleton code for ProtocolV2 logic crimson ProtocolV2 class is following a state-machine design style: * states are defined in ProtocolV2::state_t; * call `execute_()` methods to trigger different states; * V2 logics are implemented in each execute_() methods, and with explicit transitions to other states at the end of the execute_*; * each state is associated with a write state defined in Protocol.h: - none: not allowed to send; - delay: messages can be queued, but will be delayed to send; - open: dispatch queued message/keepalive/ack; - drop: not send any messages, drop them all. crimson ProtocolV2 is alike async ProtocolV2, with some considerations: * explicit and encapsulated client/server handshake workflow. * futurized-exception-based fault handling, which can interrupt protocol workflow at any time in each state. * introduced SERVER_WAIT state, meaning to wait for peer-client's socket to reset or be replaced, and expect no further reads. * introduced an explicit REPLACING state, async-msgr would be at the NONE state during replacing. Signed-off-by: Yingxin Cheng --- src/crimson/CMakeLists.txt | 3 +- src/crimson/net/ProtocolV2.cc | 524 ++++++++++++++++++++++++++++ src/crimson/net/ProtocolV2.h | 120 +++++++ src/crimson/net/SocketConnection.cc | 4 +- src/crimson/net/SocketConnection.h | 1 + 5 files changed, 649 insertions(+), 3 deletions(-) create mode 100644 src/crimson/net/ProtocolV2.cc create mode 100644 src/crimson/net/ProtocolV2.h diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index cb82ba81c85..a895fe20e01 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -124,7 +124,8 @@ set(crimson_net_srcs net/SocketMessenger.cc net/Socket.cc net/Protocol.cc - net/ProtocolV1.cc) + net/ProtocolV1.cc + net/ProtocolV2.cc) set(crimson_thread_srcs thread/ThreadPool.cc thread/Throttle.cc) diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc new file mode 100644 index 00000000000..6e33929bbdc --- /dev/null +++ b/src/crimson/net/ProtocolV2.cc @@ -0,0 +1,524 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ProtocolV2.h" + +#include "include/msgr.h" + +#include "Dispatcher.h" +#include "Errors.h" +#include "Socket.h" +#include "SocketConnection.h" +#include "SocketMessenger.h" + +using namespace ceph::msgr::v2; + +namespace { + +seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); +} + +seastar::future<> abort_in_fault() { + throw std::system_error(make_error_code(ceph::net::error::negotiation_failure)); +} + +seastar::future<> abort_in_close() { + throw std::system_error(make_error_code(ceph::net::error::connection_aborted)); +} + +inline void expect_tag(const Tag& expected, + const Tag& actual, + ceph::net::SocketConnection& conn, + const char *where) { + if (actual != expected) { + logger().error("{} {} received wrong tag: {}, expected {}", + conn, where, + static_cast(actual), + static_cast(expected)); + abort_in_fault(); + } +} + +inline seastar::future<> unexpected_tag(const Tag& unexpected, + ceph::net::SocketConnection& conn, + const char *where) { + logger().error("{} {} received unexpected tag: {}", + conn, where, static_cast(unexpected)); + return abort_in_fault(); +} + +} // namespace anonymous + +namespace ceph::net { + +ProtocolV2::ProtocolV2(Dispatcher& dispatcher, + SocketConnection& conn, + SocketMessenger& messenger) + : Protocol(2, dispatcher, conn, messenger) {} + +ProtocolV2::~ProtocolV2() {} + +void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, + const entity_type_t& _peer_type) +{ + ceph_assert(state == state_t::NONE); + ceph_assert(!socket); + conn.peer_addr = _peer_addr; + conn.peer_type = _peer_type; + // TODO: lossless policy + conn.policy = SocketPolicy::lossy_client(0); + messenger.register_conn( + seastar::static_pointer_cast(conn.shared_from_this())); + execute_connecting(); +} + +void ProtocolV2::start_accept(SocketFRef&& sock, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::NONE); + ceph_assert(!socket); + socket = std::move(sock); + messenger.accept_conn( + seastar::static_pointer_cast(conn.shared_from_this())); + execute_accepting(); +} + +void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant) +{ + if (!reentrant && _state == state) { + logger().error("{} is not allowed to re-trigger state {}", + conn, get_state_name(state)); + ceph_assert(false); + } + logger().debug("{} trigger {}, was {}", + conn, get_state_name(_state), get_state_name(state)); + state = _state; + set_write_state(_write_state); +} + +seastar::future<> ProtocolV2::fault() +{ + logger().warn("{} fault during {}", + conn, get_state_name(state)); + // TODO: + // TODO: + // TODO: + close(); + return seastar::now(); +} + +seastar::future<> ProtocolV2::banner_exchange() +{ + // 1. + // 2. then: + // 3. then: + // 4. then: + // 5. then: + // 6. then: + return seastar::now(); +} + +// CONNECTING state + +seastar::future<> ProtocolV2::handle_auth_reply() +{ +//return read_main_preamble() +//.then([this] (Tag tag) { +// switch (tag) { +// case Tag::AUTH_BAD_METHOD: +// handle_auth_bad_method() logic +// return client_auth(bad_method.allowed_methods()); +// case Tag::AUTH_REPLY_MORE: +// +// +// return handle_auth_reply() +// case Tag::AUTH_DONE: +// +// + return seastar::now(); +// default: { +// return unexpected_tag(tag, conn, __func__); +// } +//}); +} + +seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods) +{ + // send_auth_request() logic + // + // + return handle_auth_reply(); +} + +seastar::future ProtocolV2::process_wait() +{ +//return read_frame_payload() +//.then([this] (bufferlist payload) { +// handle_wait() logic +// return false; +//}); + return seastar::make_ready_future(false); +} + +seastar::future ProtocolV2::client_connect() +{ + // send_client_ident() logic + // + // return read_main_preamble() + // .then([this] (Tag tag) { + // switch (tag) { + // case Tag::IDENT_MISSING_FEATURES: + // abort_in_fault(); + // case Tag::WAIT: + // return process_wait(); + // case Tag::SERVER_IDENT: + // + return seastar::make_ready_future(true); + // default: { + // unexpected_tag(tag, conn, "post_client_connect"); + // } + // } + // }); +} + +seastar::future ProtocolV2::client_reconnect() +{ + // send_reconnect() logic + // + // + // return read_main_preamble() + // .then([this] (Tag tag) { + // switch (tag) { + // case Tag::SESSION_RETRY_GLOBAL: + // + // return client_reconnect(); + // case Tag::SESSION_RETRY: + // + // return client_reconnect(); + // case Tag::SESSION_RESET: + // + // return client_connect(); + // case Tag::WAIT: + // return process_wait(); + // case Tag::SESSION_RECONNECT_OK: + // + return seastar::make_ready_future(true); + // default: { + // unexpected_tag(tag, conn, "post_client_reconnect"); + // } + // } + // }); +} + +void ProtocolV2::execute_connecting() +{ + trigger_state(state_t::CONNECTING, write_state_t::delay, true); + seastar::with_gate(pending_dispatch, [this] { + return Socket::connect(conn.peer_addr) + .then([this](SocketFRef sock) { + socket = std::move(sock); + if (state == state_t::CLOSING) { + return socket->close().then([this] { + logger().info("{} is closed during Socket::connect()", conn); + abort_in_fault(); + }); + } + return seastar::now(); + }).then([this] { + return banner_exchange(); + }).then([this] { + return client_auth(); + }).then([this] { + if (1) { // TODO check connect or reconnect + return client_connect(); + } else { + // TODO: lossless policy + ceph_assert(false); + return client_reconnect(); + } + }).then([this] (bool proceed_or_wait) { + if (proceed_or_wait) { + execute_ready(); + } else { + execute_wait(); + } + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in CONNECTING state + return fault(); + }); + }); +} + +// ACCEPTING state + +seastar::future<> ProtocolV2::_auth_bad_method(int r) +{ + // _auth_bad_method() logic + // + // + return server_auth(); +} + +seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) +{ + // _handle_auth_request() logic + // + // + // + return seastar::now(); + // + // + // + // return read_main_preamble() + // .then([this] (Tag tag) { + // expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__); + // + // + // return _handle_auth_request(auth_more.auth_payload(), true); + // }); + // + // return _auth_bad_method(); +} + +seastar::future<> ProtocolV2::server_auth() +{ + // return read_main_preamble() + // .then([this] (Tag tag) { + // expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__); + // + // + // return _auth_bad_method(); + // <...> + auto dummy_auth = bufferlist{}; + return _handle_auth_request(/*request.auth_payload()*/dummy_auth, false); + // }); +} + +seastar::future ProtocolV2::send_wait() +{ + // + // + return seastar::make_ready_future(false); +} + +seastar::future ProtocolV2::handle_existing_connection(SocketConnectionRef existing) +{ + // TODO: lossless policy + ceph_assert(false); +} + +seastar::future ProtocolV2::server_connect() +{ + // handle_client_ident() logic + // + // + // + // + // return seastar::make_ready_future(false); + // + // return handle_existing_connection(existing); + // + return send_server_ident() + .then([this] { + // goto ready + return true; + }); +} + +seastar::future ProtocolV2::read_reconnect() +{ + // return read_main_preamble() + // .then([this] (Tag tag) { + // expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect"); + return server_reconnect(); + // }); +} + +seastar::future ProtocolV2::send_retry(uint64_t connect_seq) +{ + // + // + return read_reconnect(); +} + +seastar::future ProtocolV2::send_retry_global(uint64_t global_seq) +{ + // + // + return read_reconnect(); +} + +seastar::future ProtocolV2::send_reset(bool full) +{ + // + // + // return read_main_preamble() + // .then([this] (Tag tag) { + // expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset"); + return server_connect(); + // }); +} + +seastar::future ProtocolV2::server_reconnect() +{ + // handle_reconnect() logic + // + // + return send_reset(0); + // + // return send_retry_global(); + // + // return send_wait(); + // + // return send_retry(); + // + // return send_reset(); + // TODO: lossless policy + // return reuse_connection(existing, exproto); +} + +void ProtocolV2::execute_accepting() +{ + trigger_state(state_t::ACCEPTING, write_state_t::none, false); + seastar::with_gate(pending_dispatch, [this] { + return banner_exchange() + .then([this] { + return server_auth(); + }).then([this] { + // return read_main_preamble() + //}).then([this] (Tag tag) { + // switch (tag) { + // case Tag::CLIENT_IDENT: + return server_connect(); + // case Tag::SESSION_RECONNECT: + // return server_reconnect(); + // default: { + // unexpected_tag(tag, conn, "post_server_auth"); + // } + // } + }).then([this] (bool proceed_or_wait) { + if (proceed_or_wait) { + messenger.register_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + messenger.unaccept_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + execute_ready(); + } else { + execute_server_wait(); + } + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in ACCEPTING state + return fault(); + }); + }); +} + +// ACCEPTING or REPLACING state + +seastar::future<> ProtocolV2::send_server_ident() +{ + // send_server_ident() logic + // + + return seastar::now(); +} + +// REPLACING state + +seastar::future<> ProtocolV2::send_reconnect_ok() +{ + // send_reconnect_ok() logic + // + + return seastar::now(); +} + +// READY state + +seastar::future<> ProtocolV2::write_message(MessageRef msg) +{ + // TODO not implemented + // + ceph_assert(false); +} + +seastar::future<> ProtocolV2::do_keepalive() +{ + // TODO not implemented + // + ceph_assert(false); +} + +seastar::future<> ProtocolV2::do_keepalive_ack() +{ + // TODO not implemented + // + ceph_assert(false); +} + +void ProtocolV2::execute_ready() +{ + // + // trigger_state(state_t::READY, write_state_t::open, false); + // TODO: schedule reading messages, AckFrame, KeepAliveFrame, KeepAliveAckFrame + state = state_t::READY; + logger().info("{} reaches READY state successfully.", conn); + close(); +} + +// STANDBY state + +void ProtocolV2::execute_standby() +{ + // TODO not implemented + // trigger_state(state_t::STANDBY, write_state_t::delay, false); + ceph_assert(false); +} + +// WAIT state + +void ProtocolV2::execute_wait() +{ + // TODO not implemented + // trigger_state(state_t::WAIT, write_state_t::delay, false); + ceph_assert(false); +} + +// SERVER_WAIT state + +void ProtocolV2::execute_server_wait() +{ + // TODO not implemented + // trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false); + ceph_assert(false); +} + +// CLOSING state + +void ProtocolV2::trigger_close() +{ + if (state == state_t::ACCEPTING) { + messenger.unaccept_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + } else if (state >= state_t::CONNECTING && state < state_t::CLOSING) { + messenger.unregister_conn( + seastar::static_pointer_cast( + conn.shared_from_this())); + } else { + // cannot happen + ceph_assert(false); + } + + if (!socket) { + ceph_assert(state == state_t::CONNECTING); + } + + trigger_state(state_t::CLOSING, write_state_t::drop, false); +} + +} // namespace ceph::net diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h new file mode 100644 index 00000000000..5fbb02cd663 --- /dev/null +++ b/src/crimson/net/ProtocolV2.h @@ -0,0 +1,120 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "Protocol.h" +#include "msg/async/frames_v2.h" + +namespace ceph::net { + +class ProtocolV2 final : public Protocol { + public: + ProtocolV2(Dispatcher& dispatcher, + SocketConnection& conn, + SocketMessenger& messenger); + ~ProtocolV2() override; + + private: + void start_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) override; + + void start_accept(SocketFRef&& socket, + const entity_addr_t& peer_addr) override; + + void trigger_close() override; + + seastar::future<> write_message(MessageRef msg) override; + + seastar::future<> do_keepalive() override; + + seastar::future<> do_keepalive_ack() override; + + private: + SocketMessenger &messenger; + + enum class state_t { + NONE = 0, + ACCEPTING, + CONNECTING, + READY, + STANDBY, + WAIT, // ? CLIENT_WAIT + SERVER_WAIT, // ? + REPLACING, // ? + CLOSING + }; + state_t state = state_t::NONE; + + static const char *get_state_name(state_t state) { + const char *const statenames[] = {"NONE", + "ACCEPTING", + "CONNECTING", + "READY", + "STANDBY", + "WAIT", // ? CLIENT_WAIT + "SERVER_WAIT", // ? + "REPLACING", // ? + "CLOSING"}; + return statenames[static_cast(state)]; + } + + void trigger_state(state_t state, write_state_t write_state, bool reentrant); + + // TODO: the rest of protocol data structures and variables. + + private: + seastar::future<> fault(); + seastar::future<> banner_exchange(); + + // CONNECTING (client) + seastar::future<> handle_auth_reply(); + inline seastar::future<> client_auth() { + std::vector empty; + return client_auth(empty); + } + seastar::future<> client_auth(std::vector &allowed_methods); + + seastar::future process_wait(); + seastar::future client_connect(); + seastar::future client_reconnect(); + void execute_connecting(); + + // ACCEPTING (server) + seastar::future<> _auth_bad_method(int r); + seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more); + seastar::future<> server_auth(); + + seastar::future send_wait(); + + seastar::future handle_existing_connection(SocketConnectionRef existing); + seastar::future server_connect(); + + seastar::future read_reconnect(); + seastar::future send_retry(uint64_t connect_seq); + seastar::future send_retry_global(uint64_t global_seq); + seastar::future send_reset(bool full); + seastar::future server_reconnect(); + + void execute_accepting(); + + // ACCEPTING/REPLACING (server) + seastar::future<> send_server_ident(); + + // REPLACING (server) + seastar::future<> send_reconnect_ok(); + + // READY + void execute_ready(); + + // STANDBY + void execute_standby(); + + // WAIT + void execute_wait(); + + // SERVER_WAIT + void execute_server_wait(); +}; + +} // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 6b0e0a5ce30..dc35c66a05e 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -16,6 +16,7 @@ #include "Config.h" #include "ProtocolV1.h" +#include "ProtocolV2.h" #include "SocketMessenger.h" using namespace ceph::net; @@ -33,8 +34,7 @@ SocketConnection::SocketConnection(SocketMessenger& messenger, { ceph_assert(&messenger.container().local() == &messenger); if (is_msgr2) { - // TODO: ProtocolV2 - ceph_assert(false); + protocol = std::make_unique(dispatcher, *this, messenger); } else { protocol = std::make_unique(dispatcher, *this, messenger); } diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index a4cf67a0aca..22753500e4e 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -117,6 +117,7 @@ class SocketConnection : public Connection { friend class Protocol; friend class ProtocolV1; + friend class ProtocolV2; }; } // namespace ceph::net -- 2.39.5