1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2017 Red Hat, Inc
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "SocketConnection.h"
18 #include <seastar/core/shared_future.hh>
19 #include <seastar/core/sleep.hh>
20 #include <seastar/net/packet.hh>
22 #include "include/msgr.h"
23 #include "include/random.h"
24 #include "auth/Auth.h"
25 #include "auth/AuthSessionHandler.h"
27 #include "crimson/common/log.h"
30 #include "SocketMessenger.h"
32 using namespace ceph::net;
// Build a seastar packet from the raw bytes of `value`: the packet is
// constructed from a pointer to `value` and sizeof(value).
// NOTE(review): presumably the packet references the bytes without copying,
// so `value` must stay alive until the write completes — confirm against
// the seastar::net::packet constructor used here.
35 seastar::net::packet make_static_packet(const T& value) {
36 return { reinterpret_cast<const char*>(&value), sizeof(value) };
// Accessor for the logger registered under the ceph_subsys_ms subsystem.
40 seastar::logger& logger() {
41 return ceph::get_logger(ceph_subsys_ms);
// Construct a connection with local address `my_addr`.
// send_ready is initialised from the future of the handshake promise
// (h.promise), so work chained onto send_ready runs only once that promise
// has been resolved.
45 SocketConnection::SocketConnection(SocketMessenger& messenger,
46 const entity_addr_t& my_addr)
47 : Connection(my_addr),
49 send_ready(h.promise.get_future())
// Destructor: requires that the send chain has already resolved, then
// discards whatever result (value or failure) send_ready still holds.
53 SocketConnection::~SocketConnection()
55 // errors were reported to callers of send()
56 ceph_assert(send_ready.available());
57 send_ready.ignore_ready_future();
// Const accessor for the messenger of this connection.
// NOTE(review): presumably returns the SocketMessenger handed to the
// constructor — confirm.
61 SocketConnection::get_messenger() const {
// Returns true unless the send_ready future holds a failure.
// NOTE(review): because only failed() is tested, this also returns true
// while send_ready is still unresolved (e.g. the handshake promise has not
// been satisfied yet) — confirm that is the intended meaning of "connected".
65 bool SocketConnection::is_connected()
67 return !send_ready.failed();
// Start a loop that consumes one-byte protocol tags from the socket until a
// CEPH_MSGR_TAG_MSG tag arrives. The future chain is not returned to the
// caller; its outcome (success or exception) is forwarded into the
// on_message promise at the end.
70 void SocketConnection::read_tags_until_next_message()
72 seastar::repeat([this] {
// each iteration reads exactly one tag byte
74 return socket->read_exactly(1)
75 .then([this] (auto buf) {
77 throw std::system_error(make_error_code(error::read_eof));
80 case CEPH_MSGR_TAG_MSG:
81 // stop looping and notify read_header()
82 return seastar::make_ready_future<stop_t>(stop_t::yes);
83 case CEPH_MSGR_TAG_ACK:
85 case CEPH_MSGR_TAG_KEEPALIVE:
87 case CEPH_MSGR_TAG_KEEPALIVE2:
// keepalive tags are handled inline and the loop continues
88 return handle_keepalive2()
89 .then([this] { return stop_t::no; });
90 case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
91 return handle_keepalive2_ack()
92 .then([this] { return stop_t::no; });
93 case CEPH_MSGR_TAG_CLOSE:
// NOTE(review): writes to stdout (and flushes via std::endl) instead of
// going through logger() — confirm whether this is leftover debugging.
94 std::cout << "close" << std::endl;
97 return seastar::make_ready_future<stop_t>(stop_t::no);
99 }).handle_exception_type([this] (const std::system_error& e) {
// the EOF error thrown by the read step above is special-cased here
100 if (e.code() == error::read_eof) {
104 }).then_wrapped([this] (auto fut) {
105 // satisfy the message promise
106 fut.forward_to(std::move(on_message));
// Handle an ACK tag: read sizeof(ceph_le64) bytes from the socket, interpret
// them as a sequence number and pass it to discard_up_to() on `sent`.
110 seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
112 return socket->read_exactly(sizeof(ceph_le64))
113 .then([this] (auto buf) {
// NOTE(review): the buffer is reinterpreted in place as a ceph_le64; this
// assumes buf.get() is suitably aligned and holds the full 8 bytes — confirm.
114 auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
115 discard_up_to(&sent, *seq);
// Process the front of `queue` for as long as the queue is non-empty and the
// front message's sequence number is strictly below `seq`.
// NOTE(review): with the strict `<`, the message whose seq equals `seq`
// stays in the queue — confirm that is intended for both callers (ACK
// handling on `sent`, reconnect handling on `out_q`).
120 void SocketConnection::discard_up_to(std::queue<MessageRef>* queue,
123 while (!queue->empty() &&
124 queue->front()->get_seq() < seq) {
// Move every message from `sent` back into `out_q` and rewind out_seq by the
// number of messages that were in `sent`.
// NOTE(review): messages are pushed onto the back of out_q, i.e. after any
// message that is already queued there — confirm the resulting order is the
// one wanted for resend.
129 void SocketConnection::requeue_sent()
131 out_seq -= sent.size();
132 while (!sent.empty()) {
133 auto m = sent.front();
135 out_q.push(std::move(m));
// Apply the byte throttler, if the policy has one, to the message whose
// header is currently held in m.header. Without a throttler this resolves
// immediately; otherwise it acquires the summed segment lengths taken from
// the header.
139 seastar::future<> SocketConnection::maybe_throttle()
141 if (!policy.throttler_bytes) {
142 return seastar::now();
144 const auto to_read = (m.header.front_len +
145 m.header.middle_len +
147 return policy.throttler_bytes->get(to_read);
// Read one message from the socket. Waits on the on_message promise (which
// read_tags_until_next_message() forwards its result into), then reads the
// header, front, middle, data and footer in that order and decodes them
// into a MessageRef.
150 seastar::future<MessageRef> SocketConnection::do_read_message()
152 return on_message.get_future()
// install a fresh promise for the next message
154 on_message = seastar::promise<>{};
156 return socket->read(sizeof(m.header));
157 }).then([this] (bufferlist bl) {
158 // throttle the traffic, maybe
159 auto p = bl.cbegin();
160 ::decode(m.header, p);
161 return maybe_throttle();
// the segment lengths below all come from the decoded header
164 return socket->read(m.header.front_len);
165 }).then([this] (bufferlist bl) {
166 m.front = std::move(bl);
168 return socket->read(m.header.middle_len);
169 }).then([this] (bufferlist bl) {
170 m.middle = std::move(bl);
172 return socket->read(m.header.data_len);
173 }).then([this] (bufferlist bl) {
174 m.data = std::move(bl);
176 return socket->read(sizeof(m.footer));
177 }).then([this] (bufferlist bl) {
178 // resume background processing of tags
179 read_tags_until_next_message();
181 auto p = bl.cbegin();
182 ::decode(m.footer, p);
183 auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
184 m.front, m.middle, m.data, nullptr);
185 // TODO: set time stamps
186 msg->set_byte_throttler(policy.throttler_bytes);
// wrap the raw pointer without taking an additional reference
187 constexpr bool add_ref = false; // Message starts with 1 ref
188 return MessageRef{msg, add_ref};
// Read messages via do_read_message() until one produces a value: a message
// whose sequence number is rejected by update_rx_seq() is skipped and the
// next one is read.
192 seastar::future<MessageRef> SocketConnection::read_message()
194 return seastar::repeat_until_value([this] {
195 return do_read_message()
196 .then([this] (MessageRef msg) -> std::optional<MessageRef> {
197 if (!update_rx_seq(msg->get_seq())) {
198 // skip this request and read the next
// Check the sequence number `seq` of an incoming message against in_seq.
// The boolean result is consumed by read_message(), which skips the message
// when it is false. Depending on configuration, an old or a skipped
// sequence number aborts the process.
206 bool SocketConnection::update_rx_seq(seq_num_t seq)
// abort on an old message only if the peer negotiated RECONNECT_SEQ and
// ms_die_on_old_message is set
209 if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
210 conf.ms_die_on_old_message) {
211 ceph_abort_msg("old msgs despite reconnect_seq feature");
// a gap: seq is more than one ahead of the last recorded in_seq
214 } else if (seq > in_seq + 1) {
215 if (conf.ms_die_on_skipped_message) {
216 ceph_abort_msg("skipped incoming seq");
// Assign the next outgoing sequence number to `msg`, encode it, and write it
// to the socket as: MSG tag, header, payload, middle, data, footer.
// Once the write has been flushed the message is appended to `sent`.
225 seastar::future<> SocketConnection::write_message(MessageRef msg)
227 msg->set_seq(++out_seq);
228 msg->encode(features, messenger.get_crc_flags());
230 bl.append(CEPH_MSGR_TAG_MSG);
// header and footer are appended as their raw in-memory bytes
231 auto& header = msg->get_header();
232 bl.append((const char*)&header, sizeof(header));
233 bl.append(msg->get_payload());
234 bl.append(msg->get_middle());
235 bl.append(msg->get_data());
236 auto& footer = msg->get_footer();
// with MSG_AUTH the footer is sent as-is; the code after this branch fills
// a ceph_msg_footer_old instead, zeroing CRC fields whose crc flag is unset
237 if (HAVE_FEATURE(features, MSG_AUTH)) {
238 bl.append((const char*)&footer, sizeof(footer));
240 ceph_msg_footer_old old_footer;
241 if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
242 old_footer.front_crc = footer.front_crc;
243 old_footer.middle_crc = footer.middle_crc;
245 old_footer.front_crc = old_footer.middle_crc = 0;
247 if (messenger.get_crc_flags() & MSG_CRC_DATA) {
248 old_footer.data_crc = footer.data_crc;
250 old_footer.data_crc = 0;
252 old_footer.flags = footer.flags;
253 bl.append((const char*)&old_footer, sizeof(old_footer));
255 // write as a seastar::net::packet
256 return socket->write_flush(std::move(bl))
257 .then([this, msg = std::move(msg)] {
259 sent.push(std::move(msg));
// Queue `msg` for sending. The write is chained after the current send_ready
// future, so messages are written one after another in call order. The
// resulting shared future both replaces send_ready and is returned to the
// caller.
264 seastar::future<> SocketConnection::send(MessageRef msg)
266 // chain the message after the last message is sent
267 seastar::shared_future<> f = send_ready.then(
268 [this, msg = std::move(msg)] {
269 return write_message(std::move(msg));
272 // chain any later messages after this one completes
273 send_ready = f.get_future();
274 // allow the caller to wait on the same future
275 return f.get_future();
// Send a keepalive request. Like send(), the write is chained onto
// send_ready; the request in k.req is stamped with the current
// coarse_real_clock time just before it is written.
278 seastar::future<> SocketConnection::keepalive()
280 seastar::shared_future<> f = send_ready.then([this] {
281 k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
282 ceph::coarse_real_clock::now());
283 return socket->write_flush(make_static_packet(k.req));
// later sends wait for this keepalive; the caller gets the same future
285 send_ready = f.get_future();
286 return f.get_future();
// Close the connection. If the state is already state_t::closing, the stored
// close_ready future is returned again. Otherwise the connection is removed
// from the messenger's bookkeeping, close_ready is set up, and the state
// becomes state_t::closing.
// NOTE(review): this function uses plain assert() in two places while the
// rest of the file uses ceph_assert() — confirm whether that is deliberate.
289 seastar::future<> SocketConnection::close()
291 if (state == state_t::closing) {
293 assert(close_ready.valid());
294 return close_ready.get_future();
297 // unregister_conn() drops a reference, so hold another until completion
298 auto cleanup = [conn = SocketConnectionRef(this)] {};
// accepting connections and registered connections are tracked separately
// by the messenger
300 if (state == state_t::accepting) {
301 messenger.unaccept_conn(this);
302 } else if (state >= state_t::connecting && state < state_t::closing) {
303 messenger.unregister_conn(this);
309 // close_ready become valid only after state is state_t::closing
310 assert(!close_ready.valid());
// `cleanup` only exists to keep the extra reference alive until close ends
313 close_ready = socket->close().finally(std::move(cleanup));
// NOTE(review): presumably the path taken when no socket exists yet, which
// is expected only while still connecting — confirm.
315 ceph_assert(state == state_t::connecting);
316 close_ready = seastar::now();
318 state = state_t::closing;
319 return close_ready.get_future();
// Handshake constants: the banner text and the sizes of the fixed-length
// handshake headers exchanged before the connect request.
324 /// store the banner in a non-const string for buffer::create_static()
325 static char banner[] = CEPH_BANNER;
// length of the banner without its terminating NUL
326 constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
// client header: banner + one address; server header: banner + two addresses
328 constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
329 constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
// raw encoders for the connect request/reply structs, used by the
// ::decode() calls on h.connect and h.reply in this file
331 WRITE_RAW_ENCODER(ceph_msg_connect);
332 WRITE_RAW_ENCODER(ceph_msg_connect_reply);
// Stream a human-readable summary of a ceph_msg_connect. `features` and
// `flags` are printed in hex; `flags` is widened to uint16_t first so it is
// formatted as a number.
334 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
336 return out << "connect{features=" << std::hex << c.features << std::dec
337 << " host_type=" << c.host_type
338 << " global_seq=" << c.global_seq
339 << " connect_seq=" << c.connect_seq
340 << " protocol_version=" << c.protocol_version
341 << " authorizer_protocol=" << c.authorizer_protocol
342 << " authorizer_len=" << c.authorizer_len
343 << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
// Stream a human-readable summary of a ceph_msg_connect_reply. `tag` and
// `flags` are widened to uint16_t so they are formatted as numbers;
// `features` and `flags` are printed in hex.
346 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
348 return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
349 << " features=" << std::hex << r.features << std::dec
350 << " global_seq=" << r.global_seq
351 << " connect_seq=" << r.connect_seq
352 << " protocol_version=" << r.protocol_version
353 << " authorizer_len=" << r.authorizer_len
354 << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
357 // check that the buffer starts with a valid banner without requiring it to
358 // be contiguous in memory
// Advances `p` past the bytes it compares. Throws std::system_error with
// error::bad_connect_banner on a mismatch.
359 static void validate_banner(bufferlist::const_iterator& p)
361 auto b = std::cbegin(banner);
362 auto end = b + banner_size;
364 const char *buf{nullptr};
// ask for the rest of the banner; `len` is how many bytes were returned
365 auto remaining = std::distance(b, end);
366 auto len = p.get_ptr_and_advance(remaining, &buf);
367 if (!std::equal(buf, buf + len, b)) {
368 throw std::system_error(make_error_code(error::bad_connect_banner));
374 // make sure that we agree with the peer about its address
// `addr` is the address under test and `expected` the one we hold. Throws
// std::system_error with error::bad_peer_address when neither the exact
// comparison nor the blank-IP exception below accepts `addr`.
375 static void validate_peer_addr(const entity_addr_t& addr,
376 const entity_addr_t& expected)
378 if (addr == expected) {
381 // ok if server bound anonymously, as long as port/nonce match
382 if (addr.is_blank_ip() &&
383 addr.get_port() == expected.get_port() &&
384 addr.get_nonce() == expected.get_nonce()) {
387 throw std::system_error(make_error_code(error::bad_peer_address));
391 /// return a static bufferptr to the given object
// The bufferptr is created over obj's own storage (its address and
// sizeof(obj)).
// NOTE(review): presumably non-owning, so `obj` must outlive every use of
// the returned bufferptr — confirm against buffer::create_static().
392 template <typename T>
393 bufferptr create_static(T& obj)
395 return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
// Decide, from the received connect request (h.connect) and the cephx
// configuration, whether message signing is required; the caller adds
// CEPH_FEATURE_MSG_AUTH to the required features when this returns true.
398 bool SocketConnection::require_auth_feature() const
// peers not using the cephx authorizer protocol are handled separately
400 if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
403 if (conf.cephx_require_signatures) {
// OSD and MDS peers follow the cluster setting, all others the service one
406 if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
407 h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
408 return conf.cephx_cluster_require_signatures;
410 return conf.cephx_service_require_signatures;
// Return the protocol version for a session with a peer of type `peer_type`.
// `connect` is true when we are the connecting side (see repeat_connect())
// and false when we are handling an incoming connect request.
414 uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const
// the local side is hard-coded to be an OSD
416 constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
417 // see also OSD.h, unlike other connection of simple/async messenger,
418 // crimson msgr is only used by osd
419 constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
420 if (peer_type == my_type) {
422 return CEPH_OSD_PROTOCOL;
// otherwise the version is picked by the type of the side being connected
// to: the peer when connecting, ourselves when accepting
425 switch (connect ? peer_type : my_type) {
426 case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
427 case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
428 case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
// Server side: process one connect request from the peer. Reads the
// ceph_msg_connect and its authorizer, validates protocol version, features
// and authorizer, then either rejects/redirects the attempt with a reply tag
// or accepts it with READY. Invoked through seastar::repeat() from
// start_accept(), so the returned stop_iteration decides whether another
// request is awaited.
434 seastar::future<seastar::stop_iteration>
435 SocketConnection::repeat_handle_connect()
437 return socket->read(sizeof(h.connect))
438 .then([this](bufferlist bl) {
439 auto p = bl.cbegin();
440 ::decode(h.connect, p);
441 peer_type = h.connect.host_type;
// the authorizer length is taken from the request just decoded
442 return socket->read(h.connect.authorizer_len);
443 }).then([this] (bufferlist authorizer) {
444 if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
445 return seastar::make_ready_future<msgr_tag_t, bufferlist>(
446 CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
448 if (require_auth_feature()) {
449 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
// reject when the peer lacks any feature we require
451 if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features;
453 return seastar::make_ready_future<msgr_tag_t, bufferlist>(
454 CEPH_MSGR_TAG_FEATURES, bufferlist{});
456 return messenger.verify_authorizer(peer_type,
457 h.connect.authorizer_protocol,
459 }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
// start from a zeroed reply for this attempt
460 memset(&h.reply, 0, sizeof(h.reply));
462 return send_connect_reply(tag, std::move(authorizer_reply));
// a connection to the same peer address may already be registered
464 if (auto existing = messenger.lookup_conn(peer_addr); existing) {
465 return handle_connect_with_existing(existing, std::move(authorizer_reply));
466 } else if (h.connect.connect_seq > 0) {
467 return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
468 std::move(authorizer_reply));
// fresh session: record sequence numbers and the negotiated feature set
470 h.connect_seq = h.connect.connect_seq + 1;
471 h.peer_global_seq = h.connect.global_seq;
472 set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features);
474 return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
// Fill in h.reply for `tag` and write it to the socket, followed by the
// authorizer reply. Used for the reply tags other than the final
// READY/SEQ, which go through send_connect_reply_ready().
478 seastar::future<seastar::stop_iteration>
479 SocketConnection::send_connect_reply(msgr_tag_t tag,
480 bufferlist&& authorizer_reply)
// advertise the peer's features that we also support, plus everything we
// require
483 h.reply.features = static_cast<uint64_t>((h.connect.features &
484 policy.features_supported) |
485 policy.features_required);
486 h.reply.authorizer_len = authorizer_reply.length();
487 return socket->write(make_static_packet(h.reply))
488 .then([this, reply=std::move(authorizer_reply)]() mutable {
489 return socket->write_flush(std::move(reply));
// Send the accepting reply (`tag` is READY or SEQ, see replace_existing()
// and repeat_handle_connect()), optionally followed by the authorizer
// reply. For SEQ, sequence numbers are exchanged with the peer. Finally
// the connection is moved from the messenger's accepting set to its
// registered set.
495 seastar::future<seastar::stop_iteration>
496 SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
497 bufferlist&& authorizer_reply)
499 h.global_seq = messenger.get_global_seq();
501 h.reply.features = policy.features_supported;
502 h.reply.global_seq = h.global_seq;
503 h.reply.connect_seq = h.connect_seq;
506 h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
508 h.reply.authorizer_len = authorizer_reply.length();
509 return socket->write(make_static_packet(h.reply))
510 .then([this, reply=std::move(authorizer_reply)]() mutable {
// the authorizer reply is written only when it is non-empty
511 if (reply.length()) {
512 return socket->write(std::move(reply));
514 return seastar::now();
// SEQ: send our in_seq, then read the seq the peer acknowledges and hand it
// to discard_up_to() on out_q
517 if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
518 return socket->write_flush(make_static_packet(in_seq))
520 return socket->read_exactly(sizeof(seq_num_t));
521 }).then([this] (auto buf) {
522 auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
523 discard_up_to(&out_q, *acked_seq);
526 return socket->flush();
529 messenger.register_conn(this);
530 messenger.unaccept_conn(this);
// Handle a KEEPALIVE2 tag: read the peer's ceph_timespec stamp, store it in
// k.ack.stamp and write k.ack back to the peer.
536 SocketConnection::handle_keepalive2()
538 return socket->read_exactly(sizeof(ceph_timespec))
539 .then([this] (auto buf) {
540 k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
// NOTE(review): prints to stdout (flushing via std::endl) instead of using
// logger() — confirm whether this is leftover debugging.
541 std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
542 return socket->write_flush(make_static_packet(k.ack));
// Handle a KEEPALIVE2_ACK tag: read the ceph_timespec stamp carried by the
// acknowledgement.
547 SocketConnection::handle_keepalive2_ack()
549 return socket->read_exactly(sizeof(ceph_timespec))
550 .then([this] (auto buf) {
551 auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
// NOTE(review): prints to stdout (flushing via std::endl) instead of using
// logger() — confirm whether this is leftover debugging.
553 std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
// Server side: decide what to do with a connect request when a connection to
// the same peer address (`existing`) is already registered. Depending on
// the global/connect sequence numbers and the state of `existing`, the peer
// is told to retry, wait or reset, or `existing` is replaced by this
// connection.
// NOTE(review): send_connect_reply() is called here with a single argument
// although its definition takes (tag, authorizer_reply); this relies on a
// default for the second parameter declared elsewhere — confirm.
557 seastar::future<seastar::stop_iteration>
558 SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
// the request carries an older global_seq than `existing` has seen
560 if (h.connect.global_seq < existing->peer_global_seq()) {
561 h.reply.global_seq = existing->peer_global_seq();
562 return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
563 } else if (existing->is_lossy()) {
564 return replace_existing(existing, std::move(authorizer_reply));
// peer starts from connect_seq 0 while we hold a session: treated as a
// reset coming from the peer (third argument of replace_existing)
565 } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) {
566 return replace_existing(existing, std::move(authorizer_reply), true);
567 } else if (h.connect.connect_seq < existing->connect_seq()) {
568 // old attempt, or we sent READY but they didn't get it.
569 h.reply.connect_seq = existing->connect_seq() + 1;
570 return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
571 } else if (h.connect.connect_seq == existing->connect_seq()) {
572 // if the existing connection successfully opened, and/or
573 // subsequently went to standby, then the peer should bump
574 // their connect_seq and retry: this is not a connection race
575 // we need to resolve here.
576 if (existing->get_state() == state_t::open ||
577 existing->get_state() == state_t::standby) {
578 if (policy.resetcheck && existing->connect_seq() == 0) {
579 return replace_existing(existing, std::move(authorizer_reply));
581 h.reply.connect_seq = existing->connect_seq() + 1;
582 return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
// tie-break: the incoming attempt wins when the peer's address orders below
// ours or `existing` is itself a server-side connection
584 } else if (peer_addr < my_addr ||
585 existing->is_server_side()) {
587 return replace_existing(existing, std::move(authorizer_reply));
589 return send_connect_reply(CEPH_MSGR_TAG_WAIT);
591 } else if (policy.resetcheck &&
592 existing->connect_seq() == 0) {
593 return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
595 return replace_existing(existing, std::move(authorizer_reply));
// Take over from `existing`: unregister it from the messenger and, unless it
// is lossy, inherit its receive sequence number and outgoing queue. Then
// send the accepting reply via send_connect_reply_ready().
// `is_reset_from_peer` marks a hard reset from the peer: in_seq restarts at
// 0 and the reply is READY rather than SEQ.
599 seastar::future<seastar::stop_iteration>
600 SocketConnection::replace_existing(SocketConnectionRef existing,
601 bufferlist&& authorizer_reply,
602 bool is_reset_from_peer)
604 msgr_tag_t reply_tag;
// SEQ is used only when the peer supports RECONNECT_SEQ and did not reset
605 if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
606 !is_reset_from_peer) {
607 reply_tag = CEPH_MSGR_TAG_SEQ;
609 reply_tag = CEPH_MSGR_TAG_READY;
611 messenger.unregister_conn(existing);
612 if (!existing->is_lossy()) {
613 // reset the in_seq if this is a hard reset from peer,
614 // otherwise we respect our original connection's value
615 in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
616 // steal outgoing queue and out_seq
617 existing->requeue_sent();
618 std::tie(out_seq, out_q) = existing->get_out_queue();
620 return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
// Client side: act on the tag of the server's connect reply. Retry-type tags
// update local state and resolve to stop_t::no so the connect loop sends
// another request; fatal tags throw error::negotiation_failure; SEQ/READY
// complete the handshake and resolve to stop_t::yes.
623 seastar::future<seastar::stop_iteration>
624 SocketConnection::handle_connect_reply(msgr_tag_t tag)
627 case CEPH_MSGR_TAG_FEATURES:
// NOTE(review): "mispatch" in the two messages below looks like a typo for
// "mismatch" — confirm before changing the log text.
628 logger().error("{} connect protocol feature mispatch", __func__);
629 throw std::system_error(make_error_code(error::negotiation_failure));
630 case CEPH_MSGR_TAG_BADPROTOVER:
631 logger().error("{} connect protocol version mispatch", __func__);
632 throw std::system_error(make_error_code(error::negotiation_failure));
633 case CEPH_MSGR_TAG_BADAUTHORIZER:
// a second BADAUTHORIZER is fatal; the first one fetches a new authorizer
634 if (h.got_bad_auth) {
635 logger().error("{} got bad authorizer", __func__);
636 throw std::system_error(make_error_code(error::negotiation_failure));
638 h.got_bad_auth = true;
640 return messenger.get_authorizer(peer_type, true)
641 .then([this](auto&& auth) {
642 h.authorizer = std::move(auth);
645 case CEPH_MSGR_TAG_RESETSESSION:
647 return seastar::make_ready_future<stop_t>(stop_t::no);
648 case CEPH_MSGR_TAG_RETRY_GLOBAL:
// adopt a global_seq derived from the one the server replied with
649 h.global_seq = messenger.get_global_seq(h.reply.global_seq);
650 return seastar::make_ready_future<stop_t>(stop_t::no);
651 case CEPH_MSGR_TAG_RETRY_SESSION:
652 ceph_assert(h.reply.connect_seq > h.connect_seq);
653 h.connect_seq = h.reply.connect_seq;
654 return seastar::make_ready_future<stop_t>(stop_t::no);
655 case CEPH_MSGR_TAG_WAIT:
657 throw std::system_error(make_error_code(error::negotiation_failure));
658 case CEPH_MSGR_TAG_SEQ:
660 case CEPH_MSGR_TAG_READY:
// the server's reply must include every feature we require
663 if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
665 logger().error("{} missing required features", __func__);
666 throw std::system_error(make_error_code(error::negotiation_failure));
// SEQ: read the seq acknowledged by the server, pass it to discard_up_to()
// on out_q, send our in_seq, then continue as for READY
668 if (tag == CEPH_MSGR_TAG_SEQ) {
669 return socket->read_exactly(sizeof(seq_num_t))
670 .then([this] (auto buf) {
671 auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
672 discard_up_to(&out_q, *acked_seq);
673 return socket->write_flush(make_static_packet(in_seq));
675 return handle_connect_reply(CEPH_MSGR_TAG_READY);
678 if (tag == CEPH_MSGR_TAG_READY) {
680 h.peer_global_seq = h.reply.global_seq;
// the lossy flag is taken from the server's reply
681 policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
682 state = state_t::open;
685 set_features(h.reply.features & h.connect.features);
687 session_security.reset(
688 get_auth_session_handler(nullptr,
689 h.authorizer->protocol,
690 h.authorizer->session_key,
693 h.authorizer.reset();
694 return seastar::make_ready_future<stop_t>(stop_t::yes);
// NOTE(review): the format string has a single "{}" but two arguments are
// passed, so int(tag) has no placeholder — confirm and add one if the tag
// value is meant to be logged.
697 logger().error("{} got unknown tag", __func__, int(tag));
698 throw std::system_error(make_error_code(error::negotiation_failure));
// Reset per-session send state: both the outgoing queue and the sent queue
// are emptied, and the starting out_seq is chosen according to whether the
// MSG_AUTH feature is in effect.
702 void SocketConnection::reset_session()
// swapping with a default-constructed temporary empties each queue
704 decltype(out_q){}.swap(out_q);
705 decltype(sent){}.swap(sent);
708 if (HAVE_FEATURE(features, MSG_AUTH)) {
709 // Set out_seq to a random value, so CRC won't be predictable.
710 // Constant to limit starting sequence number to 2^31. Nothing special
711 // about it, just a big number.
712 constexpr uint64_t SEQ_MASK = 0x7fffffff;
713 out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
715 // previously, seq #'s always started at 0.
// Client side: send one connect request and process the server's reply.
// Fills h.connect from the policy and handshake state, attaches an
// authorizer when one is available, then reads the ceph_msg_connect_reply
// plus its authorizer reply and dispatches on the reply tag. Invoked
// through seastar::repeat() from start_connect().
720 seastar::future<seastar::stop_iteration>
721 SocketConnection::repeat_connect()
723 // encode ceph_msg_connect
724 memset(&h.connect, 0, sizeof(h.connect));
725 h.connect.features = policy.features_supported;
726 h.connect.host_type = messenger.get_myname().type();
727 h.connect.global_seq = h.global_seq;
728 h.connect.connect_seq = h.connect_seq;
729 h.connect.protocol_version = get_proto_version(peer_type, true);
730 // this is fyi, actually, server decides!
731 h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
733 return messenger.get_authorizer(peer_type, false)
734 .then([this](auto&& auth) {
735 h.authorizer = std::move(auth);
// with an authorizer: advertise its protocol and length and append its
// payload after the connect struct
738 h.connect.authorizer_protocol = h.authorizer->protocol;
739 h.connect.authorizer_len = h.authorizer->bl.length();
740 bl.append(create_static(h.connect));
741 bl.append(h.authorizer->bl);
// without one: protocol and length are zero and only the struct is sent
743 h.connect.authorizer_protocol = 0;
744 h.connect.authorizer_len = 0;
745 bl.append(create_static(h.connect));
747 return socket->write_flush(std::move(bl));
750 return socket->read(sizeof(h.reply));
751 }).then([this] (bufferlist bl) {
752 auto p = bl.cbegin();
753 ::decode(h.reply, p);
// the reply struct must consume the whole buffer that was read
754 ceph_assert(p.end());
755 return socket->read(h.reply.authorizer_len);
756 }).then([this] (bufferlist bl) {
758 auto reply = bl.cbegin();
759 if (!h.authorizer->verify_reply(reply)) {
760 logger().error("{} authorizer failed to verify reply", __func__);
761 throw std::system_error(make_error_code(error::negotiation_failure));
764 return handle_connect_reply(h.reply.tag);
// Client side: begin connecting to `_peer_addr` (a peer of type
// `_peer_type`). Registers the connection with the messenger, opens the
// socket, exchanges the banner/address headers, runs repeat_connect()
// until the handshake finishes, and forwards the outcome into the
// handshake promise (h.promise).
// Requires a fresh connection: state none and no socket yet.
769 SocketConnection::start_connect(const entity_addr_t& _peer_addr,
770 const entity_type_t& _peer_type)
772 ceph_assert(state == state_t::none);
773 ceph_assert(!socket);
774 peer_addr = _peer_addr;
775 peer_type = _peer_type;
776 messenger.register_conn(this);
777 state = state_t::connecting;
// NOTE(review): uses in4_addr(), so presumably only IPv4 peers are
// supported here — confirm.
778 return seastar::connect(peer_addr.in4_addr())
779 .then([this](seastar::connected_socket fd) {
// close() may have been called while the connect was in flight
780 if (state == state_t::closing) {
782 fd.shutdown_output();
783 throw std::system_error(make_error_code(error::connection_aborted));
785 socket.emplace(std::move(fd));
786 // read server's handshake header
787 return socket->read(server_header_size);
788 }).then([this] (bufferlist headerbl) {
789 auto p = headerbl.cbegin();
// saddr/caddr: the two addresses carried in the server's header
791 entity_addr_t saddr, caddr;
794 ceph_assert(p.end());
795 validate_peer_addr(saddr, peer_addr);
797 if (my_addr != caddr) {
798 // take peer's address for me, but preserve my nonce
799 caddr.nonce = my_addr.nonce;
802 // encode/send client's handshake header
804 bl.append(buffer::create_static(banner_size, banner));
805 ::encode(my_addr, bl, 0);
806 h.global_seq = messenger.get_global_seq();
807 return socket->write_flush(std::move(bl));
// negotiate until repeat_connect() signals completion
809 return seastar::repeat([this] {
810 return repeat_connect();
813 state = state_t::open;
814 // start background processing of tags
815 read_tags_until_next_message();
816 }).then_wrapped([this] (auto fut) {
817 // satisfy the handshake's promise
818 fut.forward_to(std::move(h.promise));
// Server side: take ownership of an accepted socket `fd` from `_peer_addr`.
// Records the connection as accepting with the messenger, sends the server
// handshake header (banner, our address, the peer's address), reads the
// client's header, runs repeat_handle_connect() until the handshake
// finishes, and forwards the outcome into the handshake promise.
// Requires a fresh connection: state none and no socket yet.
823 SocketConnection::start_accept(seastar::connected_socket&& fd,
824 const entity_addr_t& _peer_addr)
826 ceph_assert(state == state_t::none);
827 ceph_assert(!socket);
828 peer_addr = _peer_addr;
829 socket.emplace(std::move(fd));
830 messenger.accept_conn(this);
831 state = state_t::accepting;
832 // encode/send server's handshake header
834 bl.append(buffer::create_static(banner_size, banner));
835 ::encode(my_addr, bl, 0);
836 ::encode(peer_addr, bl, 0);
837 return socket->write_flush(std::move(bl))
839 // read client's handshake header and connect request
840 return socket->read(client_header_size);
841 }).then([this] (bufferlist bl) {
842 auto p = bl.cbegin();
// the client header must be consumed completely
846 ceph_assert(p.end());
// an address the client reports is only considered when its IP is not blank
847 if (!addr.is_blank_ip()) {
// negotiate until repeat_handle_connect() signals completion
851 return seastar::repeat([this] {
852 return repeat_handle_connect();
855 state = state_t::open;
856 // start background processing of tags
857 read_tags_until_next_message();
858 }).then_wrapped([this] (auto fut) {
859 // satisfy the handshake's promise
860 fut.forward_to(std::move(h.promise));
// React to a connection fault: unregister from the messenger, update the
// reconnect backoff, and return a future that resolves after sleeping for
// that backoff.
864 seastar::future<> SocketConnection::fault()
867 messenger.unregister_conn(this);
// backoff doubles on each fault once set; it starts at ms_initial_backoff
// and is capped at ms_max_backoff
869 if (h.backoff.count()) {
870 h.backoff += h.backoff;
872 h.backoff = conf.ms_initial_backoff;
874 if (h.backoff > conf.ms_max_backoff) {
875 h.backoff = conf.ms_max_backoff;
877 return seastar::sleep(h.backoff);