From 7f46b984be79e28a6bad96ee33ae085a17d3eee0 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Wed, 24 Feb 2021 14:31:41 +0000 Subject: [PATCH] crimson/net: drop the implementation of ProtocolV1. Signed-off-by: Radoslaw Zarzynski --- src/crimson/CMakeLists.txt | 1 - src/crimson/net/ProtocolV1.cc | 1014 --------------------------------- src/crimson/net/ProtocolV1.h | 137 ----- 3 files changed, 1152 deletions(-) delete mode 100644 src/crimson/net/ProtocolV1.cc delete mode 100644 src/crimson/net/ProtocolV1.h diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 07bec8a990da6..1c1f897e7596f 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -174,7 +174,6 @@ set(crimson_net_srcs net/SocketMessenger.cc net/Socket.cc net/Protocol.cc - net/ProtocolV1.cc net/ProtocolV2.cc net/chained_dispatchers.cc) add_library(crimson STATIC diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc deleted file mode 100644 index 3c604240d5834..0000000000000 --- a/src/crimson/net/ProtocolV1.cc +++ /dev/null @@ -1,1014 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "ProtocolV1.h" - -#include -#include -#include - -#include "include/msgr.h" -#include "include/random.h" -#include "auth/Auth.h" -#include "auth/AuthSessionHandler.h" - -#include "crimson/auth/AuthClient.h" -#include "crimson/auth/AuthServer.h" -#include "crimson/common/log.h" -#include "chained_dispatchers.h" -#include "Errors.h" -#include "Socket.h" -#include "SocketConnection.h" -#include "SocketMessenger.h" - -WRITE_RAW_ENCODER(ceph_msg_connect); -WRITE_RAW_ENCODER(ceph_msg_connect_reply); - -using crimson::common::local_conf; - -std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) -{ - return out << "connect{features=" << std::hex << c.features << std::dec - << " host_type=" << c.host_type - << " global_seq=" << c.global_seq - << " connect_seq=" << c.connect_seq - << " protocol_version=" << c.protocol_version - << " authorizer_protocol=" << c.authorizer_protocol - << " authorizer_len=" << c.authorizer_len - << " flags=" << std::hex << static_cast(c.flags) << std::dec << '}'; -} - -std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r) -{ - return out << "connect_reply{tag=" << static_cast(r.tag) - << " features=" << std::hex << r.features << std::dec - << " global_seq=" << r.global_seq - << " connect_seq=" << r.connect_seq - << " protocol_version=" << r.protocol_version - << " authorizer_len=" << r.authorizer_len - << " flags=" << std::hex << static_cast(r.flags) << std::dec << '}'; -} - -namespace { - -seastar::logger& logger() { - return crimson::get_logger(ceph_subsys_ms); -} - -template -seastar::net::packet make_static_packet(const T& value) { - return { reinterpret_cast(&value), sizeof(value) }; -} - -// store the banner in a non-const string for buffer::create_static() -char banner[] = CEPH_BANNER; -constexpr size_t banner_size = sizeof(CEPH_BANNER)-1; - -constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr); -constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr); - -// check that the buffer starts with a valid banner without requiring it to -// be contiguous in memory -void validate_banner(bufferlist::const_iterator& p) -{ - auto b = std::cbegin(banner); - auto end = b + banner_size; - while (b != end) { - const char *buf{nullptr}; - auto remaining = std::distance(b, end); - auto len = p.get_ptr_and_advance(remaining, &buf); - if (!std::equal(buf, buf + len, b)) { - throw std::system_error( - make_error_code(crimson::net::error::bad_connect_banner)); - } - b += len; - } -} - -// return a static bufferptr to the given object -template -bufferptr create_static(T& obj) -{ - return buffer::create_static(sizeof(obj), reinterpret_cast(&obj)); -} - -uint32_t get_proto_version(entity_type_t peer_type, bool connect) -{ - constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD; - // see also OSD.h, unlike other connection of simple/async messenger, - // crimson msgr is only used by osd - constexpr uint32_t CEPH_OSD_PROTOCOL = 10; - if (peer_type == my_type) { - // internal - return CEPH_OSD_PROTOCOL; - } else { - // public - switch (connect ? peer_type : my_type) { - case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; - case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; - case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; - default: return 0; - } - } -} - -void discard_up_to(std::deque* queue, - crimson::net::seq_num_t seq) -{ - while (!queue->empty() && - queue->front()->get_seq() < seq) { - queue->pop_front(); - } -} - -} // namespace anonymous - -namespace crimson::net { - -ProtocolV1::ProtocolV1(ChainedDispatchers& dispatchers, - SocketConnection& conn, - SocketMessenger& messenger) - : Protocol(proto_t::v1, dispatchers, conn), messenger{messenger} {} - -ProtocolV1::~ProtocolV1() {} - -bool ProtocolV1::is_connected() const -{ - return state == state_t::open; -} - -// connecting state - -void ProtocolV1::reset_session() -{ - conn.out_q = {}; - conn.sent = {}; - conn.in_seq = 0; - h.connect_seq = 0; - if (HAVE_FEATURE(conn.features, MSG_AUTH)) { - // Set out_seq to a random value, so CRC won't be predictable. - // Constant to limit starting sequence number to 2^31. Nothing special - // about it, just a big number. - constexpr uint64_t SEQ_MASK = 0x7fffffff; - conn.out_seq = ceph::util::generate_random_number(0, SEQ_MASK); - } else { - // previously, seq #'s always started at 0. - conn.out_seq = 0; - } -} - -seastar::future -ProtocolV1::handle_connect_reply(msgr_tag_t tag) -{ - if (h.auth_payload.length() && !conn.peer_is_mon()) { - if (tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { // more - h.auth_more = messenger.get_auth_client()->handle_auth_reply_more( - conn.shared_from_this(), auth_meta, h.auth_payload); - return seastar::make_ready_future(stop_t::no); - } else { - int ret = messenger.get_auth_client()->handle_auth_done( - conn.shared_from_this(), auth_meta, 0, 0, h.auth_payload); - if (ret < 0) { - // fault - logger().warn("{} AuthClient::handle_auth_done() return {}", conn, ret); - throw std::system_error(make_error_code(error::negotiation_failure)); - } - } - } - - switch (tag) { - case CEPH_MSGR_TAG_FEATURES: - logger().error("{} connect protocol feature mispatch", __func__); - throw std::system_error(make_error_code(error::negotiation_failure)); - case CEPH_MSGR_TAG_BADPROTOVER: - logger().error("{} connect protocol version mispatch", __func__); - throw std::system_error(make_error_code(error::negotiation_failure)); - case CEPH_MSGR_TAG_BADAUTHORIZER: - logger().error("{} got bad authorizer", __func__); - throw std::system_error(make_error_code(error::negotiation_failure)); - case CEPH_MSGR_TAG_RESETSESSION: - reset_session(); - return seastar::make_ready_future(stop_t::no); - case CEPH_MSGR_TAG_RETRY_GLOBAL: - return messenger.get_global_seq(h.reply.global_seq).then([this] (auto gs) { - h.global_seq = gs; - return seastar::make_ready_future(stop_t::no); - }); - case CEPH_MSGR_TAG_RETRY_SESSION: - ceph_assert(h.reply.connect_seq > h.connect_seq); - h.connect_seq = h.reply.connect_seq; - return seastar::make_ready_future(stop_t::no); - case CEPH_MSGR_TAG_WAIT: - // TODO: state wait - throw std::system_error(make_error_code(error::negotiation_failure)); - case CEPH_MSGR_TAG_SEQ: - case CEPH_MSGR_TAG_READY: - if (auto missing = (conn.policy.features_required & ~(uint64_t)h.reply.features); - missing) { - logger().error("{} missing required features", __func__); - throw std::system_error(make_error_code(error::negotiation_failure)); - } - return seastar::futurize_invoke([this, tag] { - if (tag == CEPH_MSGR_TAG_SEQ) { - return socket->read_exactly(sizeof(seq_num_t)) - .then([this] (auto buf) { - auto acked_seq = reinterpret_cast(buf.get()); - discard_up_to(&conn.out_q, *acked_seq); - return socket->write_flush(make_static_packet(conn.in_seq)); - }); - } - // tag CEPH_MSGR_TAG_READY - return seastar::now(); - }).then([this] { - // hooray! - h.peer_global_seq = h.reply.global_seq; - conn.policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY; - h.connect_seq++; - h.backoff = 0ms; - conn.set_features(h.reply.features & h.connect.features); - if (auth_meta->authorizer) { - session_security.reset( - get_auth_session_handler(nullptr, - auth_meta->authorizer->protocol, - auth_meta->session_key, - conn.features)); - } else { - session_security.reset(); - } - return seastar::make_ready_future(stop_t::yes); - }); - break; - default: - // unknown tag - logger().error("{} got unknown tag", __func__, int(tag)); - throw std::system_error(make_error_code(error::negotiation_failure)); - } -} - -ceph::bufferlist ProtocolV1::get_auth_payload() -{ - // only non-mons connectings to mons use MAuth messages - if (conn.peer_is_mon() && - messenger.get_mytype() != CEPH_ENTITY_TYPE_MON) { - return {}; - } else { - if (h.auth_more.length()) { - logger().info("using augmented (challenge) auth payload"); - return std::move(h.auth_more); - } else { - auto [auth_method, preferred_modes, auth_bl] = - messenger.get_auth_client()->get_auth_request( - conn.shared_from_this(), auth_meta); - auth_meta->auth_method = auth_method; - return auth_bl; - } - } -} - -seastar::future -ProtocolV1::repeat_connect() -{ - // encode ceph_msg_connect - memset(&h.connect, 0, sizeof(h.connect)); - h.connect.features = conn.policy.features_supported; - h.connect.host_type = messenger.get_myname().type(); - h.connect.global_seq = h.global_seq; - h.connect.connect_seq = h.connect_seq; - h.connect.protocol_version = get_proto_version(conn.get_peer_type(), true); - // this is fyi, actually, server decides! - h.connect.flags = conn.policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; - - ceph_assert(messenger.get_auth_client()); - - bufferlist bl; - bufferlist auth_bl = get_auth_payload(); - if (auth_bl.length()) { - h.connect.authorizer_protocol = auth_meta->auth_method; - h.connect.authorizer_len = auth_bl.length(); - bl.append(create_static(h.connect)); - bl.claim_append(auth_bl); - } else { - h.connect.authorizer_protocol = 0; - h.connect.authorizer_len = 0; - bl.append(create_static(h.connect)); - }; - return socket->write_flush(std::move(bl)) - .then([this] { - // read the reply - return socket->read(sizeof(h.reply)); - }).then([this] (bufferlist bl) { - auto p = bl.cbegin(); - ::decode(h.reply, p); - ceph_assert(p.end()); - return socket->read(h.reply.authorizer_len); - }).then([this] (bufferlist bl) { - h.auth_payload = std::move(bl); - return handle_connect_reply(h.reply.tag); - }); -} - -void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, - const entity_name_t& _peer_name) -{ - ceph_assert(state == state_t::none); - logger().trace("{} trigger connecting, was {}", conn, static_cast(state)); - state = state_t::connecting; - set_write_state(write_state_t::delay); - - ceph_assert(!socket); - ceph_assert(!gate.is_closed()); - conn.peer_addr = _peer_addr; - conn.target_addr = _peer_addr; - conn.set_peer_name(_peer_name); - conn.policy = messenger.get_policy(_peer_name.type()); - messenger.register_conn( - seastar::static_pointer_cast(conn.shared_from_this())); - gate.dispatch_in_background("start_connect", *this, [this] { - return Socket::connect(conn.peer_addr) - .then([this](SocketRef sock) { - socket = std::move(sock); - if (state != state_t::connecting) { - assert(state == state_t::closing); - return socket->close().then([] { - throw std::system_error(make_error_code(error::protocol_aborted)); - }); - } - return seastar::now(); - }).then([this] { - return messenger.get_global_seq(); - }).then([this] (auto gs) { - h.global_seq = gs; - // read server's handshake header - return socket->read(server_header_size); - }).then([this] (bufferlist headerbl) { - auto p = headerbl.cbegin(); - validate_banner(p); - entity_addr_t saddr, caddr; - ::decode(saddr, p); - ::decode(caddr, p); - ceph_assert(p.end()); - if (saddr != conn.peer_addr) { - logger().error("{} my peer_addr {} doesn't match what peer advertized {}", - conn, conn.peer_addr, saddr); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } - if (state != state_t::connecting) { - assert(state == state_t::closing); - throw std::system_error(make_error_code(error::protocol_aborted)); - } - socket->learn_ephemeral_port_as_connector(caddr.get_port()); - if (unlikely(caddr.is_msgr2())) { - logger().warn("{} peer sent a v2 address for me: {}", - conn, caddr); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } - caddr.set_type(entity_addr_t::TYPE_LEGACY); - return messenger.learned_addr(caddr, conn); - }).then([this] { - // encode/send client's handshake header - bufferlist bl; - bl.append(buffer::create_static(banner_size, banner)); - ::encode(messenger.get_myaddr(), bl, 0); - return socket->write_flush(std::move(bl)); - }).then([=] { - return seastar::repeat([this] { - return repeat_connect(); - }); - }).then([this] { - if (state != state_t::connecting) { - assert(state == state_t::closing); - throw std::system_error(make_error_code(error::protocol_aborted)); - } - execute_open(open_t::connected); - }).handle_exception([this] (std::exception_ptr eptr) { - // TODO: handle fault in the connecting state - logger().warn("{} connecting fault: {}", conn, eptr); - close(true); - }); - }); -} - -// accepting state - -seastar::future ProtocolV1::send_connect_reply( - msgr_tag_t tag, bufferlist&& authorizer_reply) -{ - h.reply.tag = tag; - h.reply.features = static_cast((h.connect.features & - conn.policy.features_supported) | - conn.policy.features_required); - h.reply.authorizer_len = authorizer_reply.length(); - return socket->write(make_static_packet(h.reply)) - .then([this, reply=std::move(authorizer_reply)]() mutable { - return socket->write_flush(std::move(reply)); - }).then([] { - return stop_t::no; - }); -} - -seastar::future ProtocolV1::send_connect_reply_ready( - msgr_tag_t tag, bufferlist&& authorizer_reply) -{ - return messenger.get_global_seq( - ).then([this, tag, auth_len = authorizer_reply.length()] (auto gs) { - h.global_seq = gs; - h.reply.tag = tag; - h.reply.features = conn.policy.features_supported; - h.reply.global_seq = h.global_seq; - h.reply.connect_seq = h.connect_seq; - h.reply.flags = 0; - if (conn.policy.lossy) { - h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; - } - h.reply.authorizer_len = auth_len; - - session_security.reset( - get_auth_session_handler(nullptr, - auth_meta->auth_method, - auth_meta->session_key, - conn.features)); - - return socket->write(make_static_packet(h.reply)); - }).then([this, reply=std::move(authorizer_reply)]() mutable { - if (reply.length()) { - return socket->write(std::move(reply)); - } else { - return seastar::now(); - } - }).then([this] { - if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { - return socket->write_flush(make_static_packet(conn.in_seq)) - .then([this] { - return socket->read_exactly(sizeof(seq_num_t)); - }).then([this] (auto buf) { - auto acked_seq = reinterpret_cast(buf.get()); - discard_up_to(&conn.out_q, *acked_seq); - }); - } else { - return socket->flush(); - } - }).then([] { - return stop_t::yes; - }); -} - -seastar::future ProtocolV1::replace_existing( - SocketConnectionRef existing, - bufferlist&& authorizer_reply, - bool is_reset_from_peer) -{ - msgr_tag_t reply_tag; - if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) && - !is_reset_from_peer) { - reply_tag = CEPH_MSGR_TAG_SEQ; - } else { - reply_tag = CEPH_MSGR_TAG_READY; - } - if (!existing->is_lossy()) { - // XXX: we decided not to support lossless connection in v1. as the - // client's default policy is - // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is - // lossy. And by the time - // will all be performed using v2 protocol. - ceph_abort("lossless policy not supported for v1"); - } - existing->protocol->close(true); - return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); -} - -seastar::future ProtocolV1::handle_connect_with_existing( - SocketConnectionRef existing, bufferlist&& authorizer_reply) -{ - ProtocolV1 *exproto = dynamic_cast(existing->protocol.get()); - - if (h.connect.global_seq < exproto->peer_global_seq()) { - h.reply.global_seq = exproto->peer_global_seq(); - return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL); - } else if (existing->is_lossy()) { - return replace_existing(existing, std::move(authorizer_reply)); - } else if (h.connect.connect_seq == 0 && exproto->connect_seq() > 0) { - return replace_existing(existing, std::move(authorizer_reply), true); - } else if (h.connect.connect_seq < exproto->connect_seq()) { - // old attempt, or we sent READY but they didn't get it. - h.reply.connect_seq = exproto->connect_seq() + 1; - return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); - } else if (h.connect.connect_seq == exproto->connect_seq()) { - // if the existing connection successfully opened, and/or - // subsequently went to standby, then the peer should bump - // their connect_seq and retry: this is not a connection race - // we need to resolve here. - if (exproto->get_state() == state_t::open || - exproto->get_state() == state_t::standby) { - if (conn.policy.resetcheck && exproto->connect_seq() == 0) { - return replace_existing(existing, std::move(authorizer_reply)); - } else { - h.reply.connect_seq = exproto->connect_seq() + 1; - return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); - } - } else if (existing->peer_wins()) { - return replace_existing(existing, std::move(authorizer_reply)); - } else { - return send_connect_reply(CEPH_MSGR_TAG_WAIT); - } - } else if (conn.policy.resetcheck && - exproto->connect_seq() == 0) { - return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION); - } else { - return replace_existing(existing, std::move(authorizer_reply)); - } -} - -bool ProtocolV1::require_auth_feature() const -{ - if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { - return false; - } - if (local_conf()->cephx_require_signatures) { - return true; - } - if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || - h.connect.host_type == CEPH_ENTITY_TYPE_MDS || - h.connect.host_type == CEPH_ENTITY_TYPE_MGR) { - return local_conf()->cephx_cluster_require_signatures; - } else { - return local_conf()->cephx_service_require_signatures; - } -} - -bool ProtocolV1::require_cephx_v2_feature() const -{ - if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { - return false; - } - if (local_conf()->cephx_require_version >= 2) { - return true; - } - if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || - h.connect.host_type == CEPH_ENTITY_TYPE_MDS || - h.connect.host_type == CEPH_ENTITY_TYPE_MGR) { - return local_conf()->cephx_cluster_require_version >= 2; - } else { - return local_conf()->cephx_service_require_version >= 2; - } -} - -seastar::future ProtocolV1::repeat_handle_connect() -{ - return socket->read(sizeof(h.connect)) - .then([this](bufferlist bl) { - auto p = bl.cbegin(); - ::decode(h.connect, p); - if (conn.get_peer_type() != 0 && - conn.get_peer_type() != h.connect.host_type) { - logger().error("{} repeat_handle_connect(): my peer type does not match" - " what peer advertises {} != {}", - conn, conn.get_peer_type(), h.connect.host_type); - throw std::system_error(make_error_code(error::protocol_aborted)); - } - conn.set_peer_type(h.connect.host_type); - conn.policy = messenger.get_policy(h.connect.host_type); - if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) { - logger().error("{} we don't know how to reconnect to peer {}", - conn, conn.target_addr); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } - return socket->read(h.connect.authorizer_len); - }).then([this] (bufferlist authorizer) { - memset(&h.reply, 0, sizeof(h.reply)); - // TODO: set reply.protocol_version - if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { - return send_connect_reply( - CEPH_MSGR_TAG_BADPROTOVER, bufferlist{}); - } - if (require_auth_feature()) { - conn.policy.features_required |= CEPH_FEATURE_MSG_AUTH; - } - if (require_cephx_v2_feature()) { - conn.policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2; - } - if (auto feat_missing = conn.policy.features_required & ~(uint64_t)h.connect.features; - feat_missing != 0) { - return send_connect_reply( - CEPH_MSGR_TAG_FEATURES, bufferlist{}); - } - - bufferlist authorizer_reply; - auth_meta->auth_method = h.connect.authorizer_protocol; - if (!HAVE_FEATURE((uint64_t)h.connect.features, CEPHX_V2)) { - // peer doesn't support it and we won't get here if we require it - auth_meta->skip_authorizer_challenge = true; - } - auto more = static_cast(auth_meta->authorizer_challenge); - ceph_assert(messenger.get_auth_server()); - int r = messenger.get_auth_server()->handle_auth_request( - conn.shared_from_this(), auth_meta, more, auth_meta->auth_method, authorizer, - &authorizer_reply); - - if (r < 0) { - session_security.reset(); - return send_connect_reply( - CEPH_MSGR_TAG_BADAUTHORIZER, std::move(authorizer_reply)); - } else if (r == 0) { - ceph_assert(authorizer_reply.length()); - return send_connect_reply( - CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER, std::move(authorizer_reply)); - } - - // r > 0 - if (auto existing = messenger.lookup_conn(conn.peer_addr); existing) { - if (existing->protocol->proto_type != proto_t::v1) { - logger().warn("{} existing {} proto version is {} not 1, close existing", - conn, *existing, - static_cast(existing->protocol->proto_type)); - // NOTE: this is following async messenger logic, but we may miss the reset event. - existing->mark_down(); - } else { - return handle_connect_with_existing(existing, std::move(authorizer_reply)); - } - } - if (h.connect.connect_seq > 0) { - return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, - std::move(authorizer_reply)); - } - h.connect_seq = h.connect.connect_seq + 1; - h.peer_global_seq = h.connect.global_seq; - conn.set_features((uint64_t)conn.policy.features_supported & (uint64_t)h.connect.features); - // TODO: cct - return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply)); - }); -} - -void ProtocolV1::start_accept(SocketRef&& sock, - const entity_addr_t& _peer_addr) -{ - ceph_assert(state == state_t::none); - logger().trace("{} trigger accepting, was {}", - conn, static_cast(state)); - state = state_t::accepting; - set_write_state(write_state_t::delay); - - ceph_assert(!socket); - // until we know better - conn.target_addr = _peer_addr; - socket = std::move(sock); - messenger.accept_conn( - seastar::static_pointer_cast(conn.shared_from_this())); - gate.dispatch_in_background("start_accept", *this, [this] { - // stop learning my_addr before sending it out, so it won't change - return messenger.learned_addr(messenger.get_myaddr(), conn).then([this] { - // encode/send server's handshake header - bufferlist bl; - bl.append(buffer::create_static(banner_size, banner)); - ::encode(messenger.get_myaddr(), bl, 0); - ::encode(conn.target_addr, bl, 0); - return socket->write_flush(std::move(bl)); - }).then([this] { - // read client's handshake header and connect request - return socket->read(client_header_size); - }).then([this] (bufferlist bl) { - auto p = bl.cbegin(); - validate_banner(p); - entity_addr_t addr; - ::decode(addr, p); - ceph_assert(p.end()); - if ((addr.is_legacy() || addr.is_any()) && - addr.is_same_host(conn.target_addr)) { - // good - } else { - logger().error("{} peer advertized an invalid peer_addr: {}," - " which should be v1 and the same host with {}.", - conn, addr, conn.peer_addr); - throw std::system_error( - make_error_code(crimson::net::error::bad_peer_address)); - } - conn.peer_addr = addr; - conn.target_addr = conn.peer_addr; - return seastar::repeat([this] { - return repeat_handle_connect(); - }); - }).then([this] { - if (state != state_t::accepting) { - assert(state == state_t::closing); - throw std::system_error(make_error_code(error::protocol_aborted)); - } - messenger.register_conn( - seastar::static_pointer_cast(conn.shared_from_this())); - messenger.unaccept_conn( - seastar::static_pointer_cast(conn.shared_from_this())); - execute_open(open_t::accepted); - }).handle_exception([this] (std::exception_ptr eptr) { - // TODO: handle fault in the accepting state - logger().warn("{} accepting fault: {}", conn, eptr); - close(false); - }); - }); -} - -// open state - -ceph::bufferlist ProtocolV1::do_sweep_messages( - const std::deque& msgs, - size_t num_msgs, - bool require_keepalive, - std::optional _keepalive_ack, - bool require_ack) -{ - static const size_t RESERVE_MSG_SIZE = sizeof(CEPH_MSGR_TAG_MSG) + - sizeof(ceph_msg_header) + - sizeof(ceph_msg_footer); - static const size_t RESERVE_MSG_SIZE_OLD = sizeof(CEPH_MSGR_TAG_MSG) + - sizeof(ceph_msg_header) + - sizeof(ceph_msg_footer_old); - - ceph::bufferlist bl; - if (likely(num_msgs)) { - if (HAVE_FEATURE(conn.features, MSG_AUTH)) { - bl.reserve(num_msgs * RESERVE_MSG_SIZE); - } else { - bl.reserve(num_msgs * RESERVE_MSG_SIZE_OLD); - } - } - - if (unlikely(require_keepalive)) { - k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( - ceph::coarse_real_clock::now()); - logger().trace("{} write keepalive2 {}", conn, k.req.stamp.tv_sec); - bl.append(create_static(k.req)); - } - - if (unlikely(_keepalive_ack.has_value())) { - logger().trace("{} write keepalive2 ack {}", conn, *_keepalive_ack); - k.ack.stamp = ceph_timespec(*_keepalive_ack); - bl.append(create_static(k.ack)); - } - - if (require_ack) { - // XXX: we decided not to support lossless connection in v1. as the - // client's default policy is - // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is - // lossy. And by the time of crimson-osd's GA, the in-cluster communication - // will all be performed using v2 protocol. - ceph_abort("lossless policy not supported for v1"); - } - - std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) { - ceph_assert(!msg->get_seq() && "message already has seq"); - msg->set_seq(++conn.out_seq); - auto& header = msg->get_header(); - header.src = messenger.get_myname(); - msg->encode(conn.features, messenger.get_crc_flags()); - if (session_security) { - session_security->sign_message(msg.get()); - } - logger().debug("{} --> #{} === {} ({})", - conn, msg->get_seq(), *msg, msg->get_type()); - bl.append(CEPH_MSGR_TAG_MSG); - bl.append((const char*)&header, sizeof(header)); - bl.append(msg->get_payload()); - bl.append(msg->get_middle()); - bl.append(msg->get_data()); - auto& footer = msg->get_footer(); - if (HAVE_FEATURE(conn.features, MSG_AUTH)) { - bl.append((const char*)&footer, sizeof(footer)); - } else { - ceph_msg_footer_old old_footer; - if (messenger.get_crc_flags() & MSG_CRC_HEADER) { - old_footer.front_crc = footer.front_crc; - old_footer.middle_crc = footer.middle_crc; - } else { - old_footer.front_crc = old_footer.middle_crc = 0; - } - if (messenger.get_crc_flags() & MSG_CRC_DATA) { - old_footer.data_crc = footer.data_crc; - } else { - old_footer.data_crc = 0; - } - old_footer.flags = footer.flags; - bl.append((const char*)&old_footer, sizeof(old_footer)); - } - }); - - return bl; -} - -seastar::future<> ProtocolV1::handle_keepalive2_ack() -{ - return socket->read_exactly(sizeof(ceph_timespec)) - .then([this] (auto buf) { - auto t = reinterpret_cast(buf.get()); - k.ack_stamp = *t; - logger().trace("{} got keepalive2 ack {}", conn, t->tv_sec); - }); -} - -seastar::future<> ProtocolV1::handle_keepalive2() -{ - return socket->read_exactly(sizeof(ceph_timespec)) - .then([this] (auto buf) { - utime_t ack{*reinterpret_cast(buf.get())}; - notify_keepalive_ack(ack); - }); -} - -seastar::future<> ProtocolV1::handle_ack() -{ - return socket->read_exactly(sizeof(ceph_le64)) - .then([this] (auto buf) { - auto seq = reinterpret_cast(buf.get()); - discard_up_to(&conn.sent, *seq); - }); -} - -seastar::future<> ProtocolV1::maybe_throttle() -{ - if (!conn.policy.throttler_bytes) { - return seastar::now(); - } - const auto to_read = (m.header.front_len + - m.header.middle_len + - m.header.data_len); - return conn.policy.throttler_bytes->get(to_read); -} - -seastar::future<> ProtocolV1::read_message() -{ - return socket->read(sizeof(m.header)) - .then([this] (bufferlist bl) { - // throttle the traffic, maybe - auto p = bl.cbegin(); - ::decode(m.header, p); - return maybe_throttle(); - }).then([this] { - // read front - return socket->read(m.header.front_len); - }).then([this] (bufferlist bl) { - m.front = std::move(bl); - // read middle - return socket->read(m.header.middle_len); - }).then([this] (bufferlist bl) { - m.middle = std::move(bl); - // read data - return socket->read(m.header.data_len); - }).then([this] (bufferlist bl) { - m.data = std::move(bl); - // read footer - return socket->read(sizeof(m.footer)); - }).then([this] (bufferlist bl) { - auto p = bl.cbegin(); - ::decode(m.footer, p); - auto conn_ref = seastar::static_pointer_cast( - conn.shared_from_this()); - auto msg = ::decode_message(nullptr, 0, m.header, m.footer, - m.front, m.middle, m.data, conn_ref); - if (unlikely(!msg)) { - logger().warn("{} decode message failed", conn); - throw std::system_error{make_error_code(error::corrupted_message)}; - } - constexpr bool add_ref = false; // Message starts with 1 ref - // TODO: change MessageRef with foreign_ptr - auto msg_ref = MessageRef{msg, add_ref}; - - if (session_security) { - if (unlikely(session_security->check_message_signature(msg))) { - logger().warn("{} message signature check failed", conn); - throw std::system_error{make_error_code(error::corrupted_message)}; - } - } - // TODO: set time stamps - msg->set_byte_throttler(conn.policy.throttler_bytes); - - if (unlikely(!conn.update_rx_seq(msg->get_seq()))) { - // skip this message - return seastar::now(); - } - - logger().debug("{} <== #{} === {} ({})", - conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type()); - // throttle the reading process by the returned future - return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); - }); -} - -seastar::future<> ProtocolV1::handle_tags() -{ - return seastar::keep_doing([this] { - // read the next tag - return socket->read_exactly(1) - .then([this] (auto buf) { - switch (buf[0]) { - case CEPH_MSGR_TAG_MSG: - return read_message(); - case CEPH_MSGR_TAG_ACK: - return handle_ack(); - case CEPH_MSGR_TAG_KEEPALIVE: - return seastar::now(); - case CEPH_MSGR_TAG_KEEPALIVE2: - return handle_keepalive2(); - case CEPH_MSGR_TAG_KEEPALIVE2_ACK: - return handle_keepalive2_ack(); - case CEPH_MSGR_TAG_CLOSE: - logger().info("{} got tag close", conn); - throw std::system_error(make_error_code(error::protocol_aborted)); - default: - logger().error("{} got unknown msgr tag {}", - conn, static_cast(buf[0])); - throw std::system_error(make_error_code(error::read_eof)); - } - }); - }); -} - -void ProtocolV1::execute_open(open_t type) -{ - logger().trace("{} trigger open, was {}", conn, static_cast(state)); - state = state_t::open; - set_write_state(write_state_t::open); - - if (type == open_t::connected) { - dispatchers.ms_handle_connect( - seastar::static_pointer_cast(conn.shared_from_this())); - } else { // type == open_t::accepted - dispatchers.ms_handle_accept( - seastar::static_pointer_cast(conn.shared_from_this())); - } - - gate.dispatch_in_background("execute_open", *this, [this] { - // start background processing of tags - return handle_tags() - .handle_exception_type([this] (const std::system_error& e) { - logger().warn("{} open fault: {}", conn, e); - if (e.code() == error::protocol_aborted || - e.code() == std::errc::connection_reset || - e.code() == error::read_eof) { - close(true); - return seastar::now(); - } else { - throw e; - } - }).handle_exception([this] (std::exception_ptr eptr) { - // TODO: handle fault in the open state - logger().warn("{} open fault: {}", conn, eptr); - close(true); - }); - }); -} - -// closing state - -void ProtocolV1::trigger_close() -{ - logger().trace("{} trigger closing, was {}", - conn, static_cast(state)); - messenger.closing_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); - - if (state == state_t::accepting) { - messenger.unaccept_conn(seastar::static_pointer_cast( - conn.shared_from_this())); - } else if (state >= state_t::connecting && state < state_t::closing) { - messenger.unregister_conn(seastar::static_pointer_cast( - conn.shared_from_this())); - } else { - // cannot happen - ceph_assert(false); - } - - if (!socket) { - ceph_assert(state == state_t::connecting); - } - - state = state_t::closing; -} - -void ProtocolV1::on_closed() -{ - messenger.closed_conn( - seastar::static_pointer_cast( - conn.shared_from_this())); -} - -seastar::future<> ProtocolV1::fault() -{ - if (conn.policy.lossy) { - messenger.unregister_conn(seastar::static_pointer_cast( - conn.shared_from_this())); - } - // XXX: we decided not to support lossless connection in v1. as the - // client's default policy is - // Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX) which is - // lossy. And by the time of crimson-osd's GA, the in-cluster communication - // will all be performed using v2 protocol. - ceph_abort("lossless policy not supported for v1"); - return seastar::now(); -} - -void ProtocolV1::print(std::ostream& out) const -{ - out << conn; -} - -} // namespace crimson::net diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h deleted file mode 100644 index ed6df895415ff..0000000000000 --- a/src/crimson/net/ProtocolV1.h +++ /dev/null @@ -1,137 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#pragma once - -#include "Protocol.h" - -class AuthAuthorizer; -class AuthSessionHandler; - -namespace crimson::net { - -class ProtocolV1 final : public Protocol { - public: - ProtocolV1(ChainedDispatchers& dispatchers, - SocketConnection& conn, - SocketMessenger& messenger); - ~ProtocolV1() override; - void print(std::ostream&) const final; - private: - void on_closed() override; - bool is_connected() const override; - - void start_connect(const entity_addr_t& peer_addr, - const entity_name_t& peer_name) override; - - void start_accept(SocketRef&& socket, - const entity_addr_t& peer_addr) override; - - void trigger_close() override; - - ceph::bufferlist do_sweep_messages( - const std::deque& msgs, - size_t num_msgs, - bool require_keepalive, - std::optional keepalive_ack, - bool require_ack) override; - - private: - SocketMessenger &messenger; - - enum class state_t { - none, - accepting, - connecting, - open, - standby, - wait, - closing - }; - state_t state = state_t::none; - - // state for handshake - struct Handshake { - ceph_msg_connect connect; - ceph_msg_connect_reply reply; - ceph::bufferlist auth_payload; // auth(orizer) payload read off the wire - ceph::bufferlist auth_more; // connect-side auth retry (we added challenge) - std::chrono::milliseconds backoff; - uint32_t connect_seq = 0; - uint32_t peer_global_seq = 0; - uint32_t global_seq; - } h; - - std::unique_ptr session_security; - - // state for an incoming message - struct MessageReader { - ceph_msg_header header; - ceph_msg_footer footer; - bufferlist front; - bufferlist middle; - bufferlist data; - } m; - - struct Keepalive { - struct { - const char tag = CEPH_MSGR_TAG_KEEPALIVE2; - ceph_timespec stamp; - } __attribute__((packed)) req; - struct { - const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; - ceph_timespec stamp; - } __attribute__((packed)) ack; - ceph_timespec ack_stamp; - } k; - - private: - // connecting - void reset_session(); - seastar::future handle_connect_reply(crimson::net::msgr_tag_t tag); - seastar::future repeat_connect(); - ceph::bufferlist get_auth_payload(); - - // accepting - seastar::future send_connect_reply( - msgr_tag_t tag, bufferlist&& authorizer_reply = {}); - seastar::future send_connect_reply_ready( - msgr_tag_t tag, bufferlist&& authorizer_reply); - seastar::future replace_existing( - SocketConnectionRef existing, - bufferlist&& authorizer_reply, - bool is_reset_from_peer = false); - seastar::future handle_connect_with_existing( - SocketConnectionRef existing, bufferlist&& authorizer_reply); - bool require_auth_feature() const; - bool require_cephx_v2_feature() const; - seastar::future repeat_handle_connect(); - - // open - seastar::future<> handle_keepalive2_ack(); - seastar::future<> handle_keepalive2(); - seastar::future<> handle_ack(); - seastar::future<> maybe_throttle(); - seastar::future<> read_message(); - seastar::future<> handle_tags(); - - enum class open_t { - connected, - accepted - }; - void execute_open(open_t type); - - // replacing - // the number of connections initiated in this session, increment when a - // new connection is established - uint32_t connect_seq() const { return h.connect_seq; } - // the client side should connect us with a gseq. it will be reset with - // the one of exsting connection if it's greater. - uint32_t peer_global_seq() const { return h.peer_global_seq; } - // current state of ProtocolV1 - state_t get_state() const { return state; } - - seastar::future<> fault(); -}; - -} // namespace crimson::net -- 2.39.5