1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2017 Red Hat, Inc
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "SocketConnection.h"
18 #include <seastar/core/shared_future.hh>
19 #include <seastar/core/sleep.hh>
20 #include <seastar/net/packet.hh>
22 #include "include/msgr.h"
23 #include "include/random.h"
24 #include "auth/Auth.h"
25 #include "auth/AuthSessionHandler.h"
27 #include "crimson/common/log.h"
30 #include "SocketMessenger.h"
32 using namespace ceph::net;
// Build a seastar packet from the raw bytes of `value`: the object's address
// and sizeof(value) are handed to the packet constructor.
// NOTE(review): whether the packet copies or merely references `value` depends
// on seastar::net::packet — confirm before relying on lifetime.
35 seastar::net::packet make_static_packet(const T& value) {
36 return { reinterpret_cast<const char*>(&value), sizeof(value) };
// Logger used throughout this file: whatever ceph::get_logger() returns for
// the ceph_subsys_ms subsystem.
40 seastar::logger& logger() {
41 return ceph::get_logger(ceph_subsys_ms);
// Construct a connection for `messenger`; `my_addr` is forwarded to the
// Connection base class.
45 SocketConnection::SocketConnection(SocketMessenger& messenger,
46 const entity_addr_t& my_addr)
47 : Connection(my_addr),
// send_ready starts out as the future of the handshake promise, so work
// chained on send_ready runs only once h.promise has been resolved
49 send_ready(h.promise.get_future())
// Destructor: requires that the send chain has already completed, and marks
// its result as intentionally ignored.
53 SocketConnection::~SocketConnection()
55 // errors were reported to callers of send()
// asserts that no send is still pending at destruction time
56 ceph_assert(send_ready.available());
57 send_ready.ignore_ready_future();
// Const accessor; presumably returns the messenger this connection was
// constructed with (return type and body are not visible here — confirm).
61 SocketConnection::get_messenger() const {
// Reports true unless the send_ready future has failed. Note that a
// still-pending send_ready also yields true.
65 bool SocketConnection::is_connected()
67 return !send_ready.failed();
// Background loop that consumes protocol tags from the socket one byte at a
// time. Tags other than CEPH_MSGR_TAG_MSG are dispatched to their handlers and
// the loop continues; CEPH_MSGR_TAG_MSG stops the loop. The loop's outcome
// (success or exception) is forwarded to the on_message promise.
70 void SocketConnection::read_tags_until_next_message()
72 seastar::repeat([this] {
// each iteration reads exactly one tag byte
74 return socket->read_exactly(1)
75 .then([this] (auto buf) {
77 throw std::system_error(make_error_code(error::read_eof));
80 case CEPH_MSGR_TAG_MSG:
81 // stop looping; do_read_message() is waiting on on_message
82 return seastar::make_ready_future<stop_t>(stop_t::yes);
83 case CEPH_MSGR_TAG_ACK:
85 case CEPH_MSGR_TAG_KEEPALIVE:
87 case CEPH_MSGR_TAG_KEEPALIVE2:
88 return handle_keepalive2()
// the continuation only maps completion to "keep looping"; no capture needed
89 .then([] { return stop_t::no; });
90 case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
91 return handle_keepalive2_ack()
92 .then([] { return stop_t::no; });
93 case CEPH_MSGR_TAG_CLOSE:
// report through the messenger logger rather than a flushing write to stdout
94 logger().debug("close");
97 return seastar::make_ready_future<stop_t>(stop_t::no);
99 }).handle_exception_type([this] (const std::system_error& e) {
// read_eof gets dedicated handling (the handling statement is not visible here)
100 if (e.code() == error::read_eof) {
104 }).then_wrapped([this] (auto fut) {
105 // satisfy the message promise
106 fut.forward_to(std::move(on_message));
// Handle an ACK tag: read a ceph_le64 sequence number from the socket and
// pass it to discard_up_to() for the `sent` queue.
110 seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
112 return socket->read_exactly(sizeof(ceph_le64))
113 .then([this] (auto buf) {
// view the received bytes in place as a ceph_le64
114 auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
115 discard_up_to(&sent, *seq);
// Walk the front of `queue` while it is non-empty and the front message's
// sequence number is strictly less than the given bound. The loop body is not
// visible here; presumably it pops the front entry — confirm.
120 void SocketConnection::discard_up_to(std::queue<MessageRef>* queue,
123 while (!queue->empty() &&
124 queue->front()->get_seq() < seq) {
// Put the messages held in `sent` back onto `out_q` and rewind out_seq by the
// number of messages that were in `sent`.
129 void SocketConnection::requeue_sent()
// roll the outgoing sequence counter back by the count of sent messages
131 out_seq -= sent.size();
132 while (!sent.empty()) {
133 auto m = sent.front();
// pushed at the back of out_q, i.e. behind anything already queued there
135 out_q.push(std::move(m));
// Acquire from the policy's byte throttler the number of bytes about to be
// read for the current message. Resolves immediately when no byte throttler
// is configured.
139 seastar::future<> SocketConnection::maybe_throttle()
141 if (!policy.throttler_bytes) {
142 return seastar::now();
// amount is the sum of segment lengths taken from the decoded header m.header
144 const auto to_read = (m.header.front_len +
145 m.header.middle_len +
147 return policy.throttler_bytes->get(to_read);
// Read one message from the socket. Waits for on_message to be fulfilled,
// then reads header, front, middle, data and footer in that order, decodes
// them and returns the resulting message as a MessageRef.
150 seastar::future<MessageRef> SocketConnection::do_read_message()
152 return on_message.get_future()
// re-arm on_message with a fresh promise for the next message
154 on_message = seastar::promise<>{};
156 return socket->read(sizeof(m.header));
157 }).then([this] (bufferlist bl) {
158 // throttle the traffic, maybe
159 auto p = bl.cbegin();
160 ::decode(m.header, p);
161 return maybe_throttle();
// segment lengths come from the header decoded above
164 return socket->read(m.header.front_len);
165 }).then([this] (bufferlist bl) {
166 m.front = std::move(bl);
168 return socket->read(m.header.middle_len);
169 }).then([this] (bufferlist bl) {
170 m.middle = std::move(bl);
172 return socket->read(m.header.data_len);
173 }).then([this] (bufferlist bl) {
174 m.data = std::move(bl);
176 return socket->read(sizeof(m.footer));
177 }).then([this] (bufferlist bl) {
178 // resume background processing of tags
179 read_tags_until_next_message();
181 auto p = bl.cbegin();
182 ::decode(m.footer, p);
183 auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
184 m.front, m.middle, m.data, nullptr);
185 // TODO: set time stamps
// hand the byte throttler to the message (its release behaviour is defined by
// Message, not visible here)
186 msg->set_byte_throttler(policy.throttler_bytes);
187 constexpr bool add_ref = false; // Message starts with 1 ref
188 return MessageRef{msg, add_ref};
// Read messages via do_read_message() in a repeat_until_value loop; each
// message's sequence number is checked with update_rx_seq(). The tail of the
// lambda is not visible here; presumably a rejected message yields an empty
// optional (loop again) and an accepted one is returned — confirm.
192 seastar::future<MessageRef> SocketConnection::read_message()
194 return seastar::repeat_until_value([this] {
195 return do_read_message()
196 .then([this] (MessageRef msg) -> std::optional<MessageRef> {
197 if (!update_rx_seq(msg->get_seq())) {
198 // skip this request and read the next
// Check an incoming sequence number against in_seq. Only part of the logic is
// visible here: the condition guarding the first branch and the return
// statements are not shown.
206 bool SocketConnection::update_rx_seq(seq_num_t seq)
// abort if the peer negotiated RECONNECT_SEQ and the config asks to die on
// old messages (the enclosing condition is not visible — presumably an
// old/duplicate seq; confirm)
209 if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
210 conf.ms_die_on_old_message) {
211 ceph_abort_msg("old msgs despite reconnect_seq feature");
// a gap in the sequence: seq is more than one ahead of in_seq
214 } else if (seq > in_seq + 1) {
215 if (conf.ms_die_on_skipped_message) {
216 ceph_abort_msg("skipped incoming seq");
// Assign the next outgoing sequence number to `msg`, encode it, and write
// tag + header + payload + middle + data + footer to the socket in a single
// buffer. After the write completes the message is pushed onto `sent`.
225 seastar::future<> SocketConnection::write_message(MessageRef msg)
227 msg->set_seq(++out_seq);
228 msg->encode(features, messenger.get_crc_flags());
230 bl.append(CEPH_MSGR_TAG_MSG);
231 auto& header = msg->get_header();
// header is appended as its raw in-memory bytes
232 bl.append((const char*)&header, sizeof(header));
233 bl.append(msg->get_payload());
234 bl.append(msg->get_middle());
235 bl.append(msg->get_data());
236 auto& footer = msg->get_footer();
// with the MSG_AUTH feature the footer struct is appended as-is
237 if (HAVE_FEATURE(features, MSG_AUTH)) {
238 bl.append((const char*)&footer, sizeof(footer));
// the other visible path fills a ceph_msg_footer_old: CRC fields are copied
// only when the corresponding messenger crc flag is set, otherwise zeroed
240 ceph_msg_footer_old old_footer;
241 if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
242 old_footer.front_crc = footer.front_crc;
243 old_footer.middle_crc = footer.middle_crc;
245 old_footer.front_crc = old_footer.middle_crc = 0;
247 if (messenger.get_crc_flags() & MSG_CRC_DATA) {
248 old_footer.data_crc = footer.data_crc;
250 old_footer.data_crc = 0;
252 old_footer.flags = footer.flags;
253 bl.append((const char*)&old_footer, sizeof(old_footer));
255 // write as a seastar::net::packet
256 return socket->write_flush(std::move(bl))
257 .then([this, msg = std::move(msg)] {
// remember the written message in `sent` (handle_ack() later passes `sent`
// to discard_up_to())
259 sent.push(std::move(msg));
// Queue `msg` for sending after everything previously chained on send_ready.
// Returns a future that resolves when this message's write_message() has
// completed.
264 seastar::future<> SocketConnection::send(MessageRef msg)
266 // chain the message after the last message is sent
// a shared_future is used so the result can be handed out twice below
267 seastar::shared_future<> f = send_ready.then(
268 [this, msg = std::move(msg)] {
269 return write_message(std::move(msg));
272 // chain any later messages after this one completes
273 send_ready = f.get_future();
274 // allow the caller to wait on the same future
275 return f.get_future();
// Send a keepalive request, ordered after previously queued sends: stamps
// k.req with the current coarse real-clock time and writes it to the socket.
// Uses the same send_ready chaining scheme as send().
278 seastar::future<> SocketConnection::keepalive()
280 seastar::shared_future<> f = send_ready.then([this] {
281 k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
282 ceph::coarse_real_clock::now());
283 return socket->write_flush(make_static_packet(k.req));
// later sends chain after this keepalive; the caller gets the same result
285 send_ready = f.get_future();
286 return f.get_future();
// Close the connection. A repeated call while already closing returns the
// stored close_ready future instead of closing again. Otherwise the
// connection is removed from the messenger's bookkeeping, the socket is
// closed when one exists, and the state becomes closing.
289 seastar::future<> SocketConnection::close()
291 if (state == state_t::closing) {
293 assert(close_ready.valid());
294 return close_ready.get_future();
297 // unregister_conn() drops a reference, so hold another until completion
298 auto cleanup = [conn = SocketConnectionRef(this)] {};
// accepting connections are tracked separately from registered ones
300 if (state == state_t::accepting) {
301 messenger.unaccept_conn(this);
302 } else if (state >= state_t::connecting && state < state_t::closing) {
303 messenger.unregister_conn(this);
309 // close_ready becomes valid only after state is state_t::closing
310 assert(!close_ready.valid());
// `cleanup` (and the reference it holds) is released once the socket close
// finishes
313 close_ready = socket->close().finally(std::move(cleanup));
// other visible path: no socket close is issued; only legal while connecting
315 ceph_assert(state == state_t::connecting);
316 close_ready = seastar::now();
318 state = state_t::closing;
319 return close_ready.get_future();
324 /// store the banner in a non-const string for buffer::create_static()
325 static char banner[] = CEPH_BANNER;
// length of the banner text without its terminating NUL
326 constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
// handshake header sizes: the client header is the banner plus one
// ceph_entity_addr, the server header is the banner plus two
328 constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
329 constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
// NOTE(review): presumably generates raw encode/decode overloads for these
// structs, as used by ::decode(h.connect, p) and ::decode(h.reply, p) — confirm
// against the macro definition
331 WRITE_RAW_ENCODER(ceph_msg_connect);
332 WRITE_RAW_ENCODER(ceph_msg_connect_reply);
// Stream a ceph_msg_connect in human-readable form for logging. `features`
// and `flags` are printed in hex; the stream is switched back to decimal
// afterwards.
334 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
336 return out << "connect{features=" << std::hex << c.features << std::dec
337 << " host_type=" << c.host_type
338 << " global_seq=" << c.global_seq
339 << " connect_seq=" << c.connect_seq
340 << " protocol_version=" << c.protocol_version
341 << " authorizer_protocol=" << c.authorizer_protocol
342 << " authorizer_len=" << c.authorizer_len
// widen flags so it prints as a number rather than as a character
343 << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
// Stream a ceph_msg_connect_reply in human-readable form for logging. `tag`
// and `flags` are widened to uint16_t so they print as numbers; `features`
// and `flags` are printed in hex.
346 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
348 return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
349 << " features=" << std::hex << r.features << std::dec
350 << " global_seq=" << r.global_seq
351 << " connect_seq=" << r.connect_seq
352 << " protocol_version=" << r.protocol_version
353 << " authorizer_len=" << r.authorizer_len
354 << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
357 // check that the buffer starts with a valid banner without requiring it to
358 // be contiguous in memory
// Throws std::system_error(error::bad_connect_banner) on mismatch. The
// iterator `p` is advanced past the bytes that were compared.
359 static void validate_banner(bufferlist::const_iterator& p)
361 auto b = std::cbegin(banner);
362 auto end = b + banner_size;
364 const char *buf{nullptr};
// ask for at most the part of the banner not yet checked; `len` is how many
// bytes were actually made available through `buf`
365 auto remaining = std::distance(b, end);
366 auto len = p.get_ptr_and_advance(remaining, &buf);
// compare this chunk against the banner starting at position b
367 if (!std::equal(buf, buf + len, b)) {
368 throw std::system_error(make_error_code(error::bad_connect_banner));
374 // make sure that we agree with the peer about its address
// `addr` is the address being checked and `expected` the one it is compared
// to. Throws std::system_error(error::bad_peer_address) when neither the
// exact-match nor the blank-IP check below succeeds.
375 static void validate_peer_addr(const entity_addr_t& addr,
376 const entity_addr_t& expected)
378 if (addr == expected) {
381 // ok if server bound anonymously, as long as port/nonce match
382 if (addr.is_blank_ip() &&
383 addr.get_port() == expected.get_port() &&
384 addr.get_nonce() == expected.get_nonce()) {
387 throw std::system_error(make_error_code(error::bad_peer_address));
391 /// return a static bufferptr to the given object
// The bufferptr is created with buffer::create_static() over obj's own storage
// (sizeof(obj) bytes at &obj).
// NOTE(review): presumably no copy is made, so `obj` must outlive the
// returned bufferptr — confirm against buffer::create_static.
392 template <typename T>
393 bufferptr create_static(T& obj)
395 return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
// Decide, from the connect request in h.connect and the cephx_* config
// options, whether message signing must be required for this peer (the caller
// ORs CEPH_FEATURE_MSG_AUTH into features_required when this returns true).
// The return statements of the first two branches are not visible here.
398 bool SocketConnection::require_auth_feature() const
// peers not using the CEPHX authorizer protocol are handled separately
400 if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
403 if (conf.cephx_require_signatures) {
// OSD and MDS peers use the cluster-level setting, everything else the
// service-level one
406 if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
407 h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
408 return conf.cephx_cluster_require_signatures;
410 return conf.cephx_service_require_signatures;
// Pick the protocol version to use with a peer of type `peer_type`.
// `connect` is passed as true by repeat_connect() and false by
// repeat_handle_connect(). The local entity type is hard-coded to OSD.
414 uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const
416 constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
417 // see also OSD.h, unlike other connection of simple/async messenger,
418 // crimson msgr is only used by osd
419 constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
// peer has the same type as this side (OSD): use the OSD protocol constant
420 if (peer_type == my_type) {
422 return CEPH_OSD_PROTOCOL;
// otherwise the protocol is chosen by the peer's type when connecting, and by
// our own type when accepting
425 switch (connect ? peer_type : my_type) {
426 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
427 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
428 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
// Accepting side of one connect attempt: read the peer's ceph_msg_connect and
// its authorizer, check protocol version and required features, verify the
// authorizer, then send the appropriate reply. start_accept() calls this
// repeatedly until the state becomes open.
435 SocketConnection::repeat_handle_connect()
437 return socket->read(sizeof(h.connect))
438 .then([this](bufferlist bl) {
439 auto p = bl.cbegin();
440 ::decode(h.connect, p);
// the peer's entity type is taken from its connect request
441 peer_type = h.connect.host_type;
442 return socket->read(h.connect.authorizer_len);
443 }).then([this] (bufferlist authorizer) {
// protocol version mismatch: reply tag BADPROTOVER with an empty authorizer
// reply
444 if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
445 return seastar::make_ready_future<msgr_tag_t, bufferlist>(
446 CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
448 if (require_auth_feature()) {
449 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
// features we require that the peer did not offer
451 if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features;
453 return seastar::make_ready_future<msgr_tag_t, bufferlist>(
454 CEPH_MSGR_TAG_FEATURES, bufferlist{});
456 return messenger.verify_authorizer(peer_type,
457 h.connect.authorizer_protocol,
459 }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
// start from a zeroed reply struct
460 memset(&h.reply, 0, sizeof(h.reply));
462 return send_connect_reply(tag, std::move(authorizer_reply));
// a connection to this peer address already exists: resolve against it
464 if (auto existing = messenger.lookup_conn(peer_addr); existing) {
465 return handle_connect_with_existing(existing, std::move(authorizer_reply));
// no existing connection but the peer claims a non-zero connect_seq
466 } else if (h.connect.connect_seq > 0) {
467 return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
468 std::move(authorizer_reply));
// fresh session: adopt the peer's sequence numbers and use the features both
// sides support
470 h.connect_seq = h.connect.connect_seq + 1;
471 h.peer_global_seq = h.connect.global_seq;
472 set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features);
474 return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
// Send a connect reply followed by the authorizer reply. Fills
// h.reply.features and h.reply.authorizer_len, writes the reply struct, then
// writes and flushes `authorizer_reply`.
479 SocketConnection::send_connect_reply(msgr_tag_t tag,
480 bufferlist&& authorizer_reply)
// advertised features: those the peer offered that we support, plus
// everything we require
483 h.reply.features = static_cast<uint64_t>((h.connect.features &
484 policy.features_supported) |
485 policy.features_required);
486 h.reply.authorizer_len = authorizer_reply.length();
487 return socket->write(make_static_packet(h.reply))
488 .then([this, reply=std::move(authorizer_reply)]() mutable {
489 return socket->write_flush(std::move(reply));
// Send the connect reply that completes the handshake on the accepting side
// and move the connection to the open state. When the reply tag is
// CEPH_MSGR_TAG_SEQ, sequence numbers are exchanged with the peer first.
494 SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
495 bufferlist&& authorizer_reply)
// obtain a global_seq from the messenger and echo it in the reply
497 h.global_seq = messenger.get_global_seq();
499 h.reply.features = policy.features_supported;
500 h.reply.global_seq = h.global_seq;
501 h.reply.connect_seq = h.connect_seq;
// adds the LOSSY flag to the reply (the guarding condition is not visible
// here — presumably policy.lossy; confirm)
504 h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
506 h.reply.authorizer_len = authorizer_reply.length();
507 return socket->write(make_static_packet(h.reply))
508 .then([this, reply=std::move(authorizer_reply)]() mutable {
// the authorizer reply is written only when it is non-empty
509 if (reply.length()) {
510 return socket->write(std::move(reply));
512 return seastar::now();
// SEQ: send our in_seq, read the seq the peer has acked, and pass it to
// discard_up_to() for out_q
515 if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
516 return socket->write_flush(make_static_packet(in_seq))
518 return socket->read_exactly(sizeof(seq_num_t));
519 }).then([this] (auto buf) {
520 auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
521 discard_up_to(&out_q, *acked_seq);
524 return socket->flush();
// switch from the messenger's accepting set to its registered connections
527 messenger.register_conn(this);
528 messenger.unaccept_conn(this);
529 state = state_t::open;
// Handle a KEEPALIVE2 tag: read the peer's ceph_timespec stamp, store it in
// k.ack.stamp and write k.ack back to the peer.
534 SocketConnection::handle_keepalive2()
536 return socket->read_exactly(sizeof(ceph_timespec))
537 .then([this] (auto buf) {
// copy the received stamp into the ack that is sent back
538 k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
// log through the messenger logger instead of a flushing write to stdout; the
// cast passes the field to the formatter as a plain integer value
539 logger().debug("keepalive2 {}", static_cast<uint64_t>(k.ack.stamp.tv_sec));
540 return socket->write_flush(make_static_packet(k.ack));
// Handle a KEEPALIVE2_ACK tag: read the ceph_timespec stamp carried by the
// ack and log its seconds field. (One statement of the continuation is not
// visible here.)
545 SocketConnection::handle_keepalive2_ack()
547 return socket->read_exactly(sizeof(ceph_timespec))
548 .then([this] (auto buf) {
// view the received bytes in place as a ceph_timespec
549 auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
// log through the messenger logger instead of a flushing write to stdout; the
// cast passes the field to the formatter as a plain integer value
551 logger().debug("keepalive2 ack {}", static_cast<uint64_t>(t->tv_sec));
// Resolve an incoming connect request against `existing`, a connection that
// is already known for the same peer address. Depending on the global_seq /
// connect_seq comparison and the policy, the outcome is a retry/wait/reset
// reply or a replacement of `existing` by this connection.
556 SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
// stale global_seq: tell the peer which global_seq we have seen and let it retry
558 if (h.connect.global_seq < existing->peer_global_seq()) {
559 h.reply.global_seq = existing->peer_global_seq();
560 return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
// a lossy existing connection is simply replaced
561 } else if (existing->is_lossy()) {
562 return replace_existing(existing, std::move(authorizer_reply));
// peer starts from connect_seq 0 while we have an established session: the
// third argument marks this as a reset from the peer
563 } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) {
564 return replace_existing(existing, std::move(authorizer_reply), true);
565 } else if (h.connect.connect_seq < existing->connect_seq()) {
566 // old attempt, or we sent READY but they didn't get it.
567 h.reply.connect_seq = existing->connect_seq() + 1;
568 return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
569 } else if (h.connect.connect_seq == existing->connect_seq()) {
570 // if the existing connection successfully opened, and/or
571 // subsequently went to standby, then the peer should bump
572 // their connect_seq and retry: this is not a connection race
573 // we need to resolve here.
574 if (existing->get_state() == state_t::open ||
575 existing->get_state() == state_t::standby) {
576 if (policy.resetcheck && existing->connect_seq() == 0) {
577 return replace_existing(existing, std::move(authorizer_reply));
579 h.reply.connect_seq = existing->connect_seq() + 1;
580 return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
// otherwise the incoming attempt replaces `existing` when the peer's address
// compares lower than ours or `existing` is itself server-side
582 } else if (peer_addr < my_addr ||
583 existing->is_server_side()) {
585 return replace_existing(existing, std::move(authorizer_reply));
// other visible outcome: tell the peer to wait
587 return send_connect_reply(CEPH_MSGR_TAG_WAIT);
// peer's connect_seq is ahead of ours although our session never advanced
// past connect_seq 0
589 } else if (policy.resetcheck &&
590 existing->connect_seq() == 0) {
591 return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
593 return replace_existing(existing, std::move(authorizer_reply));
// Replace `existing` with this connection: unregister `existing`, take over
// its sequence state and outgoing queue when it is not lossy, then send the
// final connect reply. `is_reset_from_peer` marks a hard reset initiated by
// the peer.
597 seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existing,
598 bufferlist&& authorizer_reply,
599 bool is_reset_from_peer)
601 msgr_tag_t reply_tag;
// reply with SEQ (sequence exchange) only if the peer supports RECONNECT_SEQ
// and this is not a reset from the peer; otherwise reply with READY
602 if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
603 !is_reset_from_peer) {
604 reply_tag = CEPH_MSGR_TAG_SEQ;
606 reply_tag = CEPH_MSGR_TAG_READY;
608 messenger.unregister_conn(existing);
609 if (!existing->is_lossy()) {
610 // reset the in_seq if this is a hard reset from peer,
611 // otherwise we respect our original connection's value
612 in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
613 // steal outgoing queue and out_seq
// requeue_sent() first moves the existing connection's `sent` messages back
// onto its out_q so they are included in what is taken over
614 existing->requeue_sent();
615 std::tie(out_seq, out_q) = existing->get_out_queue();
617 return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
// Connecting side: act on the tag of the connect reply received from the
// peer. Retry-style tags update handshake state in `h` and return so that the
// caller can try again; SEQ/READY complete the handshake and move the
// connection to the open state. An unrecognised tag is logged together with
// its numeric value and reported as error::negotiation_failure.
620 seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
623 case CEPH_MSGR_TAG_FEATURES:
625 case CEPH_MSGR_TAG_BADPROTOVER:
627 case CEPH_MSGR_TAG_BADAUTHORIZER:
// a second BADAUTHORIZER in a row is fatal; the first one triggers a fresh
// authorizer request below
628 if (h.got_bad_auth) {
629 logger().error("{} got bad authorizer", __func__);
630 throw std::system_error(make_error_code(error::negotiation_failure));
632 h.got_bad_auth = true;
634 return messenger.get_authorizer(peer_type, true)
635 .then([this](auto&& auth) {
636 h.authorizer = std::move(auth);
637 return seastar::now();
639 case CEPH_MSGR_TAG_RESETSESSION:
641 return seastar::now();
642 case CEPH_MSGR_TAG_RETRY_GLOBAL:
// get a new global_seq from the messenger, passing the value the peer reported
643 h.global_seq = messenger.get_global_seq(h.reply.global_seq);
644 return seastar::now();
645 case CEPH_MSGR_TAG_RETRY_SESSION:
// the peer must have told us a larger connect_seq; adopt it for the retry
646 ceph_assert(h.reply.connect_seq > h.connect_seq);
647 h.connect_seq = h.reply.connect_seq;
648 return seastar::now();
649 case CEPH_MSGR_TAG_WAIT:
651 case CEPH_MSGR_TAG_SEQ:
653 case CEPH_MSGR_TAG_READY:
// features we require that the peer's reply does not include
656 if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
// SEQ: read the seq the peer has acked, pass it to discard_up_to() for out_q,
// send our in_seq, then continue as for READY
660 if (tag == CEPH_MSGR_TAG_SEQ) {
661 return socket->read_exactly(sizeof(seq_num_t))
662 .then([this] (auto buf) {
663 auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
664 discard_up_to(&out_q, *acked_seq);
665 return socket->write_flush(make_static_packet(in_seq));
667 return handle_connect_reply(CEPH_MSGR_TAG_READY);
670 if (tag == CEPH_MSGR_TAG_READY) {
672 h.peer_global_seq = h.reply.global_seq;
// the LOSSY flag in the peer's reply decides whether the connection is lossy
673 policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
674 state = state_t::open;
677 set_features(h.reply.features & h.connect.features);
// set up the session security handler from the authorizer, then drop the
// authorizer
679 session_security.reset(
680 get_auth_session_handler(nullptr,
681 h.authorizer->protocol,
682 h.authorizer->session_key,
685 h.authorizer.reset();
686 return seastar::now();
// the format string needs one placeholder per argument, otherwise the tag
// value is never printed
689 logger().error("{} got unknown tag {}", __func__, int(tag));
690 throw std::system_error(make_error_code(error::negotiation_failure));
// Reset per-session send state: both the outgoing and the sent queue are
// emptied and out_seq is reinitialised.
694 void SocketConnection::reset_session()
// swapping with a default-constructed temporary empties each queue
696 decltype(out_q){}.swap(out_q);
697 decltype(sent){}.swap(sent);
700 if (HAVE_FEATURE(features, MSG_AUTH)) {
701 // Set out_seq to a random value, so CRC won't be predictable.
702 // Constant to limit starting sequence number to 2^31. Nothing special
703 // about it, just a big number.
704 constexpr uint64_t SEQ_MASK = 0x7fffffff;
705 out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
707 // previously, seq #'s always started at 0.
// Connecting side of one connect attempt: fill in and send ceph_msg_connect
// (plus the authorizer, if one was obtained), read the peer's reply and its
// authorizer reply, then dispatch on the reply tag via handle_connect_reply().
// start_connect() calls this repeatedly until the state becomes open.
712 seastar::future<> SocketConnection::repeat_connect()
714 // encode ceph_msg_connect
715 memset(&h.connect, 0, sizeof(h.connect));
716 h.connect.features = policy.features_supported;
717 h.connect.host_type = messenger.get_myname().type();
718 h.connect.global_seq = h.global_seq;
719 h.connect.connect_seq = h.connect_seq;
720 h.connect.protocol_version = get_proto_version(peer_type, true);
721 // this is fyi, actually, server decides!
722 h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
724 return messenger.get_authorizer(peer_type, false)
725 .then([this](auto&& auth) {
726 h.authorizer = std::move(auth);
// with an authorizer: advertise its protocol and length and append its
// payload after the connect struct
729 h.connect.authorizer_protocol = h.authorizer->protocol;
730 h.connect.authorizer_len = h.authorizer->bl.length();
731 bl.append(create_static(h.connect));
732 bl.append(h.authorizer->bl);
// other visible path: no authorizer payload, protocol and length are zero
734 h.connect.authorizer_protocol = 0;
735 h.connect.authorizer_len = 0;
736 bl.append(create_static(h.connect));
738 return socket->write_flush(std::move(bl));
741 return socket->read(sizeof(h.reply));
742 }).then([this] (bufferlist bl) {
743 auto p = bl.cbegin();
744 ::decode(h.reply, p);
// the buffer must have been consumed completely by the decode
745 ceph_assert(p.end());
746 return socket->read(h.reply.authorizer_len);
747 }).then([this] (bufferlist bl) {
// have the authorizer check the peer's reply (the guarding condition is not
// visible here)
749 auto reply = bl.cbegin();
750 if (!h.authorizer->verify_reply(reply)) {
751 logger().error("{} authorizer failed to verify reply", __func__);
752 throw std::system_error(make_error_code(error::negotiation_failure));
755 return handle_connect_reply(h.reply.tag);
// Start an outgoing connection to `_peer_addr` (entity type `_peer_type`).
// Registers the connection with the messenger, connects the socket, exchanges
// handshake headers, then runs repeat_connect() until the state becomes open.
// The outcome of the whole chain is forwarded to the handshake promise
// h.promise. May only be called once, on a connection in state none without a
// socket.
760 SocketConnection::start_connect(const entity_addr_t& _peer_addr,
761 const entity_type_t& _peer_type)
763 ceph_assert(state == state_t::none);
764 ceph_assert(!socket);
765 peer_addr = _peer_addr;
766 peer_type = _peer_type;
767 messenger.register_conn(this);
768 state = state_t::connecting;
769 return seastar::connect(peer_addr.in4_addr())
770 .then([this](seastar::connected_socket fd) {
// the state became closing while the connect was in flight: shut the new
// socket down and abort the handshake
771 if (state == state_t::closing) {
773 fd.shutdown_output();
774 throw std::system_error(make_error_code(error::connection_aborted));
776 socket.emplace(std::move(fd));
777 // read server's handshake header
778 return socket->read(server_header_size);
779 }).then([this] (bufferlist headerbl) {
780 auto p = headerbl.cbegin();
// saddr / caddr: presumably the server's own address and the address it sees
// us as, decoded from the header (the decode lines are not visible here)
782 entity_addr_t saddr, caddr;
785 ceph_assert(p.end());
786 validate_peer_addr(saddr, peer_addr);
788 if (my_addr != caddr) {
789 // take peer's address for me, but preserve my nonce
790 caddr.nonce = my_addr.nonce;
793 // encode/send client's handshake header
795 bl.append(buffer::create_static(banner_size, banner));
796 ::encode(my_addr, bl, 0);
797 h.global_seq = messenger.get_global_seq();
798 return socket->write_flush(std::move(bl));
// capture `this` explicitly, as start_accept() does; implicit capture of
// `this` through [=] is deprecated in C++20
800 return seastar::do_until([this] { return state == state_t::open; },
801 [this] { return repeat_connect(); });
803 // start background processing of tags
804 read_tags_until_next_message();
805 }).then_wrapped([this] (auto fut) {
806 // satisfy the handshake's promise
807 fut.forward_to(std::move(h.promise));
// Start handling an accepted socket `fd` from `_peer_addr`. Records the
// connection as accepting with the messenger, sends the server handshake
// header, reads the client's header, then runs repeat_handle_connect() until
// the state becomes open. The outcome of the whole chain is forwarded to the
// handshake promise h.promise. May only be called once, on a connection in
// state none without a socket.
812 SocketConnection::start_accept(seastar::connected_socket&& fd,
813 const entity_addr_t& _peer_addr)
815 ceph_assert(state == state_t::none);
816 ceph_assert(!socket);
817 peer_addr = _peer_addr;
818 socket.emplace(std::move(fd));
819 messenger.accept_conn(this);
820 state = state_t::accepting;
821 // encode/send server's handshake header
// banner, then our address, then the peer's address as we see it
823 bl.append(buffer::create_static(banner_size, banner));
824 ::encode(my_addr, bl, 0);
825 ::encode(peer_addr, bl, 0);
826 return socket->write_flush(std::move(bl))
828 // read client's handshake header and connect request
829 return socket->read(client_header_size);
830 }).then([this] (bufferlist bl) {
831 auto p = bl.cbegin();
// the client header must have been consumed completely
835 ceph_assert(p.end());
// a non-blank address supplied by the client gets special handling (the
// handling statements are not visible here)
836 if (!addr.is_blank_ip()) {
840 return seastar::do_until([this] { return state == state_t::open; },
841 [this] { return repeat_handle_connect(); });
843 // start background processing of tags
844 read_tags_until_next_message();
845 }).then_wrapped([this] (auto fut) {
846 // satisfy the handshake's promise
847 fut.forward_to(std::move(h.promise));
// Failure handling with exponential backoff: the backoff delay doubles on
// each call (starting from conf.ms_initial_backoff), is capped at
// conf.ms_max_backoff, and the returned future resolves after sleeping for
// that delay.
851 seastar::future<> SocketConnection::fault()
// the condition guarding this unregister is not visible here
854 messenger.unregister_conn(this);
// non-zero backoff means we have backed off before: double it
856 if (h.backoff.count()) {
857 h.backoff += h.backoff;
859 h.backoff = conf.ms_initial_backoff;
861 if (h.backoff > conf.ms_max_backoff) {
862 h.backoff = conf.ms_max_backoff;
864 return seastar::sleep(h.backoff);