From ce95af1cb6b3272a99889657c127091f8b946d4d Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 6 Mar 2019 22:59:37 +0800 Subject: [PATCH] crimson/net: banner exchange and HelloFrame Signed-off-by: Yingxin Cheng --- src/crimson/net/ProtocolV2.cc | 133 +++++++++++++++++++++++++++++++--- src/crimson/net/ProtocolV2.h | 6 +- 2 files changed, 128 insertions(+), 11 deletions(-) diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 21415e16ab50d..94919fc8cabb7 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -309,15 +309,110 @@ seastar::future<> ProtocolV2::fault() return seastar::now(); } -seastar::future<> ProtocolV2::banner_exchange() -{ - // 1. - // 2. then: - // 3. then: - // 4. then: - // 5. then: - // 6. then: - return seastar::now(); +void ProtocolV2::dispatch_reset() +{ + seastar::with_gate(pending_dispatch, [this] { + return dispatcher.ms_handle_reset( + seastar::static_pointer_cast(conn.shared_from_this())) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_handle_reset caust exception: {}", conn, eptr); + }); + }); +} + +seastar::future ProtocolV2::banner_exchange() +{ + // 1. prepare and send banner + bufferlist banner_payload; + encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0); + encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0); + + bufferlist bl; + bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX)); + encode((uint16_t)banner_payload.length(), bl, 0); + bl.claim_append(banner_payload); + return write_flush(std::move(bl)) + .then([this] { + // 2. read peer banner + unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16); + return read_exactly(banner_len); // or read exactly? + }).then([this] (auto bl) { + // 3. process peer banner and read banner_payload + unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); + + if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) { + if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) { + logger().error("{} peer is using V1 protocol", conn); + } else { + logger().error("{} peer sent bad banner", conn); + } + abort_in_fault(); + } + bl.trim_front(banner_prefix_len); + + uint16_t payload_len; + bufferlist buf; + buf.append(buffer::create(std::move(bl))); + auto ti = buf.cbegin(); + try { + decode(payload_len, ti); + } catch (const buffer::error &e) { + logger().error("{} decode banner payload len failed", conn); + abort_in_fault(); + } + return read(payload_len); + }).then([this] (bufferlist bl) { + // 4. process peer banner_payload and send HelloFrame + auto p = bl.cbegin(); + uint64_t peer_supported_features; + uint64_t peer_required_features; + try { + decode(peer_supported_features, p); + decode(peer_required_features, p); + } catch (const buffer::error &e) { + logger().error("{} decode banner payload failed", conn); + abort_in_fault(); + } + logger().debug("{} supported={} required={}", + conn, peer_supported_features, peer_required_features); + + // Check feature bit compatibility + uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES; + uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES; + if ((required_features & peer_supported_features) != required_features) { + logger().error("{} peer does not support all required features" + " required={} peer_supported={}", + conn, required_features, peer_supported_features); + abort_in_close(); + } + if ((supported_features & peer_required_features) != peer_required_features) { + logger().error("{} we do not support all peer required features" + " peer_required={} supported={}", + conn, peer_required_features, supported_features); + abort_in_close(); + } + this->peer_required_features = peer_required_features; + if (this->peer_required_features == 0) { + this->connection_features = msgr2_required; + } + + auto hello = HelloFrame::Encode(messenger.get_mytype(), + conn.target_addr); + return write_frame(hello); + }).then([this] { + //5. read peer HelloFrame + return read_main_preamble(); + }).then([this] (Tag tag) { + expect_tag(Tag::HELLO, tag, conn, __func__); + return read_frame_payload(); + }).then([this] { + // 6. process peer HelloFrame + auto hello = HelloFrame::Decode(rx_segments_data.back()); + logger().debug("{} received hello: peer_type={} peer_addr_for_me={}", + conn, (int)hello.entity_type(), hello.peer_addr()); + return seastar::make_ready_future( + hello.entity_type(), hello.peer_addr()); + }); } // CONNECTING state @@ -429,6 +524,14 @@ void ProtocolV2::execute_connecting() return seastar::now(); }).then([this] { return banner_exchange(); + }).then([this] (entity_type_t _peer_type, + entity_addr_t _peer_addr) { + if (conn.peer_type != _peer_type) { + logger().debug("{} connection peer type does not match what peer advertises {} != {}", + conn, conn.peer_type, (int)_peer_type); + dispatch_reset(); + abort_in_close(); + } }).then([this] { return client_auth(); }).then([this] { @@ -585,7 +688,17 @@ void ProtocolV2::execute_accepting() trigger_state(state_t::ACCEPTING, write_state_t::none, false); seastar::with_gate(pending_dispatch, [this] { return banner_exchange() - .then([this] { + .then([this] (entity_type_t _peer_type, + entity_addr_t _peer_addr) { + ceph_assert(conn.get_peer_type() == -1); + conn.peer_type = _peer_type; + + // TODO: lossless policy + conn.policy = SocketPolicy::stateless_server(0); + logger().debug("{} accept of host type {}, lossy={} server={} standby={} resetcheck={}", + conn, (int)_peer_type, + conn.policy.lossy, conn.policy.server, + conn.policy.standby, conn.policy.resetcheck); return server_auth(); }).then([this] { // return read_main_preamble() diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index a57d266870600..423fa25794e7f 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -62,6 +62,9 @@ class ProtocolV2 final : public Protocol { void trigger_state(state_t state, write_state_t write_state, bool reentrant); + uint64_t connection_features = 0; + uint64_t peer_required_features = 0; + uint64_t global_seq = 0; // TODO: Frame related implementations, probably to a separate class. @@ -90,7 +93,8 @@ class ProtocolV2 final : public Protocol { private: seastar::future<> fault(); - seastar::future<> banner_exchange(); + void dispatch_reset(); + seastar::future banner_exchange(); // CONNECTING (client) seastar::future<> handle_auth_reply(); -- 2.39.5