From: Yingxin Cheng Date: Thu, 14 Feb 2019 07:49:34 +0000 (+0800) Subject: crimson/net: introduce protocol-level abstraction X-Git-Tag: v15.1.0~3027^2~30 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2ab8b741d302769d2f8f8466d81df3431b59c2b9;p=ceph.git crimson/net: introduce protocol-level abstraction Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 20c545dad178..cb82ba81c859 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -122,7 +122,9 @@ set(crimson_net_srcs net/Messenger.cc net/SocketConnection.cc net/SocketMessenger.cc - net/Socket.cc) + net/Socket.cc + net/Protocol.cc + net/ProtocolV1.cc) set(crimson_thread_srcs thread/ThreadPool.cc thread/Throttle.cc) diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 8a0a1c96f22c..8dbb3cabbc65 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -26,6 +26,7 @@ using auth_proto_t = int; namespace ceph::net { using msgr_tag_t = uint8_t; +using stop_t = seastar::stop_iteration; class Connection; using ConnectionRef = seastar::shared_ptr; diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc new file mode 100644 index 000000000000..193918b867f8 --- /dev/null +++ b/src/crimson/net/Protocol.cc @@ -0,0 +1,167 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Protocol.h" + +#include "crimson/common/log.h" +#include "Socket.h" +#include "SocketConnection.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); + } +} + +namespace ceph::net { + +Protocol::Protocol(int type, + Dispatcher& dispatcher, + SocketConnection& conn) + : proto_type(type), + dispatcher(dispatcher), + conn(conn) {} + +Protocol::~Protocol() +{ + ceph_assert(pending_dispatch.is_closed()); +} + +bool Protocol::is_connected() const +{ + return write_state == write_state_t::open; +} + +seastar::future<> Protocol::close() +{ + if (closed) { + // already closing + assert(close_ready.valid()); + return close_ready.get_future(); + } + + // unregister_conn() drops a reference, so hold another until completion + auto cleanup = [conn_ref = conn.shared_from_this(), this] { + logger().debug("{} closed!", conn); + }; + + trigger_close(); + + // close_ready become valid only after state is state_t::closing + assert(!close_ready.valid()); + + if (socket) { + close_ready = socket->close() + .then([this] { + return pending_dispatch.close(); + }).finally(std::move(cleanup)); + } else { + close_ready = pending_dispatch.close().finally(std::move(cleanup)); + } + + closed = true; + set_write_state(write_state_t::drop); + + return close_ready.get_future(); +} + +seastar::future<> Protocol::send(MessageRef msg) +{ + if (write_state != write_state_t::drop) { + conn.out_q.push(std::move(msg)); + write_event(); + } + return seastar::now(); +} + +seastar::future<> Protocol::keepalive() +{ + if (!need_keepalive) { + need_keepalive = true; + write_event(); + } + return seastar::now(); +} + +void Protocol::notify_keepalive_ack() +{ + if (!need_keepalive_ack) { + need_keepalive_ack = true; + write_event(); + } +} + +void Protocol::write_event() +{ + if (write_dispatching) { + // already dispatching + return; + } + write_dispatching = true; + switch (write_state) { + case write_state_t::open: + [[fallthrough]]; + case write_state_t::delay: + seastar::with_gate(pending_dispatch, [this] { + return seastar::repeat([this] { + switch (write_state) { + case write_state_t::open: + return seastar::futurize_apply([this] { + if (need_keepalive) { + return do_keepalive() + .then([this] { need_keepalive = false; }); + } + return seastar::now(); + }).then([this] { + if (need_keepalive_ack) { + return do_keepalive_ack() + .then([this] { need_keepalive_ack = false; }); + } + return seastar::now(); + }).then([this] { + if (!conn.out_q.empty()){ + MessageRef msg = conn.out_q.front(); + return write_message(msg) + .then([this, msg] { + if (msg == conn.out_q.front()) { + conn.out_q.pop(); + } + return stop_t::no; + }); + } else { + return socket->flush() + .then([this] { + if (!conn.out_q.empty()) { + return stop_t::no; + } else { + write_dispatching = false; + return stop_t::yes; + } + }); + } + }).handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} write_event fault: {}", conn, eptr); + close(); + return stop_t::no; + }); + case write_state_t::delay: + // delay dispatching writes until open + return state_changed.get_shared_future() + .then([] { return stop_t::no; }); + case write_state_t::drop: + write_dispatching = false; + return seastar::make_ready_future(stop_t::yes); + default: + ceph_assert(false); + } + }); + }); + return; + case write_state_t::drop: + write_dispatching = false; + default: + ceph_assert(false); + } +} + +} // namespace ceph::net diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h new file mode 100644 index 000000000000..6a0f76e88bcd --- /dev/null +++ b/src/crimson/net/Protocol.h @@ -0,0 +1,89 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "Fwd.h" +#include "SocketConnection.h" + +namespace ceph::net { + +class Protocol { + public: + Protocol(Protocol&&) = delete; + virtual ~Protocol(); + + bool is_connected() const; + + // Reentrant closing + seastar::future<> close(); + + seastar::future<> send(MessageRef msg); + + seastar::future<> keepalive(); + + virtual void start_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) = 0; + + virtual void start_accept(seastar::foreign_ptr>&& socket, + const entity_addr_t& peer_addr) = 0; + + protected: + Protocol(int type, + Dispatcher& dispatcher, + SocketConnection& conn); + + virtual void trigger_close() = 0; + + // encode/write a message + virtual seastar::future<> write_message(MessageRef msg) = 0; + + virtual seastar::future<> do_keepalive() = 0; + + virtual seastar::future<> do_keepalive_ack() = 0; + + public: + const int proto_type; + + protected: + Dispatcher &dispatcher; + SocketConnection &conn; + + seastar::foreign_ptr> socket; + seastar::gate pending_dispatch; + + // write_state is changed with state atomically, indicating the write + // behavior of the according state. + enum class write_state_t { + none, + delay, + open, + drop + }; + void set_write_state(const write_state_t& state) { + write_state = state; + state_changed.set_value(); + state_changed = seastar::shared_promise<>(); + } + + void notify_keepalive_ack(); + + private: + write_state_t write_state = write_state_t::none; + // wait until current state changed + seastar::shared_promise<> state_changed; + + bool closed = false; + // become valid only after closed == true + seastar::shared_future<> close_ready; + + bool need_keepalive = false; + bool need_keepalive_ack = false; + bool write_dispatching = false; + void write_event(); +}; + +} // namespace ceph::net diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc new file mode 100644 index 000000000000..cbfa4d28ee7c --- /dev/null +++ b/src/crimson/net/ProtocolV1.cc @@ -0,0 +1,851 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "ProtocolV1.h" + +#include +#include +#include + +#include "include/msgr.h" +#include "include/random.h" +#include "auth/Auth.h" +#include "auth/AuthSessionHandler.h" + +#include "crimson/common/log.h" +#include "Config.h" +#include "Dispatcher.h" +#include "Errors.h" +#include "Socket.h" +#include "SocketConnection.h" +#include "SocketMessenger.h" + +WRITE_RAW_ENCODER(ceph_msg_connect); +WRITE_RAW_ENCODER(ceph_msg_connect_reply); + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) +{ + return out << "connect{features=" << std::hex << c.features << std::dec + << " host_type=" << c.host_type + << " global_seq=" << c.global_seq + << " connect_seq=" << c.connect_seq + << " protocol_version=" << c.protocol_version + << " authorizer_protocol=" << c.authorizer_protocol + << " authorizer_len=" << c.authorizer_len + << " flags=" << std::hex << static_cast(c.flags) << std::dec << '}'; +} + +std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r) +{ + return out << "connect_reply{tag=" << static_cast(r.tag) + << " features=" << std::hex << r.features << std::dec + << " global_seq=" << r.global_seq + << " connect_seq=" << r.connect_seq + << " protocol_version=" << r.protocol_version + << " authorizer_len=" << r.authorizer_len + << " flags=" << std::hex << static_cast(r.flags) << std::dec << '}'; +} + +namespace { + +seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); +} + +template +seastar::net::packet make_static_packet(const T& value) { + return { reinterpret_cast(&value), sizeof(value) }; +} + +// store the banner in a non-const string for buffer::create_static() +char banner[] = CEPH_BANNER; +constexpr size_t banner_size = sizeof(CEPH_BANNER)-1; + +constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr); +constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr); + +// check that the buffer starts with a valid banner without requiring it to +// be contiguous in memory +void validate_banner(bufferlist::const_iterator& p) +{ + auto b = std::cbegin(banner); + auto end = b + banner_size; + while (b != end) { + const char *buf{nullptr}; + auto remaining = std::distance(b, end); + auto len = p.get_ptr_and_advance(remaining, &buf); + if (!std::equal(buf, buf + len, b)) { + throw std::system_error( + make_error_code(ceph::net::error::bad_connect_banner)); + } + b += len; + } +} + +// make sure that we agree with the peer about its address +void validate_peer_addr(const entity_addr_t& addr, + const entity_addr_t& expected) +{ + if (addr == expected) { + return; + } + // ok if server bound anonymously, as long as port/nonce match + if (addr.is_blank_ip() && + addr.get_port() == expected.get_port() && + addr.get_nonce() == expected.get_nonce()) { + return; + } else { + throw std::system_error( + make_error_code(ceph::net::error::bad_peer_address)); + } +} + +// return a static bufferptr to the given object +template +bufferptr create_static(T& obj) +{ + return buffer::create_static(sizeof(obj), reinterpret_cast(&obj)); +} + +uint32_t get_proto_version(entity_type_t peer_type, bool connect) +{ + constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD; + // see also OSD.h, unlike other connection of simple/async messenger, + // crimson msgr is only used by osd + constexpr uint32_t CEPH_OSD_PROTOCOL = 10; + if (peer_type == my_type) { + // internal + return CEPH_OSD_PROTOCOL; + } else { + // public + switch (connect ? peer_type : my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + default: return 0; + } + } +} + +void discard_up_to(std::queue* queue, + ceph::net::seq_num_t seq) +{ + while (!queue->empty() && + queue->front()->get_seq() < seq) { + queue->pop(); + } +} + +} // namespace anonymous + +namespace ceph::net { + +ProtocolV1::ProtocolV1(Dispatcher& dispatcher, + SocketConnection& conn, + SocketMessenger& messenger) + : Protocol(1, dispatcher, conn), messenger{messenger} {} + +ProtocolV1::~ProtocolV1() {} + +// connecting state + +void ProtocolV1::reset_session() +{ + conn.out_q = {}; + conn.sent = {}; + conn.in_seq = 0; + h.connect_seq = 0; + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + // Set out_seq to a random value, so CRC won't be predictable. + // Constant to limit starting sequence number to 2^31. Nothing special + // about it, just a big number. + constexpr uint64_t SEQ_MASK = 0x7fffffff; + conn.out_seq = ceph::util::generate_random_number(0, SEQ_MASK); + } else { + // previously, seq #'s always started at 0. + conn.out_seq = 0; + } +} + +seastar::future +ProtocolV1::handle_connect_reply(msgr_tag_t tag) +{ + switch (tag) { + case CEPH_MSGR_TAG_FEATURES: + logger().error("{} connect protocol feature mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADPROTOVER: + logger().error("{} connect protocol version mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_BADAUTHORIZER: + logger().error("{} got bad authorizer", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_RESETSESSION: + reset_session(); + return seastar::make_ready_future(stop_t::no); + case CEPH_MSGR_TAG_RETRY_GLOBAL: + h.global_seq = messenger.get_global_seq(h.reply.global_seq); + return seastar::make_ready_future(stop_t::no); + case CEPH_MSGR_TAG_RETRY_SESSION: + ceph_assert(h.reply.connect_seq > h.connect_seq); + h.connect_seq = h.reply.connect_seq; + return seastar::make_ready_future(stop_t::no); + case CEPH_MSGR_TAG_WAIT: + // TODO: state wait + throw std::system_error(make_error_code(error::negotiation_failure)); + case CEPH_MSGR_TAG_SEQ: + case CEPH_MSGR_TAG_READY: + if (auto missing = (conn.policy.features_required & ~(uint64_t)h.reply.features); + missing) { + logger().error("{} missing required features", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + return seastar::futurize_apply([this, tag] { + if (tag == CEPH_MSGR_TAG_SEQ) { + return socket->read_exactly(sizeof(seq_num_t)) + .then([this] (auto buf) { + auto acked_seq = reinterpret_cast(buf.get()); + discard_up_to(&conn.out_q, *acked_seq); + return socket->write_flush(make_static_packet(conn.in_seq)); + }); + } + // tag CEPH_MSGR_TAG_READY + return seastar::now(); + }).then([this] { + // hooray! + h.peer_global_seq = h.reply.global_seq; + conn.policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY; + h.connect_seq++; + h.backoff = 0ms; + conn.set_features(h.reply.features & h.connect.features); + if (h.authorizer) { + session_security.reset( + get_auth_session_handler(nullptr, + h.authorizer->protocol, + h.authorizer->session_key, + conn.features)); + } + h.authorizer = nullptr; + return seastar::make_ready_future(stop_t::yes); + }); + break; + default: + // unknown tag + logger().error("{} got unknown tag", __func__, int(tag)); + throw std::system_error(make_error_code(error::negotiation_failure)); + } +} + +seastar::future +ProtocolV1::repeat_connect() +{ + // encode ceph_msg_connect + memset(&h.connect, 0, sizeof(h.connect)); + h.connect.features = conn.policy.features_supported; + h.connect.host_type = messenger.get_myname().type(); + h.connect.global_seq = h.global_seq; + h.connect.connect_seq = h.connect_seq; + h.connect.protocol_version = get_proto_version(conn.peer_type, true); + // this is fyi, actually, server decides! + h.connect.flags = conn.policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; + + h.authorizer = dispatcher.ms_get_authorizer(conn.peer_type); + bufferlist bl; + if (h.authorizer) { + h.connect.authorizer_protocol = h.authorizer->protocol; + h.connect.authorizer_len = h.authorizer->bl.length(); + bl.append(create_static(h.connect)); + bl.append(h.authorizer->bl); + } else { + h.connect.authorizer_protocol = 0; + h.connect.authorizer_len = 0; + bl.append(create_static(h.connect)); + }; + return socket->write_flush(std::move(bl)) + .then([this] { + // read the reply + return socket->read(sizeof(h.reply)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.reply, p); + ceph_assert(p.end()); + return socket->read(h.reply.authorizer_len); + }).then([this] (bufferlist bl) { + if (h.authorizer) { + auto reply = bl.cbegin(); + if (!h.authorizer->verify_reply(reply, nullptr)) { + logger().error("{} authorizer failed to verify reply", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); + } + } + return handle_connect_reply(h.reply.tag); + }); +} + +void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, + const entity_type_t& _peer_type) +{ + ceph_assert(state == state_t::none); + logger().debug("{} trigger connecting, was {}", conn, static_cast(state)); + state = state_t::connecting; + set_write_state(write_state_t::delay); + + ceph_assert(!socket); + conn.peer_addr = _peer_addr; + conn.peer_type = _peer_type; + messenger.register_conn( + seastar::static_pointer_cast(conn.shared_from_this())); + seastar::with_gate(pending_dispatch, [this] { + return seastar::connect(conn.peer_addr.in4_addr()) + .then([this](seastar::connected_socket fd) { + if (state == state_t::closing) { + fd.shutdown_input(); + fd.shutdown_output(); + throw std::system_error(make_error_code(error::connection_aborted)); + } + socket = seastar::make_foreign(std::make_unique(std::move(fd))); + // read server's handshake header + return socket->read(server_header_size); + }).then([this] (bufferlist headerbl) { + auto p = headerbl.cbegin(); + validate_banner(p); + entity_addr_t saddr, caddr; + ::decode(saddr, p); + ::decode(caddr, p); + ceph_assert(p.end()); + validate_peer_addr(saddr, conn.peer_addr); + + conn.side = SocketConnection::side_t::connector; + conn.socket_port = caddr.get_port(); + return messenger.learned_addr(caddr); + }).then([this] { + // encode/send client's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + h.global_seq = messenger.get_global_seq(); + return socket->write_flush(std::move(bl)); + }).then([=] { + return seastar::repeat([this] { + return repeat_connect(); + }); + }).then([this] { + // notify the dispatcher and allow them to reject the connection + return dispatcher.ms_handle_connect( + seastar::static_pointer_cast( + conn.shared_from_this())); + }).then([this] { + execute_open(); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the connecting state + logger().warn("{} connecting fault: {}", conn, eptr); + close(); + }); + }); +} + +// accepting state + +seastar::future ProtocolV1::send_connect_reply( + msgr_tag_t tag, bufferlist&& authorizer_reply) +{ + h.reply.tag = tag; + h.reply.features = static_cast((h.connect.features & + conn.policy.features_supported) | + conn.policy.features_required); + h.reply.authorizer_len = authorizer_reply.length(); + return socket->write(make_static_packet(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + return socket->write_flush(std::move(reply)); + }).then([] { + return stop_t::no; + }); +} + +seastar::future ProtocolV1::send_connect_reply_ready( + msgr_tag_t tag, bufferlist&& authorizer_reply) +{ + h.global_seq = messenger.get_global_seq(); + h.reply.tag = tag; + h.reply.features = conn.policy.features_supported; + h.reply.global_seq = h.global_seq; + h.reply.connect_seq = h.connect_seq; + h.reply.flags = 0; + if (conn.policy.lossy) { + h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; + } + h.reply.authorizer_len = authorizer_reply.length(); + return socket->write(make_static_packet(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + if (reply.length()) { + return socket->write(std::move(reply)); + } else { + return seastar::now(); + } + }).then([this] { + if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { + return socket->write_flush(make_static_packet(conn.in_seq)) + .then([this] { + return socket->read_exactly(sizeof(seq_num_t)); + }).then([this] (auto buf) { + auto acked_seq = reinterpret_cast(buf.get()); + discard_up_to(&conn.out_q, *acked_seq); + }); + } else { + return socket->flush(); + } + }).then([this] { + return stop_t::yes; + }); +} + +seastar::future ProtocolV1::replace_existing( + SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer) +{ + msgr_tag_t reply_tag; + if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) && + !is_reset_from_peer) { + reply_tag = CEPH_MSGR_TAG_SEQ; + } else { + reply_tag = CEPH_MSGR_TAG_READY; + } + messenger.unregister_conn(existing); + if (!existing->is_lossy()) { + // reset the in_seq if this is a hard reset from peer, + // otherwise we respect our original connection's value + conn.in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num(); + // steal outgoing queue and out_seq + existing->requeue_sent(); + std::tie(conn.out_seq, conn.out_q) = existing->get_out_queue(); + } + return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); +} + +seastar::future ProtocolV1::handle_connect_with_existing( + SocketConnectionRef existing, bufferlist&& authorizer_reply) +{ + ProtocolV1 *exproto = dynamic_cast(existing->protocol.get()); + + if (h.connect.global_seq < exproto->peer_global_seq()) { + h.reply.global_seq = exproto->peer_global_seq(); + return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL); + } else if (existing->is_lossy()) { + return replace_existing(existing, std::move(authorizer_reply)); + } else if (h.connect.connect_seq == 0 && exproto->connect_seq() > 0) { + return replace_existing(existing, std::move(authorizer_reply), true); + } else if (h.connect.connect_seq < exproto->connect_seq()) { + // old attempt, or we sent READY but they didn't get it. + h.reply.connect_seq = exproto->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } else if (h.connect.connect_seq == exproto->connect_seq()) { + // if the existing connection successfully opened, and/or + // subsequently went to standby, then the peer should bump + // their connect_seq and retry: this is not a connection race + // we need to resolve here. + if (exproto->get_state() == state_t::open || + exproto->get_state() == state_t::standby) { + if (conn.policy.resetcheck && exproto->connect_seq() == 0) { + return replace_existing(existing, std::move(authorizer_reply)); + } else { + h.reply.connect_seq = exproto->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } + } else if (conn.peer_addr < messenger.get_myaddr() || + existing->is_server_side()) { + // incoming wins + return replace_existing(existing, std::move(authorizer_reply)); + } else { + return send_connect_reply(CEPH_MSGR_TAG_WAIT); + } + } else if (conn.policy.resetcheck && + exproto->connect_seq() == 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION); + } else { + return replace_existing(existing, std::move(authorizer_reply)); + } +} + +bool ProtocolV1::require_auth_feature() const +{ + if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { + return false; + } + if (conf.cephx_require_signatures) { + return true; + } + if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || + h.connect.host_type == CEPH_ENTITY_TYPE_MDS) { + return conf.cephx_cluster_require_signatures; + } else { + return conf.cephx_service_require_signatures; + } +} + +seastar::future ProtocolV1::repeat_handle_connect() +{ + return socket->read(sizeof(h.connect)) + .then([this](bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.connect, p); + conn.peer_type = h.connect.host_type; + return socket->read(h.connect.authorizer_len); + }).then([this] (bufferlist authorizer) { + if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { + return seastar::make_ready_future( + CEPH_MSGR_TAG_BADPROTOVER, bufferlist{}); + } + if (require_auth_feature()) { + conn.policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + if (auto feat_missing = conn.policy.features_required & ~(uint64_t)h.connect.features; + feat_missing != 0) { + return seastar::make_ready_future( + CEPH_MSGR_TAG_FEATURES, bufferlist{}); + } + return dispatcher.ms_verify_authorizer(conn.peer_type, + h.connect.authorizer_protocol, + authorizer); + }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) { + memset(&h.reply, 0, sizeof(h.reply)); + if (tag) { + return send_connect_reply(tag, std::move(authorizer_reply)); + } + if (auto existing = messenger.lookup_conn(conn.peer_addr); existing) { + if (existing->protocol->proto_type != 1) { + logger().warn("{} existing {} proto version is {} not 1, close existing", + conn, *existing, existing->protocol->proto_type); + existing->close(); + } else { + return handle_connect_with_existing(existing, std::move(authorizer_reply)); + } + } + if (h.connect.connect_seq > 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, + std::move(authorizer_reply)); + } + h.connect_seq = h.connect.connect_seq + 1; + h.peer_global_seq = h.connect.global_seq; + conn.set_features((uint64_t)conn.policy.features_supported & (uint64_t)h.connect.features); + // TODO: cct + return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply)); + }); +} + +void ProtocolV1::start_accept(seastar::foreign_ptr>&& sock, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::none); + logger().debug("{} trigger accepting, was {}", + conn, static_cast(state)); + state = state_t::accepting; + set_write_state(write_state_t::delay); + + ceph_assert(!socket); + conn.peer_addr.u = _peer_addr.u; + conn.peer_addr.set_port(0); + conn.side = SocketConnection::side_t::acceptor; + conn.socket_port = _peer_addr.get_port(); + socket = std::move(sock); + messenger.accept_conn( + seastar::static_pointer_cast(conn.shared_from_this())); + seastar::with_gate(pending_dispatch, [this, _peer_addr] { + // encode/send server's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(messenger.get_myaddr(), bl, 0); + ::encode(_peer_addr, bl, 0); + return socket->write_flush(std::move(bl)) + .then([this] { + // read client's handshake header and connect request + return socket->read(client_header_size); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + validate_banner(p); + entity_addr_t addr; + ::decode(addr, p); + ceph_assert(p.end()); + conn.peer_addr.set_type(addr.get_type()); + conn.peer_addr.set_port(addr.get_port()); + conn.peer_addr.set_nonce(addr.get_nonce()); + return seastar::repeat([this] { + return repeat_handle_connect(); + }); + }).then([this] { + // notify the dispatcher and allow them to reject the connection + return dispatcher.ms_handle_accept( + seastar::static_pointer_cast(conn.shared_from_this())); + }).then([this] { + messenger.register_conn( + seastar::static_pointer_cast(conn.shared_from_this())); + messenger.unaccept_conn( + seastar::static_pointer_cast(conn.shared_from_this())); + execute_open(); + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the accepting state + logger().warn("{} accepting fault: {}", conn, eptr); + close(); + }); + }); +} + +// open state + +seastar::future<> ProtocolV1::write_message(MessageRef msg) +{ + msg->set_seq(++conn.out_seq); + auto& header = msg->get_header(); + header.src = messenger.get_myname(); + msg->encode(conn.features, messenger.get_crc_flags()); + bufferlist bl; + bl.append(CEPH_MSGR_TAG_MSG); + bl.append((const char*)&header, sizeof(header)); + bl.append(msg->get_payload()); + bl.append(msg->get_middle()); + bl.append(msg->get_data()); + auto& footer = msg->get_footer(); + if (HAVE_FEATURE(conn.features, MSG_AUTH)) { + bl.append((const char*)&footer, sizeof(footer)); + } else { + ceph_msg_footer_old old_footer; + if (messenger.get_crc_flags() & MSG_CRC_HEADER) { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + } else { + old_footer.front_crc = old_footer.middle_crc = 0; + } + if (messenger.get_crc_flags() & MSG_CRC_DATA) { + old_footer.data_crc = footer.data_crc; + } else { + old_footer.data_crc = 0; + } + old_footer.flags = footer.flags; + bl.append((const char*)&old_footer, sizeof(old_footer)); + } + // write as a seastar::net::packet + return socket->write(std::move(bl)); + // TODO: lossless policy + // .then([this, msg = std::move(msg)] { + // if (!policy.lossy) { + // sent.push(std::move(msg)); + // } + // }); +} + +seastar::future<> ProtocolV1::do_keepalive() +{ + k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( + ceph::coarse_real_clock::now()); + logger().debug("{} write keepalive2 {}", conn, k.req.stamp.tv_sec); + return socket->write(make_static_packet(k.req)); +} + +seastar::future<> ProtocolV1::do_keepalive_ack() +{ + logger().debug("{} write keepalive2 ack {}", conn, k.ack.stamp.tv_sec); + return socket->write(make_static_packet(k.ack)); +} + +seastar::future<> ProtocolV1::handle_keepalive2_ack() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + auto t = reinterpret_cast(buf.get()); + k.ack_stamp = *t; + logger().debug("{} got keepalive2 ack {}", conn, t->tv_sec); + }); +} + +seastar::future<> ProtocolV1::handle_keepalive2() +{ + return socket->read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + k.ack.stamp = *reinterpret_cast(buf.get()); + logger().debug("{} got keepalive2 {}", conn, k.ack.stamp.tv_sec); + notify_keepalive_ack(); + }); +} + +seastar::future<> ProtocolV1::handle_ack() +{ + return socket->read_exactly(sizeof(ceph_le64)) + .then([this] (auto buf) { + auto seq = reinterpret_cast(buf.get()); + discard_up_to(&conn.sent, *seq); + }); +} + +seastar::future<> ProtocolV1::maybe_throttle() +{ + if (!conn.policy.throttler_bytes) { + return seastar::now(); + } + const auto to_read = (m.header.front_len + + m.header.middle_len + + m.header.data_len); + return conn.policy.throttler_bytes->get(to_read); +} + +seastar::future<> ProtocolV1::read_message() +{ + return socket->read(sizeof(m.header)) + .then([this] (bufferlist bl) { + // throttle the traffic, maybe + auto p = bl.cbegin(); + ::decode(m.header, p); + return maybe_throttle(); + }).then([this] { + // read front + return socket->read(m.header.front_len); + }).then([this] (bufferlist bl) { + m.front = std::move(bl); + // read middle + return socket->read(m.header.middle_len); + }).then([this] (bufferlist bl) { + m.middle = std::move(bl); + // read data + return socket->read(m.header.data_len); + }).then([this] (bufferlist bl) { + m.data = std::move(bl); + // read footer + return socket->read(sizeof(m.footer)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(m.footer, p); + auto msg = ::decode_message(nullptr, 0, m.header, m.footer, + m.front, m.middle, m.data, nullptr); + // TODO: set time stamps + msg->set_byte_throttler(conn.policy.throttler_bytes); + + if (!conn.update_rx_seq(msg->get_seq())) { + // skip this message + return; + } + + constexpr bool add_ref = false; // Message starts with 1 ref + // TODO: change MessageRef with foreign_ptr + auto msg_ref = MessageRef{msg, add_ref}; + // start dispatch, ignoring exceptions from the application layer + seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { + logger().debug("{} <= {}@{} === {}", messenger, + msg->get_source(), conn.peer_addr, *msg); + return dispatcher.ms_dispatch( + seastar::static_pointer_cast( + conn.shared_from_this()), + std::move(msg)) + .handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} ms_dispatch caught exception: {}", conn, eptr); + ceph_assert(false); + }); + }); + }); +} + +seastar::future<> ProtocolV1::handle_tags() +{ + return seastar::keep_doing([this] { + // read the next tag + return socket->read_exactly(1) + .then([this] (auto buf) { + switch (buf[0]) { + case CEPH_MSGR_TAG_MSG: + return read_message(); + case CEPH_MSGR_TAG_ACK: + return handle_ack(); + case CEPH_MSGR_TAG_KEEPALIVE: + return seastar::now(); + case CEPH_MSGR_TAG_KEEPALIVE2: + return handle_keepalive2(); + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + return handle_keepalive2_ack(); + case CEPH_MSGR_TAG_CLOSE: + logger().info("{} got tag close", conn); + throw std::system_error(make_error_code(error::connection_aborted)); + default: + logger().error("{} got unknown msgr tag {}", + conn, static_cast(buf[0])); + throw std::system_error(make_error_code(error::read_eof)); + } + }); + }); +} + +void ProtocolV1::execute_open() +{ + logger().debug("{} trigger open, was {}", conn, static_cast(state)); + state = state_t::open; + set_write_state(write_state_t::open); + + seastar::with_gate(pending_dispatch, [this] { + // start background processing of tags + return handle_tags() + .handle_exception_type([this] (const std::system_error& e) { + logger().warn("{} open fault: {}", conn, e); + if (e.code() == error::connection_aborted || + e.code() == error::connection_reset) { + return dispatcher.ms_handle_reset( + seastar::static_pointer_cast(conn.shared_from_this())) + .then([this] { + close(); + }); + } else if (e.code() == error::read_eof) { + return dispatcher.ms_handle_remote_reset( + seastar::static_pointer_cast(conn.shared_from_this())) + .then([this] { + close(); + }); + } else { + throw e; + } + }).handle_exception([this] (std::exception_ptr eptr) { + // TODO: handle fault in the open state + logger().warn("{} open fault: {}", conn, eptr); + close(); + }); + }); +} + +// closing state + +void ProtocolV1::trigger_close() +{ + logger().debug("{} trigger closing, was {}", + conn, static_cast(state)); + + if (state == state_t::accepting) { + messenger.unaccept_conn(seastar::static_pointer_cast( + conn.shared_from_this())); + } else if (state >= state_t::connecting && state < state_t::closing) { + messenger.unregister_conn(seastar::static_pointer_cast( + conn.shared_from_this())); + } else { + // cannot happen + ceph_assert(false); + } + + if (!socket) { + ceph_assert(state == state_t::connecting); + } + + state = state_t::closing; +} + +seastar::future<> ProtocolV1::fault() +{ + if (conn.policy.lossy) { + messenger.unregister_conn(seastar::static_pointer_cast( + conn.shared_from_this())); + } + if (h.backoff.count()) { + h.backoff += h.backoff; + } else { + h.backoff = conf.ms_initial_backoff; + } + if (h.backoff > conf.ms_max_backoff) { + h.backoff = conf.ms_max_backoff; + } + return seastar::sleep(h.backoff); +} + +} // namespace ceph::net diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h new file mode 100644 index 000000000000..183c7e635bfc --- /dev/null +++ b/src/crimson/net/ProtocolV1.h @@ -0,0 +1,125 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "Protocol.h" + +class AuthAuthorizer; +class AuthSessionHandler; + +namespace ceph::net { + +class ProtocolV1 final : public Protocol { + public: + ProtocolV1(Dispatcher& dispatcher, + SocketConnection& conn, + SocketMessenger& messenger); + ~ProtocolV1() override; + + private: + void start_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) override; + + void start_accept(seastar::foreign_ptr>&& socket, + const entity_addr_t& peer_addr) override; + + void trigger_close() override; + + seastar::future<> write_message(MessageRef msg) override; + + seastar::future<> do_keepalive() override; + + seastar::future<> do_keepalive_ack() override; + + private: + SocketMessenger &messenger; + + enum class state_t { + none, + accepting, + connecting, + open, + standby, + wait, + closing + }; + state_t state = state_t::none; + + // state for handshake + struct Handshake { + ceph_msg_connect connect; + ceph_msg_connect_reply reply; + AuthAuthorizer* authorizer = nullptr; + std::chrono::milliseconds backoff; + uint32_t connect_seq = 0; + uint32_t peer_global_seq = 0; + uint32_t global_seq; + } h; + + std::unique_ptr session_security; + + // state for an incoming message + struct MessageReader { + ceph_msg_header header; + ceph_msg_footer footer; + bufferlist front; + bufferlist middle; + bufferlist data; + } m; + + struct Keepalive { + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2; + ceph_timespec stamp; + } __attribute__((packed)) req; + struct { + const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; + ceph_timespec stamp; + } __attribute__((packed)) ack; + ceph_timespec ack_stamp; + } k; + + private: + // connecting + void reset_session(); + seastar::future handle_connect_reply(ceph::net::msgr_tag_t tag); + seastar::future repeat_connect(); + + // accepting + seastar::future send_connect_reply( + msgr_tag_t tag, bufferlist&& authorizer_reply = {}); + seastar::future send_connect_reply_ready( + msgr_tag_t tag, bufferlist&& authorizer_reply); + seastar::future replace_existing( + SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer = false); + seastar::future handle_connect_with_existing( + SocketConnectionRef existing, bufferlist&& authorizer_reply); + bool require_auth_feature() const; + seastar::future repeat_handle_connect(); + + // open + seastar::future<> handle_keepalive2_ack(); + seastar::future<> handle_keepalive2(); + seastar::future<> handle_ack(); + seastar::future<> maybe_throttle(); + seastar::future<> read_message(); + seastar::future<> handle_tags(); + void execute_open(); + + // replacing + // the number of connections initiated in this session, increment when a + // new connection is established + uint32_t connect_seq() const { return h.connect_seq; } + // the client side should connect us with a gseq. it will be reset with + // the one of exsting connection if it's greater. + uint32_t peer_global_seq() const { return h.peer_global_seq; } + // current state of ProtocolV1 + state_t get_state() const { return state; } + + seastar::future<> fault(); +}; + +} // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index ff0c9c07b378..9a547960a33a 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -14,29 +14,12 @@ #include "SocketConnection.h" -#include -#include -#include -#include - -#include "include/msgr.h" -#include "include/random.h" -#include "auth/Auth.h" -#include "auth/AuthSessionHandler.h" - -#include "crimson/common/log.h" #include "Config.h" -#include "Dispatcher.h" -#include "Errors.h" +#include "ProtocolV1.h" #include "SocketMessenger.h" using namespace ceph::net; -template -seastar::net::packet make_static_packet(const T& value) { - return { reinterpret_cast(&value), sizeof(value) }; -} - namespace { seastar::logger& logger() { return ceph::get_logger(ceph_subsys_ms); @@ -45,16 +28,13 @@ namespace { SocketConnection::SocketConnection(SocketMessenger& messenger, Dispatcher& dispatcher) - : messenger(messenger), - dispatcher(dispatcher) + : messenger(messenger) { ceph_assert(&messenger.container().local() == &messenger); + protocol = std::make_unique(dispatcher, *this, messenger); } -SocketConnection::~SocketConnection() -{ - ceph_assert(pending_dispatch.is_closed()); -} +SocketConnection::~SocketConnection() {} ceph::net::Messenger* SocketConnection::get_messenger() const { @@ -64,156 +44,32 @@ SocketConnection::get_messenger() const { seastar::future SocketConnection::is_connected() { return seastar::smp::submit_to(shard_id(), [this] { - return write_state == write_state_t::open; - }); -} - -void SocketConnection::write_event() -{ - if (write_dispatching) { - // already dispatching - return; - } - write_dispatching = true; - switch (write_state) { - case write_state_t::open: - case write_state_t::delay: - seastar::with_gate(pending_dispatch, [this] { - return seastar::repeat([this] { - switch (write_state) { - case write_state_t::open: - return seastar::futurize_apply([this] { - if (m_keepalive) { - return do_keepalive() - .then([this] { m_keepalive = false; }); - } - return seastar::now(); - }).then([this] { - if (m_keepalive_ack) { - return do_keepalive_ack() - .then([this] { m_keepalive_ack = false; }); - } - return seastar::now(); - }).then([this] { - if (!out_q.empty()){ - MessageRef msg = out_q.front(); - return write_message(msg) - .then([this, msg] { - if (msg == out_q.front()) { - out_q.pop(); - } - return stop_t::no; - }); - } else { - return socket->flush() - .then([this] { - if (!out_q.empty()) { - return stop_t::no; - } else { - write_dispatching = false; - return stop_t::yes; - } - }); - } - }).handle_exception([this] (std::exception_ptr eptr) { - logger().warn("{} write_event fault: {}", *this, eptr); - close(); - return stop_t::no; - }); - case write_state_t::delay: - // delay dispatching writes until open - return state_changed.get_shared_future() - .then([] { return stop_t::no; }); - case write_state_t::drop: - write_dispatching = false; - return seastar::make_ready_future(stop_t::yes); - default: - ceph_assert(false); - } - }); + return protocol->is_connected(); }); - return; - case write_state_t::drop: - write_dispatching = false; - default: - ceph_assert(false); - } } seastar::future<> SocketConnection::send(MessageRef msg) { logger().debug("{} --> {} === {}", messenger, get_peer_addr(), *msg); return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] { - if (write_state != write_state_t::drop) { - out_q.push(std::move(msg)); - write_event(); - } + return protocol->send(std::move(msg)); }); } seastar::future<> SocketConnection::keepalive() { return seastar::smp::submit_to(shard_id(), [this] { - if (!m_keepalive) { - m_keepalive = true; - write_event(); - } + return protocol->keepalive(); }); } seastar::future<> SocketConnection::close() { return seastar::smp::submit_to(shard_id(), [this] { - return do_close(); - }); -} - -seastar::future<> SocketConnection::handle_tags() -{ - return seastar::keep_doing([this] { - // read the next tag - return socket->read_exactly(1) - .then([this] (auto buf) { - switch (buf[0]) { - case CEPH_MSGR_TAG_MSG: - return read_message(); - case CEPH_MSGR_TAG_ACK: - return handle_ack(); - case CEPH_MSGR_TAG_KEEPALIVE: - return seastar::now(); - case CEPH_MSGR_TAG_KEEPALIVE2: - return handle_keepalive2(); - case CEPH_MSGR_TAG_KEEPALIVE2_ACK: - return handle_keepalive2_ack(); - case CEPH_MSGR_TAG_CLOSE: - logger().info("{} got tag close", *this); - throw std::system_error(make_error_code(error::connection_aborted)); - default: - logger().error("{} got unknown msgr tag {}", *this, static_cast(buf[0])); - throw std::system_error(make_error_code(error::read_eof)); - } - }); - }); -} - -seastar::future<> SocketConnection::handle_ack() -{ - return socket->read_exactly(sizeof(ceph_le64)) - .then([this] (auto buf) { - auto seq = reinterpret_cast(buf.get()); - discard_up_to(&sent, *seq); + return protocol->close(); }); } -void SocketConnection::discard_up_to(std::queue* queue, - seq_num_t seq) -{ - while (!queue->empty() && - queue->front()->get_seq() < seq) { - queue->pop(); - } -} - void SocketConnection::requeue_sent() { out_seq -= sent.size(); @@ -224,71 +80,6 @@ void SocketConnection::requeue_sent() } } -seastar::future<> SocketConnection::maybe_throttle() -{ - if (!policy.throttler_bytes) { - return seastar::now(); - } - const auto to_read = (m.header.front_len + - m.header.middle_len + - m.header.data_len); - return policy.throttler_bytes->get(to_read); -} - -seastar::future<> SocketConnection::read_message() -{ - return socket->read(sizeof(m.header)) - .then([this] (bufferlist bl) { - // throttle the traffic, maybe - auto p = bl.cbegin(); - ::decode(m.header, p); - return maybe_throttle(); - }).then([this] { - // read front - return socket->read(m.header.front_len); - }).then([this] (bufferlist bl) { - m.front = std::move(bl); - // read middle - return socket->read(m.header.middle_len); - }).then([this] (bufferlist bl) { - m.middle = std::move(bl); - // read data - return socket->read(m.header.data_len); - }).then([this] (bufferlist bl) { - m.data = std::move(bl); - // read footer - return socket->read(sizeof(m.footer)); - }).then([this] (bufferlist bl) { - auto p = bl.cbegin(); - ::decode(m.footer, p); - auto msg = ::decode_message(nullptr, 0, m.header, m.footer, - m.front, m.middle, m.data, nullptr); - // TODO: set time stamps - msg->set_byte_throttler(policy.throttler_bytes); - - if (!update_rx_seq(msg->get_seq())) { - // skip this message - return; - } - - constexpr bool add_ref = false; // Message starts with 1 ref - // TODO: change MessageRef with foreign_ptr - auto msg_ref = MessageRef{msg, add_ref}; - // start dispatch, ignoring exceptions from the application layer - seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { - logger().debug("{} <= {}@{} === {}", messenger, - msg->get_source(), get_peer_addr(), *msg); - return dispatcher.ms_dispatch( - seastar::static_pointer_cast(shared_from_this()), - std::move(msg)) - .handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} ms_dispatch caught exception: {}", *this, eptr); - ceph_assert(false); - }); - }); - }); -} - bool SocketConnection::update_rx_seq(seq_num_t seq) { if (seq <= in_seq) { @@ -308,707 +99,18 @@ bool SocketConnection::update_rx_seq(seq_num_t seq) } } -seastar::future<> SocketConnection::write_message(MessageRef msg) -{ - msg->set_seq(++out_seq); - auto& header = msg->get_header(); - header.src = messenger.get_myname(); - msg->encode(features, messenger.get_crc_flags()); - bufferlist bl; - bl.append(CEPH_MSGR_TAG_MSG); - bl.append((const char*)&header, sizeof(header)); - bl.append(msg->get_payload()); - bl.append(msg->get_middle()); - bl.append(msg->get_data()); - auto& footer = msg->get_footer(); - if (HAVE_FEATURE(features, MSG_AUTH)) { - bl.append((const char*)&footer, sizeof(footer)); - } else { - ceph_msg_footer_old old_footer; - if (messenger.get_crc_flags() & MSG_CRC_HEADER) { - old_footer.front_crc = footer.front_crc; - old_footer.middle_crc = footer.middle_crc; - } else { - old_footer.front_crc = old_footer.middle_crc = 0; - } - if (messenger.get_crc_flags() & MSG_CRC_DATA) { - old_footer.data_crc = footer.data_crc; - } else { - old_footer.data_crc = 0; - } - old_footer.flags = footer.flags; - bl.append((const char*)&old_footer, sizeof(old_footer)); - } - // write as a seastar::net::packet - return socket->write(std::move(bl)); - // TODO: lossless policy - // .then([this, msg = std::move(msg)] { - // if (!policy.lossy) { - // sent.push(std::move(msg)); - // } - // }); -} - -seastar::future<> SocketConnection::do_keepalive() -{ - k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( - ceph::coarse_real_clock::now()); - logger().debug("{} write keepalive2 {}", *this, k.req.stamp.tv_sec); - return socket->write(make_static_packet(k.req)); -} - -seastar::future<> SocketConnection::do_keepalive_ack() -{ - logger().debug("{} write keepalive2 ack {}", *this, k.ack.stamp.tv_sec); - return socket->write(make_static_packet(k.ack)); -} - -seastar::future<> SocketConnection::do_close() -{ - if (state == state_t::closing) { - // already closing - assert(close_ready.valid()); - return close_ready.get_future(); - } - - // unregister_conn() drops a reference, so hold another until completion - auto cleanup = [conn_ref = shared_from_this(), this] { - logger().debug("{} closed!", *this); - }; - - if (state == state_t::accepting) { - messenger.unaccept_conn(seastar::static_pointer_cast(shared_from_this())); - } else if (state >= state_t::connecting && state < state_t::closing) { - messenger.unregister_conn(seastar::static_pointer_cast(shared_from_this())); - } else { - // cannot happen - ceph_assert(false); - } - - // close_ready become valid only after state is state_t::closing - assert(!close_ready.valid()); - - if (socket) { - close_ready = socket->close() - .then([this] { - return pending_dispatch.close(); - }).finally(std::move(cleanup)); - } else { - ceph_assert(state == state_t::connecting); - close_ready = pending_dispatch.close().finally(std::move(cleanup)); - } - - logger().debug("{} trigger closing, was {}", *this, static_cast(state)); - state = state_t::closing; - write_state = write_state_t::drop; - state_changed.set_value(); - state_changed = seastar::shared_promise<>(); - - return close_ready.get_future(); -} - -// handshake - -/// store the banner in a non-const string for buffer::create_static() -static char banner[] = CEPH_BANNER; -constexpr size_t banner_size = sizeof(CEPH_BANNER)-1; - -constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr); -constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr); - -WRITE_RAW_ENCODER(ceph_msg_connect); -WRITE_RAW_ENCODER(ceph_msg_connect_reply); - -std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c) -{ - return out << "connect{features=" << std::hex << c.features << std::dec - << " host_type=" << c.host_type - << " global_seq=" << c.global_seq - << " connect_seq=" << c.connect_seq - << " protocol_version=" << c.protocol_version - << " authorizer_protocol=" << c.authorizer_protocol - << " authorizer_len=" << c.authorizer_len - << " flags=" << std::hex << static_cast(c.flags) << std::dec << '}'; -} - -std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r) -{ - return out << "connect_reply{tag=" << static_cast(r.tag) - << " features=" << std::hex << r.features << std::dec - << " global_seq=" << r.global_seq - << " connect_seq=" << r.connect_seq - << " protocol_version=" << r.protocol_version - << " authorizer_len=" << r.authorizer_len - << " flags=" << std::hex << static_cast(r.flags) << std::dec << '}'; -} - -// check that the buffer starts with a valid banner without requiring it to -// be contiguous in memory -static void validate_banner(bufferlist::const_iterator& p) -{ - auto b = std::cbegin(banner); - auto end = b + banner_size; - while (b != end) { - const char *buf{nullptr}; - auto remaining = std::distance(b, end); - auto len = p.get_ptr_and_advance(remaining, &buf); - if (!std::equal(buf, buf + len, b)) { - throw std::system_error(make_error_code(error::bad_connect_banner)); - } - b += len; - } -} - -// make sure that we agree with the peer about its address -static void validate_peer_addr(const entity_addr_t& addr, - const entity_addr_t& expected) -{ - if (addr == expected) { - return; - } - // ok if server bound anonymously, as long as port/nonce match - if (addr.is_blank_ip() && - addr.get_port() == expected.get_port() && - addr.get_nonce() == expected.get_nonce()) { - return; - } else { - throw std::system_error(make_error_code(error::bad_peer_address)); - } -} - -/// return a static bufferptr to the given object -template -bufferptr create_static(T& obj) -{ - return buffer::create_static(sizeof(obj), reinterpret_cast(&obj)); -} - -bool SocketConnection::require_auth_feature() const -{ - if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { - return false; - } - if (conf.cephx_require_signatures) { - return true; - } - if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || - h.connect.host_type == CEPH_ENTITY_TYPE_MDS) { - return conf.cephx_cluster_require_signatures; - } else { - return conf.cephx_service_require_signatures; - } -} - -uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const -{ - constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD; - // see also OSD.h, unlike other connection of simple/async messenger, - // crimson msgr is only used by osd - constexpr uint32_t CEPH_OSD_PROTOCOL = 10; - if (peer_type == my_type) { - // internal - return CEPH_OSD_PROTOCOL; - } else { - // public - switch (connect ? peer_type : my_type) { - case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; - case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; - case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; - default: return 0; - } - } -} - -seastar::future -SocketConnection::repeat_handle_connect() -{ - return socket->read(sizeof(h.connect)) - .then([this](bufferlist bl) { - auto p = bl.cbegin(); - ::decode(h.connect, p); - peer_type = h.connect.host_type; - return socket->read(h.connect.authorizer_len); - }).then([this] (bufferlist authorizer) { - if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { - return seastar::make_ready_future( - CEPH_MSGR_TAG_BADPROTOVER, bufferlist{}); - } - if (require_auth_feature()) { - policy.features_required |= CEPH_FEATURE_MSG_AUTH; - } - if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features; - feat_missing != 0) { - return seastar::make_ready_future( - CEPH_MSGR_TAG_FEATURES, bufferlist{}); - } - return dispatcher.ms_verify_authorizer(peer_type, - h.connect.authorizer_protocol, - authorizer); - }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) { - memset(&h.reply, 0, sizeof(h.reply)); - if (tag) { - return send_connect_reply(tag, std::move(authorizer_reply)); - } - if (auto existing = messenger.lookup_conn(peer_addr); existing) { - return handle_connect_with_existing(existing, std::move(authorizer_reply)); - } else if (h.connect.connect_seq > 0) { - return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, - std::move(authorizer_reply)); - } - h.connect_seq = h.connect.connect_seq + 1; - h.peer_global_seq = h.connect.global_seq; - set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features); - // TODO: cct - return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply)); - }); -} - -seastar::future -SocketConnection::send_connect_reply(msgr_tag_t tag, - bufferlist&& authorizer_reply) -{ - h.reply.tag = tag; - h.reply.features = static_cast((h.connect.features & - policy.features_supported) | - policy.features_required); - h.reply.authorizer_len = authorizer_reply.length(); - return socket->write(make_static_packet(h.reply)) - .then([this, reply=std::move(authorizer_reply)]() mutable { - return socket->write_flush(std::move(reply)); - }).then([] { - return stop_t::no; - }); -} - -seastar::future -SocketConnection::send_connect_reply_ready(msgr_tag_t tag, - bufferlist&& authorizer_reply) -{ - h.global_seq = messenger.get_global_seq(); - h.reply.tag = tag; - h.reply.features = policy.features_supported; - h.reply.global_seq = h.global_seq; - h.reply.connect_seq = h.connect_seq; - h.reply.flags = 0; - if (policy.lossy) { - h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; - } - h.reply.authorizer_len = authorizer_reply.length(); - return socket->write(make_static_packet(h.reply)) - .then([this, reply=std::move(authorizer_reply)]() mutable { - if (reply.length()) { - return socket->write(std::move(reply)); - } else { - return seastar::now(); - } - }).then([this] { - if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { - return socket->write_flush(make_static_packet(in_seq)) - .then([this] { - return socket->read_exactly(sizeof(seq_num_t)); - }).then([this] (auto buf) { - auto acked_seq = reinterpret_cast(buf.get()); - discard_up_to(&out_q, *acked_seq); - }); - } else { - return socket->flush(); - } - }).then([this] { - return stop_t::yes; - }); -} - -seastar::future<> -SocketConnection::handle_keepalive2() -{ - return socket->read_exactly(sizeof(ceph_timespec)) - .then([this] (auto buf) { - k.ack.stamp = *reinterpret_cast(buf.get()); - logger().debug("{} got keepalive2 {}", *this, k.ack.stamp.tv_sec); - if (!m_keepalive_ack) { - m_keepalive_ack = true; - write_event(); - } - }); -} - -seastar::future<> -SocketConnection::handle_keepalive2_ack() -{ - return socket->read_exactly(sizeof(ceph_timespec)) - .then([this] (auto buf) { - auto t = reinterpret_cast(buf.get()); - k.ack_stamp = *t; - logger().debug("{} got keepalive2 ack {}", *this, t->tv_sec); - }); -} - -seastar::future -SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply) -{ - if (h.connect.global_seq < existing->peer_global_seq()) { - h.reply.global_seq = existing->peer_global_seq(); - return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL); - } else if (existing->is_lossy()) { - return replace_existing(existing, std::move(authorizer_reply)); - } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) { - return replace_existing(existing, std::move(authorizer_reply), true); - } else if (h.connect.connect_seq < existing->connect_seq()) { - // old attempt, or we sent READY but they didn't get it. - h.reply.connect_seq = existing->connect_seq() + 1; - return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); - } else if (h.connect.connect_seq == existing->connect_seq()) { - // if the existing connection successfully opened, and/or - // subsequently went to standby, then the peer should bump - // their connect_seq and retry: this is not a connection race - // we need to resolve here. - if (existing->get_state() == state_t::open || - existing->get_state() == state_t::standby) { - if (policy.resetcheck && existing->connect_seq() == 0) { - return replace_existing(existing, std::move(authorizer_reply)); - } else { - h.reply.connect_seq = existing->connect_seq() + 1; - return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); - } - } else if (peer_addr < messenger.get_myaddr() || - existing->is_server_side()) { - // incoming wins - return replace_existing(existing, std::move(authorizer_reply)); - } else { - return send_connect_reply(CEPH_MSGR_TAG_WAIT); - } - } else if (policy.resetcheck && - existing->connect_seq() == 0) { - return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION); - } else { - return replace_existing(existing, std::move(authorizer_reply)); - } -} - -seastar::future -SocketConnection::replace_existing(SocketConnectionRef existing, - bufferlist&& authorizer_reply, - bool is_reset_from_peer) -{ - msgr_tag_t reply_tag; - if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) && - !is_reset_from_peer) { - reply_tag = CEPH_MSGR_TAG_SEQ; - } else { - reply_tag = CEPH_MSGR_TAG_READY; - } - messenger.unregister_conn(existing); - if (!existing->is_lossy()) { - // reset the in_seq if this is a hard reset from peer, - // otherwise we respect our original connection's value - in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num(); - // steal outgoing queue and out_seq - existing->requeue_sent(); - std::tie(out_seq, out_q) = existing->get_out_queue(); - } - return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); -} - -seastar::future -SocketConnection::handle_connect_reply(msgr_tag_t tag) -{ - switch (tag) { - case CEPH_MSGR_TAG_FEATURES: - logger().error("{} connect protocol feature mispatch", __func__); - throw std::system_error(make_error_code(error::negotiation_failure)); - case CEPH_MSGR_TAG_BADPROTOVER: - logger().error("{} connect protocol version mispatch", __func__); - throw std::system_error(make_error_code(error::negotiation_failure)); - case CEPH_MSGR_TAG_BADAUTHORIZER: - logger().error("{} got bad authorizer", __func__); - throw std::system_error(make_error_code(error::negotiation_failure)); - case CEPH_MSGR_TAG_RESETSESSION: - reset_session(); - return seastar::make_ready_future(stop_t::no); - case CEPH_MSGR_TAG_RETRY_GLOBAL: - h.global_seq = messenger.get_global_seq(h.reply.global_seq); - return seastar::make_ready_future(stop_t::no); - case CEPH_MSGR_TAG_RETRY_SESSION: - ceph_assert(h.reply.connect_seq > h.connect_seq); - h.connect_seq = h.reply.connect_seq; - return seastar::make_ready_future(stop_t::no); - case CEPH_MSGR_TAG_WAIT: - // TODO: state wait - throw std::system_error(make_error_code(error::negotiation_failure)); - case CEPH_MSGR_TAG_SEQ: - case CEPH_MSGR_TAG_READY: - if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features); - missing) { - logger().error("{} missing required features", __func__); - throw std::system_error(make_error_code(error::negotiation_failure)); - } - return seastar::futurize_apply([this, tag] { - if (tag == CEPH_MSGR_TAG_SEQ) { - return socket->read_exactly(sizeof(seq_num_t)) - .then([this] (auto buf) { - auto acked_seq = reinterpret_cast(buf.get()); - discard_up_to(&out_q, *acked_seq); - return socket->write_flush(make_static_packet(in_seq)); - }); - } - // tag CEPH_MSGR_TAG_READY - return seastar::now(); - }).then([this] { - // hooray! - h.peer_global_seq = h.reply.global_seq; - policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY; - h.connect_seq++; - h.backoff = 0ms; - set_features(h.reply.features & h.connect.features); - if (h.authorizer) { - session_security.reset( - get_auth_session_handler(nullptr, - h.authorizer->protocol, - h.authorizer->session_key, - features)); - } - h.authorizer = nullptr; - return seastar::make_ready_future(stop_t::yes); - }); - break; - default: - // unknown tag - logger().error("{} got unknown tag", __func__, int(tag)); - throw std::system_error(make_error_code(error::negotiation_failure)); - } -} - -void SocketConnection::reset_session() -{ - decltype(out_q){}.swap(out_q); - decltype(sent){}.swap(sent); - in_seq = 0; - h.connect_seq = 0; - if (HAVE_FEATURE(features, MSG_AUTH)) { - // Set out_seq to a random value, so CRC won't be predictable. - // Constant to limit starting sequence number to 2^31. Nothing special - // about it, just a big number. - constexpr uint64_t SEQ_MASK = 0x7fffffff; - out_seq = ceph::util::generate_random_number(0, SEQ_MASK); - } else { - // previously, seq #'s always started at 0. - out_seq = 0; - } -} - -seastar::future -SocketConnection::repeat_connect() -{ - // encode ceph_msg_connect - memset(&h.connect, 0, sizeof(h.connect)); - h.connect.features = policy.features_supported; - h.connect.host_type = messenger.get_myname().type(); - h.connect.global_seq = h.global_seq; - h.connect.connect_seq = h.connect_seq; - h.connect.protocol_version = get_proto_version(peer_type, true); - // this is fyi, actually, server decides! - h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; - - h.authorizer = dispatcher.ms_get_authorizer(peer_type); - bufferlist bl; - if (h.authorizer) { - h.connect.authorizer_protocol = h.authorizer->protocol; - h.connect.authorizer_len = h.authorizer->bl.length(); - bl.append(create_static(h.connect)); - bl.append(h.authorizer->bl); - } else { - h.connect.authorizer_protocol = 0; - h.connect.authorizer_len = 0; - bl.append(create_static(h.connect)); - } - return socket->write_flush(std::move(bl)) - .then([this] { - // read the reply - return socket->read(sizeof(h.reply)); - }).then([this] (bufferlist bl) { - auto p = bl.cbegin(); - ::decode(h.reply, p); - ceph_assert(p.end()); - return socket->read(h.reply.authorizer_len); - }).then([this] (bufferlist bl) { - if (h.authorizer) { - auto reply = bl.cbegin(); - if (!h.authorizer->verify_reply(reply, nullptr)) { - logger().error("{} authorizer failed to verify reply", __func__); - throw std::system_error(make_error_code(error::negotiation_failure)); - } - } - return handle_connect_reply(h.reply.tag); - }); -} - void SocketConnection::start_connect(const entity_addr_t& _peer_addr, const entity_type_t& _peer_type) { - ceph_assert(state == state_t::none); - logger().debug("{} trigger connecting, was {}", *this, static_cast(state)); - state = state_t::connecting; - write_state = write_state_t::delay; - state_changed.set_value(); - state_changed = seastar::shared_promise<>(); - - ceph_assert(!socket); - peer_addr = _peer_addr; - peer_type = _peer_type; - messenger.register_conn(seastar::static_pointer_cast(shared_from_this())); - seastar::with_gate(pending_dispatch, [this] { - return seastar::connect(peer_addr.in4_addr()) - .then([this](seastar::connected_socket fd) { - if (state == state_t::closing) { - fd.shutdown_input(); - fd.shutdown_output(); - throw std::system_error(make_error_code(error::connection_aborted)); - } - socket = seastar::make_foreign(std::make_unique(std::move(fd))); - // read server's handshake header - return socket->read(server_header_size); - }).then([this] (bufferlist headerbl) { - auto p = headerbl.cbegin(); - validate_banner(p); - entity_addr_t saddr, caddr; - ::decode(saddr, p); - ::decode(caddr, p); - ceph_assert(p.end()); - validate_peer_addr(saddr, peer_addr); - - side = side_t::connector; - socket_port = caddr.get_port(); - return messenger.learned_addr(caddr); - }).then([this] { - // encode/send client's handshake header - bufferlist bl; - bl.append(buffer::create_static(banner_size, banner)); - ::encode(messenger.get_myaddr(), bl, 0); - h.global_seq = messenger.get_global_seq(); - return socket->write_flush(std::move(bl)); - }).then([=] { - return seastar::repeat([this] { - return repeat_connect(); - }); - }).then([this] { - // notify the dispatcher and allow them to reject the connection - return dispatcher.ms_handle_connect(seastar::static_pointer_cast(shared_from_this())); - }).then([this] { - execute_open(); - }).handle_exception([this] (std::exception_ptr eptr) { - // TODO: handle fault in the connecting state - logger().warn("{} connecting fault: {}", *this, eptr); - close(); - }); - }); + protocol->start_connect(_peer_addr, _peer_type); } void SocketConnection::start_accept(seastar::foreign_ptr>&& sock, const entity_addr_t& _peer_addr) { - ceph_assert(state == state_t::none); - logger().debug("{} trigger accepting, was {}", *this, static_cast(state)); - state = state_t::accepting; - write_state = write_state_t::delay; - state_changed.set_value(); - state_changed = seastar::shared_promise<>(); - - ceph_assert(!socket); - peer_addr.u = _peer_addr.u; - peer_addr.set_port(0); - side = side_t::acceptor; - socket_port = _peer_addr.get_port(); - socket = std::move(sock); - messenger.accept_conn(seastar::static_pointer_cast(shared_from_this())); - seastar::with_gate(pending_dispatch, [this, _peer_addr] { - // encode/send server's handshake header - bufferlist bl; - bl.append(buffer::create_static(banner_size, banner)); - ::encode(messenger.get_myaddr(), bl, 0); - ::encode(_peer_addr, bl, 0); - return socket->write_flush(std::move(bl)) - .then([this] { - // read client's handshake header and connect request - return socket->read(client_header_size); - }).then([this] (bufferlist bl) { - auto p = bl.cbegin(); - validate_banner(p); - entity_addr_t addr; - ::decode(addr, p); - ceph_assert(p.end()); - peer_addr.set_type(addr.get_type()); - peer_addr.set_port(addr.get_port()); - peer_addr.set_nonce(addr.get_nonce()); - return seastar::repeat([this] { - return repeat_handle_connect(); - }); - }).then([this] { - // notify the dispatcher and allow them to reject the connection - return dispatcher.ms_handle_accept(seastar::static_pointer_cast(shared_from_this())); - }).then([this] { - messenger.register_conn(seastar::static_pointer_cast(shared_from_this())); - messenger.unaccept_conn(seastar::static_pointer_cast(shared_from_this())); - execute_open(); - }).handle_exception([this] (std::exception_ptr eptr) { - // TODO: handle fault in the accepting state - logger().warn("{} accepting fault: {}", *this, eptr); - close(); - }); - }); -} - -void -SocketConnection::execute_open() -{ - logger().debug("{} trigger open, was {}", *this, static_cast(state)); - state = state_t::open; - write_state = write_state_t::open; - state_changed.set_value(); - state_changed = seastar::shared_promise<>(); - - seastar::with_gate(pending_dispatch, [this] { - // start background processing of tags - return handle_tags() - .handle_exception_type([this] (const std::system_error& e) { - logger().warn("{} open fault: {}", *this, e); - if (e.code() == error::connection_aborted || - e.code() == error::connection_reset) { - return dispatcher.ms_handle_reset(seastar::static_pointer_cast(shared_from_this())) - .then([this] { - close(); - }); - } else if (e.code() == error::read_eof) { - return dispatcher.ms_handle_remote_reset(seastar::static_pointer_cast(shared_from_this())) - .then([this] { - close(); - }); - } else { - throw e; - } - }).handle_exception([this] (std::exception_ptr eptr) { - // TODO: handle fault in the open state - logger().warn("{} open fault: {}", *this, eptr); - close(); - }); - }); -} - -seastar::future<> SocketConnection::fault() -{ - if (policy.lossy) { - messenger.unregister_conn(seastar::static_pointer_cast(shared_from_this())); - } - if (h.backoff.count()) { - h.backoff += h.backoff; - } else { - h.backoff = conf.ms_initial_backoff; - } - if (h.backoff > conf.ms_max_backoff) { - h.backoff = conf.ms_max_backoff; - } - return seastar::sleep(h.backoff); + protocol->start_accept(std::move(sock), _peer_addr); } seastar::shard_id SocketConnection::shard_id() const { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 93794310c0c8..a403b6790d31 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -14,32 +14,23 @@ #pragma once -#include -#include -#include #include #include "msg/Policy.h" #include "Connection.h" -#include "Socket.h" #include "crimson/thread/Throttle.h" -class AuthAuthorizer; -class AuthSessionHandler; - namespace ceph::net { -using stop_t = seastar::stop_iteration; - +class Protocol; +class Socket; class SocketMessenger; class SocketConnection; using SocketConnectionRef = seastar::shared_ptr; class SocketConnection : public Connection { SocketMessenger& messenger; - seastar::foreign_ptr> socket; - Dispatcher& dispatcher; - seastar::gate pending_dispatch; + std::unique_ptr protocol; // if acceptor side, socket_port is different from peer_addr.get_port(); // if connector side, socket_port is different from my_addr.get_port(). @@ -51,84 +42,6 @@ class SocketConnection : public Connection { side_t side = side_t::none; uint16_t socket_port = 0; - enum class state_t { - none, - accepting, - connecting, - open, - standby, - wait, - closing - }; - state_t state = state_t::none; - // wait until current state changed - seastar::shared_promise<> state_changed; - - // write_state is changed with state atomically, indicating the write - // behavior of the according state. - enum class write_state_t { - none, - delay, - open, - drop - }; - write_state_t write_state = write_state_t::none; - - /// become valid only when state is state_t::closing - seastar::shared_future<> close_ready; - - /// state for handshake - struct Handshake { - ceph_msg_connect connect; - ceph_msg_connect_reply reply; - AuthAuthorizer* authorizer = nullptr; - std::chrono::milliseconds backoff; - uint32_t connect_seq = 0; - uint32_t peer_global_seq = 0; - uint32_t global_seq; - } h; - - /// server side of handshake negotiation - seastar::future repeat_handle_connect(); - seastar::future handle_connect_with_existing(SocketConnectionRef existing, - bufferlist&& authorizer_reply); - seastar::future replace_existing(SocketConnectionRef existing, - bufferlist&& authorizer_reply, - bool is_reset_from_peer = false); - seastar::future send_connect_reply(ceph::net::msgr_tag_t tag, - bufferlist&& authorizer_reply = {}); - seastar::future send_connect_reply_ready(ceph::net::msgr_tag_t tag, - bufferlist&& authorizer_reply); - - seastar::future<> handle_keepalive2(); - seastar::future<> handle_keepalive2_ack(); - - bool require_auth_feature() const; - uint32_t get_proto_version(entity_type_t peer_type, bool connec) const; - /// client side of handshake negotiation - seastar::future repeat_connect(); - seastar::future handle_connect_reply(ceph::net::msgr_tag_t tag); - void reset_session(); - - /// state for an incoming message - struct MessageReader { - ceph_msg_header header; - ceph_msg_footer footer; - bufferlist front; - bufferlist middle; - bufferlist data; - } m; - - seastar::future<> maybe_throttle(); - seastar::future<> handle_tags(); - seastar::future<> handle_ack(); - - bool write_dispatching = false; - void write_event(); - - /// encode/write a message - seastar::future<> write_message(MessageRef msg); - ceph::net::Policy policy; uint64_t features; void set_features(uint64_t new_features) { @@ -144,42 +57,15 @@ class SocketConnection : public Connection { /// false otherwise. bool update_rx_seq(seq_num_t seq); - seastar::future<> read_message(); - - std::unique_ptr session_security; - // messages to be resent after connection gets reset std::queue out_q; // messages sent, but not yet acked by peer std::queue sent; - static void discard_up_to(std::queue*, seq_num_t); - - struct Keepalive { - struct { - const char tag = CEPH_MSGR_TAG_KEEPALIVE2; - ceph_timespec stamp; - } __attribute__((packed)) req; - struct { - const char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; - ceph_timespec stamp; - } __attribute__((packed)) ack; - ceph_timespec ack_stamp; - } k; - bool m_keepalive = false; - bool m_keepalive_ack = false; - - seastar::future<> fault(); - - void execute_open(); - - seastar::future<> do_keepalive(); - seastar::future<> do_keepalive_ack(); - seastar::future<> do_close(); public: SocketConnection(SocketMessenger& messenger, Dispatcher& dispatcher); - ~SocketConnection(); + ~SocketConnection() override; Messenger* get_messenger() const override; @@ -209,28 +95,14 @@ class SocketConnection : public Connection { void start_accept(seastar::foreign_ptr>&& socket, const entity_addr_t& peer_addr); - /// the number of connections initiated in this session, increment when a - /// new connection is established - uint32_t connect_seq() const { - return h.connect_seq; - } - - /// the client side should connect us with a gseq. it will be reset with - /// the one of exsting connection if it's greater. - uint32_t peer_global_seq() const { - return h.peer_global_seq; - } seq_num_t rx_seq_num() const { return in_seq; } - /// current state of connection - state_t get_state() const { - return state; - } bool is_server_side() const { return policy.server; } + bool is_lossy() const { return policy.lossy; } @@ -241,6 +113,9 @@ class SocketConnection : public Connection { std::tuple> get_out_queue() { return {out_seq, std::move(out_q)}; } + + friend class Protocol; + friend class ProtocolV1; }; } // namespace ceph::net