#include <algorithm>
#include <core/shared_future.hh>
+#include <core/sleep.hh>
+#include "Config.h"
+#include "Messenger.h"
#include "SocketConnection.h"
#include "include/msgr.h"
+#include "include/random.h"
+#include "auth/Auth.h"
+#include "auth/AuthSessionHandler.h"
#include "msg/Message.h"
using namespace ceph::net;
seastar::future<bufferlist> SocketConnection::read(size_t bytes)
{
+ if (bytes == 0) {
+ return seastar::make_ready_future<bufferlist>();
+ }
r.buffer.clear();
r.remaining = bytes;
return in.consume(bufferlist_consumer{r.buffer, r.remaining})
// stop looping and notify read_header()
return seastar::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::yes);
-
case CEPH_MSGR_TAG_ACK:
- return in.read_exactly(sizeof(ceph_le64))
- .then([] (auto buf) {
- auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
- std::cout << "ack " << *seq << std::endl;
- return seastar::stop_iteration::no;
- });
-
+ return handle_ack();
case CEPH_MSGR_TAG_KEEPALIVE:
break;
-
case CEPH_MSGR_TAG_KEEPALIVE2:
- return in.read_exactly(sizeof(ceph_timespec))
- .then([] (auto buf) {
- auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
- std::cout << "keepalive2 " << t->tv_sec << std::endl;
- // TODO: schedule ack
- return seastar::stop_iteration::no;
- });
-
+ return handle_keepalive2()
+ .then([this] { return seastar::stop_iteration::no; });
case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
- return in.read_exactly(sizeof(ceph_timespec))
- .then([] (auto buf) {
- auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
- std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
- return seastar::stop_iteration::no;
- });
-
+ return handle_keepalive2_ack()
+ .then([this] { return seastar::stop_iteration::no; });
case CEPH_MSGR_TAG_CLOSE:
std::cout << "close" << std::endl;
break;
});
}
+seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
+{
+ return in.read_exactly(sizeof(ceph_le64))
+ .then([this] (auto buf) {
+ auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
+ discard_up_to(&sent, *seq);
+ return seastar::stop_iteration::no;
+ });
+}
+
+void SocketConnection::discard_up_to(std::queue<MessageRef>* queue,
+ seq_num_t seq)
+{
+ while (!queue->empty() &&
+ queue->front()->get_seq() < seq) {
+ queue->pop();
+ }
+}
+
+void SocketConnection::requeue_sent()
+{
+ out_seq -= sent.size();
+ while (!sent.empty()) {
+ auto m = sent.front();
+ sent.pop();
+ out_q.push(std::move(m));
+ }
+}
+
seastar::future<MessageRef> SocketConnection::read_message()
{
return on_message.get_future()
m.front, m.middle, m.data, nullptr);
constexpr bool add_ref = false; // Message starts with 1 ref
return MessageRef{msg, add_ref};
+ }).then([this] (MessageRef msg) {
+ if (msg) {
+ // TODO: set time stamps
+ msg->set_byte_throttler(policy.throttler_bytes);
+ if (!update_rx_seq(msg->get_seq())) {
+ msg.reset();
+ }
+ }
+ return msg;
});
}
+bool SocketConnection::update_rx_seq(seq_num_t seq)
+{
+ if (seq <= in_seq) {
+ if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
+ conf.ms_die_on_old_message) {
+ assert(0 == "old msgs despite reconnect_seq feature");
+ }
+ return false;
+ } else if (seq > in_seq + 1) {
+ if (conf.ms_die_on_skipped_message) {
+ assert(0 == "skipped incoming seq");
+ }
+ return false;
+ } else {
+ in_seq = seq;
+ return true;
+ }
+}
+
seastar::future<> SocketConnection::write_message(MessageRef msg)
{
+ msg->set_seq(++out_seq);
+ msg->encode(features, get_messenger()->get_crc_flags());
bufferlist bl;
- unsigned char tag = CEPH_MSGR_TAG_MSG;
- encode(tag, bl);
- encode_message(msg.get(), 0, bl);
+ bl.append(CEPH_MSGR_TAG_MSG);
+ auto& header = msg->get_header();
+ bl.append((const char*)&header, sizeof(header));
+ bl.append(msg->get_payload());
+ bl.append(msg->get_middle());
+ bl.append(msg->get_data());
+ auto& footer = msg->get_footer();
+ if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
+ bl.append((const char*)&footer, sizeof(footer));
+ } else {
+ ceph_msg_footer_old old_footer;
+ if (get_messenger()->get_crc_flags() & MSG_CRC_HEADER) {
+ old_footer.front_crc = footer.front_crc;
+ old_footer.middle_crc = footer.middle_crc;
+ } else {
+ old_footer.front_crc = old_footer.middle_crc = 0;
+ }
+ if (get_messenger()->get_crc_flags() & MSG_CRC_DATA) {
+ old_footer.data_crc = footer.data_crc;
+ } else {
+ old_footer.data_crc = 0;
+ }
+ old_footer.flags = footer.flags;
+ bl.append((const char*)&old_footer, sizeof(old_footer));
+ }
// write as a seastar::net::packet
return out.write(std::move(bl))
- .then([this] { return out.flush(); });
+ .then([this] { return out.flush(); })
+ .then([this, msg = std::move(msg)] {
+ if (!policy.lossy) {
+ sent.push(std::move(msg));
+ }
+ });
}
seastar::future<> SocketConnection::send(MessageRef msg)
return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
}
+bool SocketConnection::require_auth_feature() const
+{
+ if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
+ return false;
+ }
+ if (conf.cephx_require_signatures) {
+ return true;
+ }
+ if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
+ h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
+ return conf.cephx_cluster_require_signatures;
+ } else {
+ return conf.cephx_service_require_signatures;
+ }
+}
+
+uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const
+{
+ constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
+ // see also OSD.h, unlike other connection of simple/async messenger,
+ // crimson msgr is only used by osd
+ constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
+ if (peer_type == my_type) {
+ // internal
+ return CEPH_OSD_PROTOCOL;
+ } else {
+ // public
+ switch (connect ? peer_type : my_type) {
+ case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
+ case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
+ case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
+ default: return 0;
+ }
+ }
+}
+
seastar::future<> SocketConnection::handle_connect()
{
- memset(&h.reply, 0, sizeof(h.reply));
+ return read(sizeof(h.connect))
+ .then([this](bufferlist bl) {
+ auto p = bl.cbegin();
+ ::decode(h.connect, p);
+ return read(h.connect.authorizer_len);
+ }).then([this] (bufferlist authorizer) {
+ if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
+ return seastar::make_ready_future<msgr_tag_t, bufferlist>(
+ CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
+ }
+ if (require_auth_feature()) {
+ policy.features_required |= CEPH_FEATURE_MSG_AUTH;
+ }
+ if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features;
+ feat_missing != 0) {
+ return seastar::make_ready_future<msgr_tag_t, bufferlist>(
+ CEPH_MSGR_TAG_FEATURES, bufferlist{});
+ }
+ return get_messenger()->verify_authorizer(get_peer_type(),
+ h.connect.authorizer_protocol,
+ authorizer);
+ }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
+ if (tag) {
+ return send_connect_reply(tag, std::move(authorizer_reply));
+ }
+ if (auto existing = get_messenger()->lookup_conn(peer_addr); existing) {
+ return handle_connect_with_existing(existing, std::move(authorizer_reply));
+ } else if (h.connect.connect_seq > 0) {
+ return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
+ std::move(authorizer_reply));
+ }
+ h.connect_seq = h.connect.connect_seq + 1;
+ h.peer_global_seq = h.connect.global_seq;
+ set_features((uint64_t)h.reply.features & (uint64_t)h.connect.features);
+ // TODO: cct
+ return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
+ });
+}
- h.reply.protocol_version = CEPH_OSDC_PROTOCOL;
- h.reply.tag = CEPH_MSGR_TAG_READY;
+seastar::future<>
+SocketConnection::send_connect_reply(msgr_tag_t tag,
+ bufferlist&& authorizer_reply)
+{
+ h.reply.tag = tag;
+ h.reply.features = static_cast<uint64_t>((h.connect.features &
+ policy.features_supported) |
+ policy.features_required);
+ h.reply.authorizer_len = authorizer_reply.length();
+ return out.write(reinterpret_cast<const char*>(&h.reply), sizeof(h.reply))
+ .then([this, reply=std::move(authorizer_reply)]() mutable {
+ out.write(std::move(reply));
+ }).then([this] {
+ return out.flush();
+ });
+}
- bufferlist bl;
- bl.append(create_static(h.reply));
+seastar::future<>
+SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
+ bufferlist&& authorizer_reply)
+{
+ h.reply.tag = tag;
+ h.reply.features = policy.features_supported;
+ h.reply.global_seq = get_messenger()->get_global_seq();
+ h.reply.connect_seq = h.connect_seq;
+ h.reply.flags = 0;
+ if (policy.lossy) {
+ h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
+ }
+ h.reply.authorizer_len = authorizer_reply.length();
+ return out.write(reinterpret_cast<const char*>(&h.reply), sizeof(h.reply))
+ .then([this, reply=std::move(authorizer_reply)]() mutable {
+ if (reply.length()) {
+ return out.write(std::move(reply));
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
+ return out.write(reinterpret_cast<const char*>(&in_seq),
+ sizeof(in_seq)).then([this] {
+ return out.flush();
+ }).then([this] {
+ return in.read_exactly(sizeof(seq_num_t));
+ }).then([this] (auto buf) {
+ auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+ discard_up_to(&out_q, *acked_seq);
+ });
+ } else {
+ return out.flush();
+ }
+ }).then([this] {
+ state = state_t::open;
+ });
+}
- return out.write(std::move(bl))
- .then([this] { return out.flush(); });
+seastar::future<>
+SocketConnection::handle_keepalive2()
+{
+ return in.read_exactly(sizeof(ceph_timespec))
+ .then([this] (auto buf) {
+ auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+ k.reply_stamp = *t;
+ std::cout << "keepalive2 " << t->tv_sec << std::endl;
+ char tag = CEPH_MSGR_TAG_KEEPALIVE2_ACK;
+ return out.write(reinterpret_cast<const char*>(&tag), sizeof(tag));
+ }).then([this] {
+ out.write(reinterpret_cast<const char*>(&k.reply_stamp),
+ sizeof(k.reply_stamp));
+ }).then([this] {
+ return out.flush();
+ });
+}
+
+seastar::future<>
+SocketConnection::handle_keepalive2_ack()
+{
+ return in.read_exactly(sizeof(ceph_timespec))
+ .then([this] (auto buf) {
+ auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+ k.ack_stamp = *t;
+ std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
+ });
+}
+
+seastar::future<>
+SocketConnection::handle_connect_with_existing(ConnectionRef existing, bufferlist&& authorizer_reply)
+{
+ if (h.connect.global_seq < existing->peer_global_seq()) {
+ h.reply.global_seq = existing->peer_global_seq();
+ return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
+ } else if (existing->is_lossy()) {
+ return replace_existing(existing, std::move(authorizer_reply));
+ } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) {
+ return replace_existing(existing, std::move(authorizer_reply), true);
+ } else if (h.connect.connect_seq < existing->connect_seq()) {
+ // old attempt, or we sent READY but they didn't get it.
+ h.reply.connect_seq = existing->connect_seq() + 1;
+ return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+ } else if (h.connect.connect_seq == existing->connect_seq()) {
+ // if the existing connection successfully opened, and/or
+ // subsequently went to standby, then the peer should bump
+ // their connect_seq and retry: this is not a connection race
+ // we need to resolve here.
+ if (existing->get_state() == state_t::open ||
+ existing->get_state() == state_t::standby) {
+ if (policy.resetcheck && existing->connect_seq() == 0) {
+ return replace_existing(existing, std::move(authorizer_reply));
+ } else {
+ h.reply.connect_seq = existing->connect_seq() + 1;
+ return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
+ }
+ } else if (get_peer_addr() < get_my_addr() ||
+ existing->is_server_side()) {
+ // incoming wins
+ return replace_existing(existing, std::move(authorizer_reply));
+ } else {
+ return send_connect_reply(CEPH_MSGR_TAG_WAIT);
+ }
+ } else if (policy.resetcheck &&
+ existing->connect_seq() == 0) {
+ return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
+ } else {
+ return replace_existing(existing, std::move(authorizer_reply));
+ }
}
-seastar::future<> SocketConnection::handle_connect_reply()
+seastar::future<> SocketConnection::replace_existing(ConnectionRef existing,
+ bufferlist&& authorizer_reply,
+ bool is_reset_from_peer)
{
- if (h.reply.tag != CEPH_MSGR_TAG_READY) {
+ msgr_tag_t reply_tag;
+ if ((h.connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
+ reply_tag = CEPH_MSGR_TAG_SEQ;
+ } else {
+ reply_tag = CEPH_MSGR_TAG_READY;
+ }
+ get_messenger()->unregister_conn(existing);
+ if (!existing->is_lossy()) {
+ // reset the in_seq if this is a hard reset from peer,
+ // otherwise we respect our original connection's value
+ in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
+ // steal outgoing queue and out_seq
+ existing->requeue_sent();
+ std::tie(out_seq, out_q) = existing->get_out_queue();
+ }
+ return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
+}
+
+seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
+{
+ switch (tag) {
+ case CEPH_MSGR_TAG_FEATURES:
+ return fault();
+ case CEPH_MSGR_TAG_BADPROTOVER:
+ return fault();
+ case CEPH_MSGR_TAG_BADAUTHORIZER:
+ if (h.got_bad_auth) {
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+ h.got_bad_auth = true;
+ // try harder
+ return get_messenger()->get_authorizer(h.peer_type, true)
+ .then([this](auto&& auth) {
+ h.authorizer = std::move(auth);
+ return seastar::now();
+ });
+ case CEPH_MSGR_TAG_RESETSESSION:
+ reset_session();
+ return seastar::now();
+ case CEPH_MSGR_TAG_RETRY_GLOBAL:
+ h.global_seq = get_messenger()->get_global_seq(h.reply.global_seq);
+ return seastar::now();
+ case CEPH_MSGR_TAG_RETRY_SESSION:
+ assert(h.reply.connect_seq > h.connect_seq);
+ h.connect_seq = h.reply.connect_seq;
+ return seastar::now();
+ case CEPH_MSGR_TAG_WAIT:
+ return fault();
+ case CEPH_MSGR_TAG_SEQ:
+ break;
+ case CEPH_MSGR_TAG_READY:
+ break;
+ }
+ if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
+ missing) {
+ return fault();
+ }
+ if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
+ return in.read_exactly(sizeof(seq_num_t))
+ .then([this] (auto buf) {
+ auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+ discard_up_to(&out_q, *acked_seq);
+ }).then([this] {
+ return out.write(reinterpret_cast<const char*>(&in_seq), sizeof(in_seq));
+ }).then([this] {
+ return out.flush();
+ }).then([this] {
+ return handle_connect_reply(CEPH_MSGR_TAG_READY);
+ });
+ }
+ if (h.reply.tag == CEPH_MSGR_TAG_READY) {
+ // hooray!
+ h.peer_global_seq = h.reply.global_seq;
+ policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
+ state = state_t::open;
+ h.connect_seq++;
+ h.backoff = 0ms;
+ set_features(h.reply.features & h.connect.features);
+ if (h.authorizer) {
+ session_security.reset(
+ get_auth_session_handler(nullptr,
+ h.authorizer->protocol,
+ h.authorizer->session_key,
+ features));
+ }
+ h.authorizer.reset();
+ return seastar::now();
+ } else {
+ // unknown tag
throw std::system_error(make_error_code(error::negotiation_failure));
}
- return seastar::now();
}
-seastar::future<> SocketConnection::client_handshake()
+void SocketConnection::reset_session()
+{
+ decltype(out_q){}.swap(out_q);
+ decltype(sent){}.swap(sent);
+ in_seq = 0;
+ h.connect_seq = 0;
+ if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
+ // Set out_seq to a random value, so CRC won't be predictable.
+ // Constant to limit starting sequence number to 2^31. Nothing special
+ // about it, just a big number.
+ constexpr uint64_t SEQ_MASK = 0x7fffffff;
+ out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
+ } else {
+ // previously, seq #'s always started at 0.
+ out_seq = 0;
+ }
+}
+
+seastar::future<> SocketConnection::connect(entity_type_t peer_type,
+ entity_type_t host_type)
+{
+ // encode ceph_msg_connect
+ h.peer_type = peer_type;
+ memset(&h.connect, 0, sizeof(h.connect));
+ h.connect.features = policy.features_supported;
+ h.connect.host_type = host_type;
+ h.connect.global_seq = h.global_seq;
+ h.connect.connect_seq = h.connect_seq;
+ h.connect.protocol_version = get_proto_version(peer_type, true);
+ // this is fyi, actually, server decides!
+ h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
+
+ return get_messenger()->get_authorizer(peer_type, false)
+ .then([this](auto&& auth) {
+ h.authorizer = std::move(auth);
+ bufferlist bl;
+ if (h.authorizer) {
+ h.connect.authorizer_protocol = h.authorizer->protocol;
+ h.connect.authorizer_len = h.authorizer->bl.length();
+ bl.append(create_static(h.connect));
+ bl.append(h.authorizer->bl);
+ } else {
+ h.connect.authorizer_protocol = 0;
+ h.connect.authorizer_len = 0;
+ bl.append(create_static(h.connect));
+ };
+ return bl;
+ }).then([this](bufferlist&& bl) {
+ return out.write(std::move(bl));
+ }).then([this] {
+ return out.flush();
+ }).then([this] {
+ // read the reply
+ return read(sizeof(h.reply));
+ }).then([this] (bufferlist bl) {
+ auto p = bl.cbegin();
+ ::decode(h.reply, p);
+ assert(p.end());
+ return read(h.reply.authorizer_len);
+ }).then([this] (bufferlist bl) {
+ if (h.authorizer) {
+ auto reply = bl.cbegin();
+ if (!h.authorizer->verify_reply(reply)) {
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+ }
+ return handle_connect_reply(h.reply.tag);
+ });
+}
+
+seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type,
+ entity_type_t host_type)
{
// read server's handshake header
return read(server_header_size)
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
-
- // encode ceph_msg_connect
- memset(&h.connect, 0, sizeof(h.connect));
- h.connect.protocol_version = CEPH_OSDC_PROTOCOL;
- bl.append(create_static(h.connect));
-
- // TODO: append authorizer
- return out.write(std::move(bl))
- .then([this] { return out.flush(); });
- }).then([this] {
- // read the reply
- return read(sizeof(h.reply));
- }).then([this] (bufferlist bl) {
- auto p = bl.begin();
- ::decode(h.reply, p);
- // TODO: read authorizer
- assert(p.end());
- return handle_connect_reply();
+ return out.write(std::move(bl)).then([this] { return out.flush(); });
+ }).then([=] {
+ }).then([=] {
+ return seastar::do_until([=] { return state == state_t::open; },
+ [=] { return connect(peer_type, host_type); });
}).then([this] {
// start background processing of tags
read_tags_until_next_message();
.then([this] { return out.flush(); })
.then([this] {
// read client's handshake header and connect request
- return read(client_header_size + sizeof(h.connect));
+ return read(client_header_size);
}).then([this] (bufferlist bl) {
auto p = bl.cbegin();
validate_banner(p);
entity_addr_t addr;
::decode(addr, p);
- ::decode(h.connect, p);
assert(p.end());
- // TODO: read authorizer
-
- return handle_connect();
+ if (!addr.is_blank_ip()) {
+ peer_addr = addr;
+ }
+ }).then([this] {
+ return seastar::do_until([this] { return state == state_t::open; },
+ [this] { return handle_connect(); });
}).then([this] {
// start background processing of tags
read_tags_until_next_message();
fut.forward_to(std::move(h.promise));
});
}
+
+seastar::future<> SocketConnection::fault()
+{
+ if (h.backoff.count()) {
+ h.backoff += h.backoff;
+ } else {
+ h.backoff = conf.ms_initial_backoff;
+ }
+ if (h.backoff > conf.ms_max_backoff) {
+ h.backoff = conf.ms_max_backoff;
+ }
+ return seastar::sleep(h.backoff);
+}
#include <core/reactor.hh>
+#include "msg/Policy.h"
#include "Connection.h"
-namespace ceph {
-namespace net {
+class AuthSessionHandler;
+
+namespace ceph::net {
class SocketConnection : public Connection {
seastar::connected_socket socket;
seastar::input_stream<char> in;
seastar::output_stream<char> out;
+ state_t state = state_t::none;
+
/// buffer state for read()
struct Reader {
bufferlist buffer;
struct Handshake {
ceph_msg_connect connect;
ceph_msg_connect_reply reply;
+ bool got_bad_auth = false;
+ std::unique_ptr<AuthAuthorizer> authorizer;
+ peer_type_t peer_type;
+ std::chrono::milliseconds backoff;
+ uint32_t connect_seq = 0;
+ uint32_t peer_global_seq = 0;
+ uint32_t global_seq;
seastar::promise<> promise;
} h;
/// server side of handshake negotiation
seastar::future<> handle_connect();
+ seastar::future<> handle_connect_with_existing(ConnectionRef existing,
+ bufferlist&& authorizer_reply);
+ seastar::future<> replace_existing(ConnectionRef existing,
+ bufferlist&& authorizer_reply,
+ bool is_reset_from_peer = false);
+ seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag,
+ bufferlist&& authorizer_reply = {});
+ seastar::future<> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
+ bufferlist&& authorizer_reply);
+
+ seastar::future<> handle_keepalive2();
+ seastar::future<> handle_keepalive2_ack();
+
+ bool require_auth_feature() const;
+ int get_peer_type() const {
+ return h.connect.host_type;
+ }
+ uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
/// client side of handshake negotiation
- seastar::future<> handle_connect_reply();
+ seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type);
+ seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag);
+ void reset_session();
/// state for an incoming message
struct MessageReader {
seastar::promise<> on_message;
void read_tags_until_next_message();
+ seastar::future<seastar::stop_iteration> handle_ack();
/// becomes available when handshake completes, and when all previous messages
/// have been sent to the output stream. send() chains new messages as
/// encode/write a message
seastar::future<> write_message(MessageRef msg);
+ ceph::net::Policy policy;
+ uint64_t features;
+ void set_features(uint64_t new_features) {
+ features = new_features;
+ }
+ bool has_feature(uint64_t feature) const {
+ return features & feature;
+ }
+
+ /// the seq num of the last transmitted message
+ seq_num_t out_seq = 0;
+ /// the seq num of the last received message
+ seq_num_t in_seq = 0;
+ /// update the seq num of last received message
+ /// @returns true if the @c seq is valid, and @c in_seq is updated,
+ /// false otherwise.
+ bool update_rx_seq(seq_num_t seq);
+
+ std::unique_ptr<AuthSessionHandler> session_security;
+
+ // messages to be resent after connection gets reset
+ std::queue<MessageRef> out_q;
+ // messages sent, but not yet acked by peer
+ std::queue<MessageRef> sent;
+ static void discard_up_to(std::queue<MessageRef>*, seq_num_t);
+
+ struct Keepalive {
+ ceph_timespec reply_stamp;
+ ceph_timespec ack_stamp;
+ } k;
+
+ seastar::future<> fault();
+
public:
SocketConnection(Messenger *messenger,
const entity_addr_t& my_addr,
bool is_connected() override;
- seastar::future<> client_handshake() override;
+ seastar::future<> client_handshake(entity_type_t peer_type,
+ entity_type_t host_type) override;
seastar::future<> server_handshake() override;
seastar::future<> send(MessageRef msg) override;
seastar::future<> close() override;
+
+ uint32_t connect_seq() const override {
+ return h.connect_seq;
+ }
+ uint32_t peer_global_seq() const override {
+ return h.peer_global_seq;
+ }
+ seq_num_t rx_seq_num() const {
+ return in_seq;
+ }
+ state_t get_state() const override {
+ return state;
+ }
+ bool is_server_side() const override {
+ return policy.server;
+ }
+ bool is_lossy() const override {
+ return policy.lossy;
+ }
+
+private:
+ void requeue_sent() override;
+ std::tuple<seq_num_t, std::queue<MessageRef>> get_out_queue() override {
+ return {out_seq, std::move(out_q)};
+ }
+
};
-} // namespace net
-} // namespace ceph
+} // namespace ceph::net