From a4c7dcd192f28047894a4f95f12684f973a7f700 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Fri, 25 May 2018 14:40:54 +0800 Subject: [PATCH] crimson/net: support the lossless connection and auth Signed-off-by: Kefu Chai --- src/crimson/net/CMakeLists.txt | 1 + src/crimson/net/Connection.h | 35 +- src/crimson/net/Dispatcher.cc | 11 + src/crimson/net/Dispatcher.h | 17 +- src/crimson/net/Fwd.h | 6 + src/crimson/net/Messenger.h | 35 +- src/crimson/net/SocketConnection.cc | 552 ++++++++++++++++++++++++---- src/crimson/net/SocketConnection.h | 103 +++++- src/crimson/net/SocketMessenger.cc | 33 +- src/crimson/net/SocketMessenger.h | 15 +- src/test/crimson/CMakeLists.txt | 3 +- src/test/crimson/test_messenger.cc | 2 +- 12 files changed, 731 insertions(+), 82 deletions(-) create mode 100644 src/crimson/net/Dispatcher.cc diff --git a/src/crimson/net/CMakeLists.txt b/src/crimson/net/CMakeLists.txt index c707adddf63da..99cd65b03ad23 100644 --- a/src/crimson/net/CMakeLists.txt +++ b/src/crimson/net/CMakeLists.txt @@ -1,4 +1,5 @@ set(crimson_net_srcs + Dispatcher.cc Errors.cc SocketConnection.cc SocketMessenger.cc) diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 67ba97e6de3ec..963ccc655c925 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -21,6 +22,8 @@ namespace ceph::net { +using seq_num_t = uint64_t; + class Connection : public boost::intrusive_ref_counter { protected: Messenger *const messenger; @@ -42,7 +45,8 @@ class Connection : public boost::intrusive_ref_counter { virtual bool is_connected() = 0; /// complete a handshake from the client's perspective - virtual seastar::future<> client_handshake() = 0; + virtual seastar::future<> client_handshake(entity_type_t peer_type, + entity_type_t host_type) = 0; /// complete a handshake from the server's perspective virtual seastar::future<> server_handshake() = 0; @@ -55,6 +59,35 @@ class Connection : public boost::intrusive_ref_counter { /// close the connection and cancel any any pending futures from read/send virtual seastar::future<> close() = 0; + + /// move all messages in the sent list back into the queue + virtual void requeue_sent() = 0; + + /// get all messages in the out queue + virtual std::tuple> get_out_queue() = 0; + +public: + enum class state_t { + none, + open, + standby, + closed, + wait + }; + /// the number of connections initiated in this session, increment when a + /// new connection is established + virtual uint32_t connect_seq() const = 0; + + /// the client side should connect us with a gseq. it will be reset with a + /// the one of exsting connection if it's greater. + virtual uint32_t peer_global_seq() const = 0; + + virtual seq_num_t rx_seq_num() const = 0; + + /// current state of connection + virtual state_t get_state() const = 0; + virtual bool is_server_side() const = 0; + virtual bool is_lossy() const = 0; }; } // namespace ceph::net diff --git a/src/crimson/net/Dispatcher.cc b/src/crimson/net/Dispatcher.cc new file mode 100644 index 0000000000000..e47290a7025f1 --- /dev/null +++ b/src/crimson/net/Dispatcher.cc @@ -0,0 +1,11 @@ +#include "auth/Auth.h" +#include "Dispatcher.h" + +namespace ceph::net +{ +seastar::future> +Dispatcher::ms_get_authorizer(peer_type_t, bool force_new) +{ + return seastar::make_ready_future>(nullptr); +} +} diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index 646c70f3c9511..494895946616d 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -18,8 +18,9 @@ #include "Fwd.h" -namespace ceph { -namespace net { +class AuthAuthorizer; + +namespace ceph::net { class Dispatcher { public: @@ -45,8 +46,14 @@ class Dispatcher { return seastar::make_ready_future<>(); } - // TODO: authorizer + virtual seastar::future + ms_verify_authorizer(peer_type_t, + auth_proto_t, + bufferlist&) { + return seastar::make_ready_future(0, bufferlist{}); + } + virtual seastar::future> + ms_get_authorizer(peer_type_t, bool force_new); }; -} // namespace net -} // namespace ceph +} // namespace ceph::net diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index c928bdebaf0f9..d234048a35c32 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -18,12 +18,18 @@ #include "Errors.h" #include "msg/msg_types.h" +#include "msg/Message.h" + +using peer_type_t = int; +using auth_proto_t = int; class Message; using MessageRef = boost::intrusive_ptr; namespace ceph::net { +using msgr_tag_t = uint8_t; + class Connection; using ConnectionRef = boost::intrusive_ptr; diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 5230b77c6d152..db01ac247a4bd 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -25,6 +25,8 @@ namespace ceph::net { class Messenger { entity_name_t my_name; entity_addr_t my_addr; + uint32_t global_seq = 0; + uint32_t crc_flags = 0; public: Messenger(const entity_name_t& name) @@ -45,11 +47,42 @@ class Messenger { virtual seastar::future<> start(Dispatcher *dispatcher) = 0; /// establish a client connection and complete a handshake - virtual seastar::future connect(const entity_addr_t& addr) = 0; + virtual seastar::future connect(const entity_addr_t& addr, + entity_type_t peer_type) = 0; /// stop listenening and wait for all connections to close. safe to destruct /// after this future becomes available virtual seastar::future<> shutdown() = 0; + + uint32_t get_global_seq(uint32_t old=0) { + if (old > global_seq) { + global_seq = old; + } + return ++global_seq; + } + ConnectionRef lookup_conn(const entity_addr_t&) { + // TODO: replace handling + return nullptr; + } + + // @returns a tuple of + virtual seastar::future /// auth_reply + verify_authorizer(peer_type_t peer_type, + auth_proto_t protocol, + bufferlist& auth) = 0; + virtual seastar::future> + get_authorizer(peer_type_t peer_type, + bool force_new) = 0; + uint32_t get_crc_flags() const { + return crc_flags; + } + void set_crc_data() { + crc_flags |= MSG_CRC_DATA; + } + void set_crc_header() { + crc_flags |= MSG_CRC_HEADER; + } }; } // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 87905114b3fac..e820d058ba769 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -14,10 +14,16 @@ #include #include +#include +#include "Config.h" +#include "Messenger.h" #include "SocketConnection.h" #include "include/msgr.h" +#include "include/random.h" +#include "auth/Auth.h" +#include "auth/AuthSessionHandler.h" #include "msg/Message.h" using namespace ceph::net; @@ -88,6 +94,9 @@ struct bufferlist_consumer { seastar::future SocketConnection::read(size_t bytes) { + if (bytes == 0) { + return seastar::make_ready_future(); + } r.buffer.clear(); r.remaining = bytes; return in.consume(bufferlist_consumer{r.buffer, r.remaining}) @@ -113,35 +122,16 @@ void SocketConnection::read_tags_until_next_message() // stop looping and notify read_header() return seastar::make_ready_future( seastar::stop_iteration::yes); - case CEPH_MSGR_TAG_ACK: - return in.read_exactly(sizeof(ceph_le64)) - .then([] (auto buf) { - auto seq = reinterpret_cast(buf.get()); - std::cout << "ack " << *seq << std::endl; - return seastar::stop_iteration::no; - }); - + return handle_ack(); case CEPH_MSGR_TAG_KEEPALIVE: break; - case CEPH_MSGR_TAG_KEEPALIVE2: - return in.read_exactly(sizeof(ceph_timespec)) - .then([] (auto buf) { - auto t = reinterpret_cast(buf.get()); - std::cout << "keepalive2 " << t->tv_sec << std::endl; - // TODO: schedule ack - return seastar::stop_iteration::no; - }); - + return handle_keepalive2() + .then([this] { return seastar::stop_iteration::no; }); case CEPH_MSGR_TAG_KEEPALIVE2_ACK: - return in.read_exactly(sizeof(ceph_timespec)) - .then([] (auto buf) { - auto t = reinterpret_cast(buf.get()); - std::cout << "keepalive2 ack " << t->tv_sec << std::endl; - return seastar::stop_iteration::no; - }); - + return handle_keepalive2_ack() + .then([this] { return seastar::stop_iteration::no; }); case CEPH_MSGR_TAG_CLOSE: std::cout << "close" << std::endl; break; @@ -156,6 +146,35 @@ void SocketConnection::read_tags_until_next_message() }); } +seastar::future SocketConnection::handle_ack() +{ + return in.read_exactly(sizeof(ceph_le64)) + .then([this] (auto buf) { + auto seq = reinterpret_cast(buf.get()); + discard_up_to(&sent, *seq); + return seastar::stop_iteration::no; + }); +} + +void SocketConnection::discard_up_to(std::queue* queue, + seq_num_t seq) +{ + while (!queue->empty() && + queue->front()->get_seq() < seq) { + queue->pop(); + } +} + +void SocketConnection::requeue_sent() +{ + out_seq -= sent.size(); + while (!sent.empty()) { + auto m = sent.front(); + sent.pop(); + out_q.push(std::move(m)); + } +} + seastar::future SocketConnection::read_message() { return on_message.get_future() @@ -190,18 +209,75 @@ seastar::future SocketConnection::read_message() m.front, m.middle, m.data, nullptr); constexpr bool add_ref = false; // Message starts with 1 ref return MessageRef{msg, add_ref}; + }).then([this] (MessageRef msg) { + if (msg) { + // TODO: set time stamps + msg->set_byte_throttler(policy.throttler_bytes); + if (!update_rx_seq(msg->get_seq())) { + msg.reset(); + } + } + return msg; }); } +bool SocketConnection::update_rx_seq(seq_num_t seq) +{ + if (seq <= in_seq) { + if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && + conf.ms_die_on_old_message) { + assert(0 == "old msgs despite reconnect_seq feature"); + } + return false; + } else if (seq > in_seq + 1) { + if (conf.ms_die_on_skipped_message) { + assert(0 == "skipped incoming seq"); + } + return false; + } else { + in_seq = seq; + return true; + } +} + seastar::future<> SocketConnection::write_message(MessageRef msg) { + msg->set_seq(++out_seq); + msg->encode(features, get_messenger()->get_crc_flags()); bufferlist bl; - unsigned char tag = CEPH_MSGR_TAG_MSG; - encode(tag, bl); - encode_message(msg.get(), 0, bl); + bl.append(CEPH_MSGR_TAG_MSG); + auto& header = msg->get_header(); + bl.append((const char*)&header, sizeof(header)); + bl.append(msg->get_payload()); + bl.append(msg->get_middle()); + bl.append(msg->get_data()); + auto& footer = msg->get_footer(); + if (has_feature(CEPH_FEATURE_MSG_AUTH)) { + bl.append((const char*)&footer, sizeof(footer)); + } else { + ceph_msg_footer_old old_footer; + if (get_messenger()->get_crc_flags() & MSG_CRC_HEADER) { + old_footer.front_crc = footer.front_crc; + old_footer.middle_crc = footer.middle_crc; + } else { + old_footer.front_crc = old_footer.middle_crc = 0; + } + if (get_messenger()->get_crc_flags() & MSG_CRC_DATA) { + old_footer.data_crc = footer.data_crc; + } else { + old_footer.data_crc = 0; + } + old_footer.flags = footer.flags; + bl.append((const char*)&old_footer, sizeof(old_footer)); + } // write as a seastar::net::packet return out.write(std::move(bl)) - .then([this] { return out.flush(); }); + .then([this] { return out.flush(); }) + .then([this, msg = std::move(msg)] { + if (!policy.lossy) { + sent.push(std::move(msg)); + } + }); } seastar::future<> SocketConnection::send(MessageRef msg) @@ -299,29 +375,379 @@ bufferptr create_static(T& obj) return buffer::create_static(sizeof(obj), reinterpret_cast(&obj)); } +bool SocketConnection::require_auth_feature() const +{ + if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) { + return false; + } + if (conf.cephx_require_signatures) { + return true; + } + if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD || + h.connect.host_type == CEPH_ENTITY_TYPE_MDS) { + return conf.cephx_cluster_require_signatures; + } else { + return conf.cephx_service_require_signatures; + } +} + +uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const +{ + constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD; + // see also OSD.h, unlike other connection of simple/async messenger, + // crimson msgr is only used by osd + constexpr uint32_t CEPH_OSD_PROTOCOL = 10; + if (peer_type == my_type) { + // internal + return CEPH_OSD_PROTOCOL; + } else { + // public + switch (connect ? peer_type : my_type) { + case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL; + case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL; + case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL; + default: return 0; + } + } +} + seastar::future<> SocketConnection::handle_connect() { - memset(&h.reply, 0, sizeof(h.reply)); + return read(sizeof(h.connect)) + .then([this](bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.connect, p); + return read(h.connect.authorizer_len); + }).then([this] (bufferlist authorizer) { + if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { + return seastar::make_ready_future( + CEPH_MSGR_TAG_BADPROTOVER, bufferlist{}); + } + if (require_auth_feature()) { + policy.features_required |= CEPH_FEATURE_MSG_AUTH; + } + if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features; + feat_missing != 0) { + return seastar::make_ready_future( + CEPH_MSGR_TAG_FEATURES, bufferlist{}); + } + return get_messenger()->verify_authorizer(get_peer_type(), + h.connect.authorizer_protocol, + authorizer); + }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) { + if (tag) { + return send_connect_reply(tag, std::move(authorizer_reply)); + } + if (auto existing = get_messenger()->lookup_conn(peer_addr); existing) { + return handle_connect_with_existing(existing, std::move(authorizer_reply)); + } else if (h.connect.connect_seq > 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION, + std::move(authorizer_reply)); + } + h.connect_seq = h.connect.connect_seq + 1; + h.peer_global_seq = h.connect.global_seq; + set_features((uint64_t)h.reply.features & (uint64_t)h.connect.features); + // TODO: cct + return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply)); + }); +} - h.reply.protocol_version = CEPH_OSDC_PROTOCOL; - h.reply.tag = CEPH_MSGR_TAG_READY; +seastar::future<> +SocketConnection::send_connect_reply(msgr_tag_t tag, + bufferlist&& authorizer_reply) +{ + h.reply.tag = tag; + h.reply.features = static_cast((h.connect.features & + policy.features_supported) | + policy.features_required); + h.reply.authorizer_len = authorizer_reply.length(); + return out.write(reinterpret_cast(&h.reply), sizeof(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + out.write(std::move(reply)); + }).then([this] { + return out.flush(); + }); +} - bufferlist bl; - bl.append(create_static(h.reply)); +seastar::future<> +SocketConnection::send_connect_reply_ready(msgr_tag_t tag, + bufferlist&& authorizer_reply) +{ + h.reply.tag = tag; + h.reply.features = policy.features_supported; + h.reply.global_seq = get_messenger()->get_global_seq(); + h.reply.connect_seq = h.connect_seq; + h.reply.flags = 0; + if (policy.lossy) { + h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; + } + h.reply.authorizer_len = authorizer_reply.length(); + return out.write(reinterpret_cast(&h.reply), sizeof(h.reply)) + .then([this, reply=std::move(authorizer_reply)]() mutable { + if (reply.length()) { + return out.write(std::move(reply)); + } else { + return seastar::now(); + } + }).then([this] { + if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { + return out.write(reinterpret_cast(&in_seq), + sizeof(in_seq)).then([this] { + return out.flush(); + }).then([this] { + return in.read_exactly(sizeof(seq_num_t)); + }).then([this] (auto buf) { + auto acked_seq = reinterpret_cast(buf.get()); + discard_up_to(&out_q, *acked_seq); + }); + } else { + return out.flush(); + } + }).then([this] { + state = state_t::open; + }); +} - return out.write(std::move(bl)) - .then([this] { return out.flush(); }); +seastar::future<> +SocketConnection::handle_keepalive2() +{ + return in.read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + auto t = reinterpret_cast(buf.get()); + k.reply_stamp = *t; + std::cout << "keepalive2 " << t->tv_sec << std::endl; + char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK; + return out.write(reinterpret_cast(&tag), sizeof(tag)); + }).then([this] { + out.write(reinterpret_cast(&k.reply_stamp), + sizeof(k.reply_stamp)); + }).then([this] { + return out.flush(); + }); +} + +seastar::future<> +SocketConnection::handle_keepalive2_ack() +{ + return in.read_exactly(sizeof(ceph_timespec)) + .then([this] (auto buf) { + auto t = reinterpret_cast(buf.get()); + k.ack_stamp = *t; + std::cout << "keepalive2 ack " << t->tv_sec << std::endl; + }); +} + +seastar::future<> +SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlist&& authorizer_reply) +{ + if (h.connect.global_seq < existing->peer_global_seq()) { + h.reply.global_seq = existing->peer_global_seq(); + return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL); + } else if (existing->is_lossy()) { + return replace_existing(existing, std::move(authorizer_reply)); + } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) { + return replace_existing(existing, std::move(authorizer_reply), true); + } else if (h.connect.connect_seq < existing->connect_seq()) { + // old attempt, or we sent READY but they didn't get it. + h.reply.connect_seq = existing->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } else if (h.connect.connect_seq == existing->connect_seq()) { + // if the existing connection successfully opened, and/or + // subsequently went to standby, then the peer should bump + // their connect_seq and retry: this is not a connection race + // we need to resolve here. + if (existing->get_state() == state_t::open || + existing->get_state() == state_t::standby) { + if (policy.resetcheck && existing->connect_seq() == 0) { + return replace_existing(existing, std::move(authorizer_reply)); + } else { + h.reply.connect_seq = existing->connect_seq() + 1; + return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); + } + } else if (get_peer_addr() < get_my_addr() || + existing->is_server_side()) { + // incoming wins + return replace_existing(existing, std::move(authorizer_reply)); + } else { + return send_connect_reply(CEPH_MSGR_TAG_WAIT); + } + } else if (policy.resetcheck && + existing->connect_seq() == 0) { + return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION); + } else { + return replace_existing(existing, std::move(authorizer_reply)); + } } -seastar::future<> SocketConnection::handle_connect_reply() +seastar::future<> SocketConnection::replace_existing(ConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer) { - if (h.reply.tag != CEPH_MSGR_TAG_READY) { + msgr_tag_t reply_tag; + if ((h.connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) { + reply_tag = CEPH_MSGR_TAG_SEQ; + } else { + reply_tag = CEPH_MSGR_TAG_READY; + } + get_messenger()->unregister_conn(existing); + if (!existing->is_lossy()) { + // reset the in_seq if this is a hard reset from peer, + // otherwise we respect our original connection's value + in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num(); + // steal outgoing queue and out_seq + existing->requeue_sent(); + std::tie(out_seq, out_q) = existing->get_out_queue(); + } + return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); +} + +seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag) +{ + switch (tag) { + case CEPH_MSGR_TAG_FEATURES: + return fault(); + case CEPH_MSGR_TAG_BADPROTOVER: + return fault(); + case CEPH_MSGR_TAG_BADAUTHORIZER: + if (h.got_bad_auth) { + throw std::system_error(make_error_code(error::negotiation_failure)); + } + h.got_bad_auth = true; + // try harder + return get_messenger()->get_authorizer(h.peer_type, true) + .then([this](auto&& auth) { + h.authorizer = std::move(auth); + return seastar::now(); + }); + case CEPH_MSGR_TAG_RESETSESSION: + reset_session(); + return seastar::now(); + case CEPH_MSGR_TAG_RETRY_GLOBAL: + h.global_seq = get_messenger()->get_global_seq(h.reply.global_seq); + return seastar::now(); + case CEPH_MSGR_TAG_RETRY_SESSION: + assert(h.reply.connect_seq > h.connect_seq); + h.connect_seq = h.reply.connect_seq; + return seastar::now(); + case CEPH_MSGR_TAG_WAIT: + return fault(); + case CEPH_MSGR_TAG_SEQ: + break; + case CEPH_MSGR_TAG_READY: + break; + } + if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features); + missing) { + return fault(); + } + if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { + return in.read_exactly(sizeof(seq_num_t)) + .then([this] (auto buf) { + auto acked_seq = reinterpret_cast(buf.get()); + discard_up_to(&out_q, *acked_seq); + }).then([this] { + return out.write(reinterpret_cast(&in_seq), sizeof(in_seq)); + }).then([this] { + return out.flush(); + }).then([this] { + return handle_connect_reply(CEPH_MSGR_TAG_READY); + }); + } + if (h.reply.tag == CEPH_MSGR_TAG_READY) { + // hooray! + h.peer_global_seq = h.reply.global_seq; + policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY; + state = state_t::open; + h.connect_seq++; + h.backoff = 0ms; + set_features(h.reply.features & h.connect.features); + if (h.authorizer) { + session_security.reset( + get_auth_session_handler(nullptr, + h.authorizer->protocol, + h.authorizer->session_key, + features)); + } + h.authorizer.reset(); + return seastar::now(); + } else { + // unknown tag throw std::system_error(make_error_code(error::negotiation_failure)); } - return seastar::now(); } -seastar::future<> SocketConnection::client_handshake() +void SocketConnection::reset_session() +{ + decltype(out_q){}.swap(out_q); + decltype(sent){}.swap(sent); + in_seq = 0; + h.connect_seq = 0; + if (has_feature(CEPH_FEATURE_MSG_AUTH)) { + // Set out_seq to a random value, so CRC won't be predictable. + // Constant to limit starting sequence number to 2^31. Nothing special + // about it, just a big number. + constexpr uint64_t SEQ_MASK = 0x7fffffff; + out_seq = ceph::util::generate_random_number(0, SEQ_MASK); + } else { + // previously, seq #'s always started at 0. + out_seq = 0; + } +} + +seastar::future<> SocketConnection::connect(entity_type_t peer_type, + entity_type_t host_type) +{ + // encode ceph_msg_connect + h.peer_type = peer_type; + memset(&h.connect, 0, sizeof(h.connect)); + h.connect.features = policy.features_supported; + h.connect.host_type = host_type; + h.connect.global_seq = h.global_seq; + h.connect.connect_seq = h.connect_seq; + h.connect.protocol_version = get_proto_version(peer_type, true); + // this is fyi, actually, server decides! + h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0; + + return get_messenger()->get_authorizer(peer_type, false) + .then([this](auto&& auth) { + h.authorizer = std::move(auth); + bufferlist bl; + if (h.authorizer) { + h.connect.authorizer_protocol = h.authorizer->protocol; + h.connect.authorizer_len = h.authorizer->bl.length(); + bl.append(create_static(h.connect)); + bl.append(h.authorizer->bl); + } else { + h.connect.authorizer_protocol = 0; + h.connect.authorizer_len = 0; + bl.append(create_static(h.connect)); + }; + return bl; + }).then([this](bufferlist&& bl) { + return out.write(std::move(bl)); + }).then([this] { + return out.flush(); + }).then([this] { + // read the reply + return read(sizeof(h.reply)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(h.reply, p); + assert(p.end()); + return read(h.reply.authorizer_len); + }).then([this] (bufferlist bl) { + if (h.authorizer) { + auto reply = bl.cbegin(); + if (!h.authorizer->verify_reply(reply)) { + throw std::system_error(make_error_code(error::negotiation_failure)); + } + } + return handle_connect_reply(h.reply.tag); + }); +} + +seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type, + entity_type_t host_type) { // read server's handshake header return read(server_header_size) @@ -344,24 +770,11 @@ seastar::future<> SocketConnection::client_handshake() bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); ::encode(my_addr, bl, 0); - - // encode ceph_msg_connect - memset(&h.connect, 0, sizeof(h.connect)); - h.connect.protocol_version = CEPH_OSDC_PROTOCOL; - bl.append(create_static(h.connect)); - - // TODO: append authorizer - return out.write(std::move(bl)) - .then([this] { return out.flush(); }); - }).then([this] { - // read the reply - return read(sizeof(h.reply)); - }).then([this] (bufferlist bl) { - auto p = bl.begin(); - ::decode(h.reply, p); - // TODO: read authorizer - assert(p.end()); - return handle_connect_reply(); + return out.write(std::move(bl)).then([this] { return out.flush(); }); + }).then([=] { + }).then([=] { + return seastar::do_until([=] { return state == state_t::open; }, + [=] { return connect(peer_type, host_type); }); }).then([this] { // start background processing of tags read_tags_until_next_message(); @@ -382,17 +795,19 @@ seastar::future<> SocketConnection::server_handshake() .then([this] { return out.flush(); }) .then([this] { // read client's handshake header and connect request - return read(client_header_size + sizeof(h.connect)); + return read(client_header_size); }).then([this] (bufferlist bl) { auto p = bl.cbegin(); validate_banner(p); entity_addr_t addr; ::decode(addr, p); - ::decode(h.connect, p); assert(p.end()); - // TODO: read authorizer - - return handle_connect(); + if (!addr.is_blank_ip()) { + peer_addr = addr; + } + }).then([this] { + return seastar::do_until([this] { return state == state_t::open; }, + [this] { return handle_connect(); }); }).then([this] { // start background processing of tags read_tags_until_next_message(); @@ -401,3 +816,16 @@ seastar::future<> SocketConnection::server_handshake() fut.forward_to(std::move(h.promise)); }); } + +seastar::future<> SocketConnection::fault() +{ + if (h.backoff.count()) { + h.backoff += h.backoff; + } else { + h.backoff = conf.ms_initial_backoff; + } + if (h.backoff > conf.ms_max_backoff) { + h.backoff = conf.ms_max_backoff; + } + return seastar::sleep(h.backoff); +} diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index bf86803fe185f..30a54dc5810eb 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -16,16 +16,20 @@ #include +#include "msg/Policy.h" #include "Connection.h" -namespace ceph { -namespace net { +class AuthSessionHandler; + +namespace ceph::net { class SocketConnection : public Connection { seastar::connected_socket socket; seastar::input_stream in; seastar::output_stream out; + state_t state = state_t::none; + /// buffer state for read() struct Reader { bufferlist buffer; @@ -39,13 +43,40 @@ class SocketConnection : public Connection { struct Handshake { ceph_msg_connect connect; ceph_msg_connect_reply reply; + bool got_bad_auth = false; + std::unique_ptr authorizer; + peer_type_t peer_type; + std::chrono::milliseconds backoff; + uint32_t connect_seq = 0; + uint32_t peer_global_seq = 0; + uint32_t global_seq; seastar::promise<> promise; } h; /// server side of handshake negotiation seastar::future<> handle_connect(); + seastar::future<> handle_connect_with_existing(ConnectionRef existing, + bufferlist&& authorizer_reply); + seastar::future<> replace_existing(ConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer = false); + seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag, + bufferlist&& authorizer_reply = {}); + seastar::future<> send_connect_reply_ready(ceph::net::msgr_tag_t tag, + bufferlist&& authorizer_reply); + + seastar::future<> handle_keepalive2(); + seastar::future<> handle_keepalive2_ack(); + + bool require_auth_feature() const; + int get_peer_type() const { + return h.connect.host_type; + } + uint32_t get_proto_version(entity_type_t peer_type, bool connec) const; /// client side of handshake negotiation - seastar::future<> handle_connect_reply(); + seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type); + seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag); + void reset_session(); /// state for an incoming message struct MessageReader { @@ -61,6 +92,7 @@ class SocketConnection : public Connection { seastar::promise<> on_message; void read_tags_until_next_message(); + seastar::future handle_ack(); /// becomes available when handshake completes, and when all previous messages /// have been sent to the output stream. send() chains new messages as @@ -70,6 +102,39 @@ class SocketConnection : public Connection { /// encode/write a message seastar::future<> write_message(MessageRef msg); + ceph::net::Policy policy; + uint64_t features; + void set_features(uint64_t new_features) { + features = new_features; + } + bool has_feature(uint64_t feature) const { + return features & feature; + } + + /// the seq num of the last transmitted message + seq_num_t out_seq = 0; + /// the seq num of the last received message + seq_num_t in_seq = 0; + /// update the seq num of last received message + /// @returns true if the @c seq is valid, and @c in_seq is updated, + /// false otherwise. + bool update_rx_seq(seq_num_t seq); + + std::unique_ptr session_security; + + // messages to be resent after connection gets reset + std::queue out_q; + // messages sent, but not yet acked by peer + std::queue sent; + static void discard_up_to(std::queue*, seq_num_t); + + struct Keepalive { + ceph_timespec reply_stamp; + ceph_timespec ack_stamp; + } k; + + seastar::future<> fault(); + public: SocketConnection(Messenger *messenger, const entity_addr_t& my_addr, @@ -79,7 +144,8 @@ class SocketConnection : public Connection { bool is_connected() override; - seastar::future<> client_handshake() override; + seastar::future<> client_handshake(entity_type_t peer_type, + entity_type_t host_type) override; seastar::future<> server_handshake() override; @@ -88,7 +154,32 @@ class SocketConnection : public Connection { seastar::future<> send(MessageRef msg) override; seastar::future<> close() override; + + uint32_t connect_seq() const override { + return h.connect_seq; + } + uint32_t peer_global_seq() const override { + return h.peer_global_seq; + } + seq_num_t rx_seq_num() const { + return in_seq; + } + state_t get_state() const override { + return state; + } + bool is_server_side() const override { + return policy.server; + } + bool is_lossy() const override { + return policy.lossy; + } + +private: + void requeue_sent() override; + std::tuple> get_out_queue() override { + return {out_seq, std::move(out_q)}; + } + }; -} // namespace net -} // namespace ceph +} // namespace ceph::net diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 62e0cb3cdd7f9..ebfc715191f91 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -12,6 +12,7 @@ * */ +#include "auth/Auth.h" #include "SocketMessenger.h" #include "SocketConnection.h" #include "Dispatcher.h" @@ -113,8 +114,9 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) return seastar::now(); } -seastar::future SocketMessenger::connect(const entity_addr_t& addr, - const entity_addr_t& myaddr) +seastar::future +SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type, + const entity_addr_t& myaddr, entity_type_t host_type) { if (auto found = std::find_if(connections.begin(), connections.end(), @@ -129,11 +131,12 @@ seastar::future SocketMessenger::connect(const entity_ ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr, std::move(socket)); // complete the handshake before returning to the caller - return conn->client_handshake() + return conn->client_handshake(peer_type, host_type) .handle_exception([conn] (std::exception_ptr eptr) { // close the connection before returning errors return seastar::make_exception_future<>(eptr) .finally([conn] { return conn->close(); }); + // TODO: retry on fault }).then([=] { dispatcher->ms_handle_connect(conn); // dispatch replies on this connection @@ -154,3 +157,27 @@ seastar::future<> SocketMessenger::shutdown() return conn->close(); }).finally([this] { connections.clear(); }); } + +seastar::future +SocketMessenger::verify_authorizer(peer_type_t peer_type, + auth_proto_t protocol, + bufferlist& auth) +{ + if (dispatcher) { + return dispatcher->ms_verify_authorizer(peer_type, protocol, auth); + } else { + return seastar::make_ready_future( + CEPH_MSGR_TAG_BADAUTHORIZER, + bufferlist{}); + } +} + +seastar::future> +SocketMessenger::get_authorizer(peer_type_t peer_type, bool force_new) +{ + if (dispatcher) { + return dispatcher->ms_get_authorizer(peer_type, force_new); + } else { + return seastar::make_ready_future>(nullptr); + } +} diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 5cca096a65908..34bf375f66220 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -22,9 +22,11 @@ namespace ceph::net { -class SocketMessenger : public Messenger { +class SocketMessenger final : public Messenger { boost::optional listener; Dispatcher *dispatcher = nullptr; + uint32_t global_seq = 0; + std::list connections; seastar::future<> dispatch(ConnectionRef conn); @@ -40,9 +42,18 @@ class SocketMessenger : public Messenger { seastar::future<> start(Dispatcher *dispatcher) override; seastar::future connect(const entity_addr_t& addr, - const entity_addr_t& myaddr) override; + entity_type_t peer_type, + const entity_addr_t& myaddr, + entity_type_t host_type) override; seastar::future<> shutdown() override; + seastar::future + verify_authorizer(peer_type_t peer_type, + auth_proto_t protocol, + bufferlist& auth) override; + seastar::future> + get_authorizer(peer_type_t peer_type, + bool force_new) override; }; } // namespace ceph::net diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index 24ce135a9dfd8..691db771fec7b 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -15,7 +15,8 @@ target_link_libraries(unittest_seastar_denc ceph-common global Seastar::seastar) set(test_messenger_srcs test_messenger.cc $ - $) + $ + $) add_executable(unittest_seastar_messenger ${test_messenger_srcs}) add_ceph_unittest(unittest_seastar_messenger) target_link_libraries(unittest_seastar_messenger ceph-common Seastar::seastar) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 70b6b29220110..13e2e1ffdb55f 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -46,7 +46,7 @@ static seastar::future<> test_echo() .then([&] { return t.client.messenger.start(&t.client.dispatcher) .then([&] { - return t.client.messenger.connect(t.addr); + return t.client.messenger.connect(t.addr, entity_name_t::TYPE_OSD); }).then([] (ceph::net::ConnectionRef conn) { std::cout << "client connected" << std::endl; return conn->send(MessageRef{new MPing(), false}); -- 2.39.5