crimson/net: define an alias of seastar::stop_iteration
[ceph.git] / src / crimson / net / SocketConnection.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2017 Red Hat, Inc
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "SocketConnection.h"
16
#include <algorithm>
#include <cstring>
#include <system_error>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/net/packet.hh>
21
22 #include "include/msgr.h"
23 #include "include/random.h"
24 #include "auth/Auth.h"
25 #include "auth/AuthSessionHandler.h"
26
27 #include "crimson/common/log.h"
28 #include "Config.h"
29 #include "Errors.h"
30 #include "SocketMessenger.h"
31
32 using namespace ceph::net;
33
34 template <typename T>
35 seastar::net::packet make_static_packet(const T& value) {
36     return { reinterpret_cast<const char*>(&value), sizeof(value) };
37 }
38
39 namespace {
40   seastar::logger& logger() {
41     return ceph::get_logger(ceph_subsys_ms);
42   }
43 }
44
45 SocketConnection::SocketConnection(SocketMessenger& messenger,
46                                    const entity_addr_t& my_addr)
47   : Connection(my_addr),
48     messenger(messenger),
49     send_ready(h.promise.get_future())
50 {
51 }
52
53 SocketConnection::~SocketConnection()
54 {
55   // errors were reported to callers of send()
56   ceph_assert(send_ready.available());
57   send_ready.ignore_ready_future();
58 }
59
60 ceph::net::Messenger*
61 SocketConnection::get_messenger() const {
62   return &messenger;
63 }
64
65 bool SocketConnection::is_connected()
66 {
67   return !send_ready.failed();
68 }
69
70 void SocketConnection::read_tags_until_next_message()
71 {
72   seastar::repeat([this] {
73       // read the next tag
74       return socket->read_exactly(1)
75         .then([this] (auto buf) {
76           if (buf.empty()) {
77             throw std::system_error(make_error_code(error::read_eof));
78           }
79           switch (buf[0]) {
80           case CEPH_MSGR_TAG_MSG:
81             // stop looping and notify read_header()
82             return seastar::make_ready_future<stop_t>(stop_t::yes);
83           case CEPH_MSGR_TAG_ACK:
84             return handle_ack();
85           case CEPH_MSGR_TAG_KEEPALIVE:
86             break;
87           case CEPH_MSGR_TAG_KEEPALIVE2:
88             return handle_keepalive2()
89               .then([this] { return stop_t::no; });
90           case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
91             return handle_keepalive2_ack()
92               .then([this] { return stop_t::no; });
93           case CEPH_MSGR_TAG_CLOSE:
94             std::cout << "close" << std::endl;
95             break;
96           }
97           return seastar::make_ready_future<stop_t>(stop_t::no);
98         });
99     }).handle_exception_type([this] (const std::system_error& e) {
100       if (e.code() == error::read_eof) {
101         close();
102       }
103       throw e;
104     }).then_wrapped([this] (auto fut) {
105       // satisfy the message promise
106       fut.forward_to(std::move(on_message));
107     });
108 }
109
110 seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
111 {
112   return socket->read_exactly(sizeof(ceph_le64))
113     .then([this] (auto buf) {
114       auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
115       discard_up_to(&sent, *seq);
116       return stop_t::no;
117     });
118 }
119
120 void SocketConnection::discard_up_to(std::queue<MessageRef>* queue,
121                                      seq_num_t seq)
122 {
123   while (!queue->empty() &&
124          queue->front()->get_seq() < seq) {
125     queue->pop();
126   }
127 }
128
129 void SocketConnection::requeue_sent()
130 {
131   out_seq -= sent.size();
132   while (!sent.empty()) {
133     auto m = sent.front();
134     sent.pop();
135     out_q.push(std::move(m));
136   }
137 }
138
139 seastar::future<> SocketConnection::maybe_throttle()
140 {
141   if (!policy.throttler_bytes) {
142     return seastar::now();
143   }
144   const auto to_read = (m.header.front_len +
145                         m.header.middle_len +
146                         m.header.data_len);
147   return policy.throttler_bytes->get(to_read);
148 }
149
150 seastar::future<MessageRef> SocketConnection::do_read_message()
151 {
152   return on_message.get_future()
153     .then([this] {
154       on_message = seastar::promise<>{};
155       // read header
156       return socket->read(sizeof(m.header));
157     }).then([this] (bufferlist bl) {
158       // throttle the traffic, maybe
159       auto p = bl.cbegin();
160       ::decode(m.header, p);
161       return maybe_throttle();
162     }).then([this] {
163       // read front
164       return socket->read(m.header.front_len);
165     }).then([this] (bufferlist bl) {
166       m.front = std::move(bl);
167       // read middle
168       return socket->read(m.header.middle_len);
169     }).then([this] (bufferlist bl) {
170       m.middle = std::move(bl);
171       // read data
172       return socket->read(m.header.data_len);
173     }).then([this] (bufferlist bl) {
174       m.data = std::move(bl);
175       // read footer
176       return socket->read(sizeof(m.footer));
177     }).then([this] (bufferlist bl) {
178       // resume background processing of tags
179       read_tags_until_next_message();
180
181       auto p = bl.cbegin();
182       ::decode(m.footer, p);
183       auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
184                                   m.front, m.middle, m.data, nullptr);
185       // TODO: set time stamps
186       msg->set_byte_throttler(policy.throttler_bytes);
187       constexpr bool add_ref = false; // Message starts with 1 ref
188       return MessageRef{msg, add_ref};
189     });
190 }
191
192 seastar::future<MessageRef> SocketConnection::read_message()
193 {
194   return seastar::repeat_until_value([this] {
195       return do_read_message()
196         .then([this] (MessageRef msg) -> std::optional<MessageRef> {
197           if (!update_rx_seq(msg->get_seq())) {
198             // skip this request and read the next
199             return {};
200           }
201           return msg;
202         });
203     });
204 }
205
206 bool SocketConnection::update_rx_seq(seq_num_t seq)
207 {
208   if (seq <= in_seq) {
209     if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
210         conf.ms_die_on_old_message) {
211       ceph_abort_msg("old msgs despite reconnect_seq feature");
212     }
213     return false;
214   } else if (seq > in_seq + 1) {
215     if (conf.ms_die_on_skipped_message) {
216       ceph_abort_msg("skipped incoming seq");
217     }
218     return false;
219   } else {
220     in_seq = seq;
221     return true;
222   }
223 }
224
225 seastar::future<> SocketConnection::write_message(MessageRef msg)
226 {
227   msg->set_seq(++out_seq);
228   msg->encode(features, messenger.get_crc_flags());
229   bufferlist bl;
230   bl.append(CEPH_MSGR_TAG_MSG);
231   auto& header = msg->get_header();
232   bl.append((const char*)&header, sizeof(header));
233   bl.append(msg->get_payload());
234   bl.append(msg->get_middle());
235   bl.append(msg->get_data());
236   auto& footer = msg->get_footer();
237   if (HAVE_FEATURE(features, MSG_AUTH)) {
238     bl.append((const char*)&footer, sizeof(footer));
239   } else {
240     ceph_msg_footer_old old_footer;
241     if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
242       old_footer.front_crc = footer.front_crc;
243       old_footer.middle_crc = footer.middle_crc;
244     } else {
245       old_footer.front_crc = old_footer.middle_crc = 0;
246     }
247     if (messenger.get_crc_flags() & MSG_CRC_DATA) {
248       old_footer.data_crc = footer.data_crc;
249     } else {
250       old_footer.data_crc = 0;
251     }
252     old_footer.flags = footer.flags;
253     bl.append((const char*)&old_footer, sizeof(old_footer));
254   }
255   // write as a seastar::net::packet
256   return socket->write_flush(std::move(bl))
257     .then([this, msg = std::move(msg)] {
258       if (!policy.lossy) {
259         sent.push(std::move(msg));
260       }
261     });
262 }
263
264 seastar::future<> SocketConnection::send(MessageRef msg)
265 {
266   // chain the message after the last message is sent
267   seastar::shared_future<> f = send_ready.then(
268     [this, msg = std::move(msg)] {
269       return write_message(std::move(msg));
270     });
271
272   // chain any later messages after this one completes
273   send_ready = f.get_future();
274   // allow the caller to wait on the same future
275   return f.get_future();
276 }
277
278 seastar::future<> SocketConnection::keepalive()
279 {
280   seastar::shared_future<> f = send_ready.then([this] {
281       k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
282         ceph::coarse_real_clock::now());
283       return socket->write_flush(make_static_packet(k.req));
284     });
285   send_ready = f.get_future();
286   return f.get_future();
287 }
288
289 seastar::future<> SocketConnection::close()
290 {
291   if (state == state_t::closing) {
292     // already closing
293     assert(close_ready.valid());
294     return close_ready.get_future();
295   }
296
297   // unregister_conn() drops a reference, so hold another until completion
298   auto cleanup = [conn = SocketConnectionRef(this)] {};
299
300   if (state == state_t::accepting) {
301     messenger.unaccept_conn(this);
302   } else if (state >= state_t::connecting && state < state_t::closing) {
303     messenger.unregister_conn(this);
304   } else {
305     // cannot happen
306     ceph_assert(false);
307   }
308
309   // close_ready become valid only after state is state_t::closing
310   assert(!close_ready.valid());
311
312   if (socket) {
313     close_ready = socket->close().finally(std::move(cleanup));
314   } else {
315     ceph_assert(state == state_t::connecting);
316     close_ready = seastar::now();
317   }
318   state = state_t::closing;
319   return close_ready.get_future();
320 }
321
322 // handshake
323
324 /// store the banner in a non-const string for buffer::create_static()
325 static char banner[] = CEPH_BANNER;
326 constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
327
328 constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
329 constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
330
331 WRITE_RAW_ENCODER(ceph_msg_connect);
332 WRITE_RAW_ENCODER(ceph_msg_connect_reply);
333
334 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
335 {
336   return out << "connect{features=" << std::hex << c.features << std::dec
337       << " host_type=" << c.host_type
338       << " global_seq=" << c.global_seq
339       << " connect_seq=" << c.connect_seq
340       << " protocol_version=" << c.protocol_version
341       << " authorizer_protocol=" << c.authorizer_protocol
342       << " authorizer_len=" << c.authorizer_len
343       << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
344 }
345
346 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
347 {
348   return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
349       << " features=" << std::hex << r.features << std::dec
350       << " global_seq=" << r.global_seq
351       << " connect_seq=" << r.connect_seq
352       << " protocol_version=" << r.protocol_version
353       << " authorizer_len=" << r.authorizer_len
354       << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
355 }
356
357 // check that the buffer starts with a valid banner without requiring it to
358 // be contiguous in memory
359 static void validate_banner(bufferlist::const_iterator& p)
360 {
361   auto b = std::cbegin(banner);
362   auto end = b + banner_size;
363   while (b != end) {
364     const char *buf{nullptr};
365     auto remaining = std::distance(b, end);
366     auto len = p.get_ptr_and_advance(remaining, &buf);
367     if (!std::equal(buf, buf + len, b)) {
368       throw std::system_error(make_error_code(error::bad_connect_banner));
369     }
370     b += len;
371   }
372 }
373
374 // make sure that we agree with the peer about its address
375 static void validate_peer_addr(const entity_addr_t& addr,
376                                const entity_addr_t& expected)
377 {
378   if (addr == expected) {
379     return;
380   }
381   // ok if server bound anonymously, as long as port/nonce match
382   if (addr.is_blank_ip() &&
383       addr.get_port() == expected.get_port() &&
384       addr.get_nonce() == expected.get_nonce()) {
385     return;
386   } else {
387     throw std::system_error(make_error_code(error::bad_peer_address));
388   }
389 }
390
391 /// return a static bufferptr to the given object
392 template <typename T>
393 bufferptr create_static(T& obj)
394 {
395   return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
396 }
397
398 bool SocketConnection::require_auth_feature() const
399 {
400   if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
401     return false;
402   }
403   if (conf.cephx_require_signatures) {
404     return true;
405   }
406   if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
407       h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
408     return conf.cephx_cluster_require_signatures;
409   } else {
410     return conf.cephx_service_require_signatures;
411   }
412 }
413
414 uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const
415 {
416   constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
417   // see also OSD.h, unlike other connection of simple/async messenger,
418   // crimson msgr is only used by osd
419   constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
420   if (peer_type == my_type) {
421     // internal
422     return CEPH_OSD_PROTOCOL;
423   } else {
424     // public
425     switch (connect ? peer_type : my_type) {
426       case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
427       case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
428       case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
429       default: return 0;
430     }
431   }
432 }
433
/// Server side of one round of connect negotiation.
///
/// Reads the client's fixed-size connect header into h.connect, followed by
/// h.connect.authorizer_len bytes of authorizer, and then decides the reply:
///  - a protocol-version mismatch yields CEPH_MSGR_TAG_BADPROTOVER;
///  - required features the client lacks yield CEPH_MSGR_TAG_FEATURES.
///    MSG_AUTH is first added to policy.features_required when
///    require_auth_feature() says signatures are mandatory;
///  - otherwise messenger.verify_authorizer() produces (tag, reply).
/// A non-zero tag is sent back unchanged. With a zero tag the request is
/// reconciled with an existing connection to peer_addr. Failing that, it is
/// answered with RESETSESSION if the client reports a non-zero connect_seq,
/// or accepted as a new session with READY.
/// start_accept() calls this repeatedly until the state becomes open.
seastar::future<>
SocketConnection::repeat_handle_connect()
{
  return socket->read(sizeof(h.connect))
    .then([this](bufferlist bl) {
      auto p = bl.cbegin();
      ::decode(h.connect, p);
      peer_type = h.connect.host_type;
      // the authorizer payload follows the fixed-size connect header
      return socket->read(h.connect.authorizer_len);
    }).then([this] (bufferlist authorizer) {
      // every exit of this lambda must produce the same future type as
      // verify_authorizer(): a (tag, authorizer_reply) pair
      if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
        return seastar::make_ready_future<msgr_tag_t, bufferlist>(
            CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
      }
      if (require_auth_feature()) {
        policy.features_required |= CEPH_FEATURE_MSG_AUTH;
      }
      if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features;
          feat_missing != 0) {
        return seastar::make_ready_future<msgr_tag_t, bufferlist>(
            CEPH_MSGR_TAG_FEATURES, bufferlist{});
      }
      return messenger.verify_authorizer(peer_type,
                                         h.connect.authorizer_protocol,
                                         authorizer);
    }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
      memset(&h.reply, 0, sizeof(h.reply));
      // a non-zero tag is an error/retry verdict: report it and stop here
      if (tag) {
        return send_connect_reply(tag, std::move(authorizer_reply));
      }
      if (auto existing = messenger.lookup_conn(peer_addr); existing) {
        return handle_connect_with_existing(existing, std::move(authorizer_reply));
      } else if (h.connect.connect_seq > 0) {
        // the client reports a previous session, but no connection to
        // peer_addr is registered here
        return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
                                  std::move(authorizer_reply));
      }
      // no existing connection and connect_seq == 0: accept a new session
      h.connect_seq = h.connect.connect_seq + 1;
      h.peer_global_seq = h.connect.global_seq;
      set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features);
      // TODO: cct
      return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
    });
}
477
478 seastar::future<>
479 SocketConnection::send_connect_reply(msgr_tag_t tag,
480                                      bufferlist&& authorizer_reply)
481 {
482   h.reply.tag = tag;
483   h.reply.features = static_cast<uint64_t>((h.connect.features &
484                                             policy.features_supported) |
485                                            policy.features_required);
486   h.reply.authorizer_len = authorizer_reply.length();
487   return socket->write(make_static_packet(h.reply))
488     .then([this, reply=std::move(authorizer_reply)]() mutable {
489       return socket->write_flush(std::move(reply));
490     });
491 }
492
493 seastar::future<>
494 SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
495                                            bufferlist&& authorizer_reply)
496 {
497   h.global_seq = messenger.get_global_seq();
498   h.reply.tag = tag;
499   h.reply.features = policy.features_supported;
500   h.reply.global_seq = h.global_seq;
501   h.reply.connect_seq = h.connect_seq;
502   h.reply.flags = 0;
503   if (policy.lossy) {
504     h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
505   }
506   h.reply.authorizer_len = authorizer_reply.length();
507   return socket->write(make_static_packet(h.reply))
508     .then([this, reply=std::move(authorizer_reply)]() mutable {
509       if (reply.length()) {
510         return socket->write(std::move(reply));
511       } else {
512         return seastar::now();
513       }
514     }).then([this] {
515       if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
516         return socket->write_flush(make_static_packet(in_seq))
517           .then([this] {
518             return socket->read_exactly(sizeof(seq_num_t));
519           }).then([this] (auto buf) {
520             auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
521             discard_up_to(&out_q, *acked_seq);
522           });
523       } else {
524         return socket->flush();
525       }
526     }).then([this] {
527       messenger.register_conn(this);
528       messenger.unaccept_conn(this);
529       state = state_t::open;
530     });
531 }
532
533 seastar::future<>
534 SocketConnection::handle_keepalive2()
535 {
536   return socket->read_exactly(sizeof(ceph_timespec))
537     .then([this] (auto buf) {
538       k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
539       std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
540       return socket->write_flush(make_static_packet(k.ack));
541     });
542 }
543
544 seastar::future<>
545 SocketConnection::handle_keepalive2_ack()
546 {
547   return socket->read_exactly(sizeof(ceph_timespec))
548     .then([this] (auto buf) {
549       auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
550       k.ack_stamp = *t;
551       std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
552     });
553 }
554
555 seastar::future<>
556 SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
557 {
558   if (h.connect.global_seq < existing->peer_global_seq()) {
559     h.reply.global_seq = existing->peer_global_seq();
560     return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
561   } else if (existing->is_lossy()) {
562     return replace_existing(existing, std::move(authorizer_reply));
563   } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) {
564     return replace_existing(existing, std::move(authorizer_reply), true);
565   } else if (h.connect.connect_seq < existing->connect_seq()) {
566     // old attempt, or we sent READY but they didn't get it.
567     h.reply.connect_seq = existing->connect_seq() + 1;
568     return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
569   } else if (h.connect.connect_seq == existing->connect_seq()) {
570     // if the existing connection successfully opened, and/or
571     // subsequently went to standby, then the peer should bump
572     // their connect_seq and retry: this is not a connection race
573     // we need to resolve here.
574     if (existing->get_state() == state_t::open ||
575         existing->get_state() == state_t::standby) {
576       if (policy.resetcheck && existing->connect_seq() == 0) {
577         return replace_existing(existing, std::move(authorizer_reply));
578       } else {
579         h.reply.connect_seq = existing->connect_seq() + 1;
580         return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
581       }
582     } else if (peer_addr < my_addr ||
583                existing->is_server_side()) {
584       // incoming wins
585       return replace_existing(existing, std::move(authorizer_reply));
586     } else {
587       return send_connect_reply(CEPH_MSGR_TAG_WAIT);
588     }
589   } else if (policy.resetcheck &&
590              existing->connect_seq() == 0) {
591     return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
592   } else {
593     return replace_existing(existing, std::move(authorizer_reply));
594   }
595 }
596
597 seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existing,
598                                                      bufferlist&& authorizer_reply,
599                                                      bool is_reset_from_peer)
600 {
601   msgr_tag_t reply_tag;
602   if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
603       !is_reset_from_peer) {
604     reply_tag = CEPH_MSGR_TAG_SEQ;
605   } else {
606     reply_tag = CEPH_MSGR_TAG_READY;
607   }
608   messenger.unregister_conn(existing);
609   if (!existing->is_lossy()) {
610     // reset the in_seq if this is a hard reset from peer,
611     // otherwise we respect our original connection's value
612     in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
613     // steal outgoing queue and out_seq
614     existing->requeue_sent();
615     std::tie(out_seq, out_q) = existing->get_out_queue();
616   }
617   return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
618 }
619
620 seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
621 {
622   switch (tag) {
623   case CEPH_MSGR_TAG_FEATURES:
624     return fault();
625   case CEPH_MSGR_TAG_BADPROTOVER:
626     return fault();
627   case CEPH_MSGR_TAG_BADAUTHORIZER:
628     if (h.got_bad_auth) {
629       logger().error("{} got bad authorizer", __func__);
630       throw std::system_error(make_error_code(error::negotiation_failure));
631     }
632     h.got_bad_auth = true;
633     // try harder
634     return messenger.get_authorizer(peer_type, true)
635       .then([this](auto&& auth) {
636         h.authorizer = std::move(auth);
637         return seastar::now();
638       });
639   case CEPH_MSGR_TAG_RESETSESSION:
640     reset_session();
641     return seastar::now();
642   case CEPH_MSGR_TAG_RETRY_GLOBAL:
643     h.global_seq = messenger.get_global_seq(h.reply.global_seq);
644     return seastar::now();
645   case CEPH_MSGR_TAG_RETRY_SESSION:
646     ceph_assert(h.reply.connect_seq > h.connect_seq);
647     h.connect_seq = h.reply.connect_seq;
648     return seastar::now();
649   case CEPH_MSGR_TAG_WAIT:
650     return fault();
651   case CEPH_MSGR_TAG_SEQ:
652     break;
653   case CEPH_MSGR_TAG_READY:
654     break;
655   }
656   if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
657       missing) {
658     return fault();
659   }
660   if (tag == CEPH_MSGR_TAG_SEQ) {
661     return socket->read_exactly(sizeof(seq_num_t))
662       .then([this] (auto buf) {
663         auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
664         discard_up_to(&out_q, *acked_seq);
665         return socket->write_flush(make_static_packet(in_seq));
666       }).then([this] {
667         return handle_connect_reply(CEPH_MSGR_TAG_READY);
668       });
669   }
670   if (tag == CEPH_MSGR_TAG_READY) {
671     // hooray!
672     h.peer_global_seq = h.reply.global_seq;
673     policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
674     state = state_t::open;
675     h.connect_seq++;
676     h.backoff = 0ms;
677     set_features(h.reply.features & h.connect.features);
678     if (h.authorizer) {
679       session_security.reset(
680           get_auth_session_handler(nullptr,
681                                    h.authorizer->protocol,
682                                    h.authorizer->session_key,
683                                    features));
684     }
685     h.authorizer.reset();
686     return seastar::now();
687   } else {
688     // unknown tag
689     logger().error("{} got unknown tag", __func__, int(tag));
690     throw std::system_error(make_error_code(error::negotiation_failure));
691   }
692 }
693
694 void SocketConnection::reset_session()
695 {
696   decltype(out_q){}.swap(out_q);
697   decltype(sent){}.swap(sent);
698   in_seq = 0;
699   h.connect_seq = 0;
700   if (HAVE_FEATURE(features, MSG_AUTH)) {
701     // Set out_seq to a random value, so CRC won't be predictable.
702     // Constant to limit starting sequence number to 2^31.  Nothing special
703     // about it, just a big number.
704     constexpr uint64_t SEQ_MASK = 0x7fffffff;
705     out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
706   } else {
707     // previously, seq #'s always started at 0.
708     out_seq = 0;
709   }
710 }
711
712 seastar::future<> SocketConnection::repeat_connect()
713 {
714   // encode ceph_msg_connect
715   memset(&h.connect, 0, sizeof(h.connect));
716   h.connect.features = policy.features_supported;
717   h.connect.host_type = messenger.get_myname().type();
718   h.connect.global_seq = h.global_seq;
719   h.connect.connect_seq = h.connect_seq;
720   h.connect.protocol_version = get_proto_version(peer_type, true);
721   // this is fyi, actually, server decides!
722   h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
723
724   return messenger.get_authorizer(peer_type, false)
725     .then([this](auto&& auth) {
726       h.authorizer = std::move(auth);
727       bufferlist bl;
728       if (h.authorizer) {
729         h.connect.authorizer_protocol = h.authorizer->protocol;
730         h.connect.authorizer_len = h.authorizer->bl.length();
731         bl.append(create_static(h.connect));
732         bl.append(h.authorizer->bl);
733       } else {
734         h.connect.authorizer_protocol = 0;
735         h.connect.authorizer_len = 0;
736         bl.append(create_static(h.connect));
737       };
738       return socket->write_flush(std::move(bl));
739     }).then([this] {
740      // read the reply
741       return socket->read(sizeof(h.reply));
742     }).then([this] (bufferlist bl) {
743       auto p = bl.cbegin();
744       ::decode(h.reply, p);
745       ceph_assert(p.end());
746       return socket->read(h.reply.authorizer_len);
747     }).then([this] (bufferlist bl) {
748       if (h.authorizer) {
749         auto reply = bl.cbegin();
750         if (!h.authorizer->verify_reply(reply)) {
751           logger().error("{} authorizer failed to verify reply", __func__);
752           throw std::system_error(make_error_code(error::negotiation_failure));
753         }
754       }
755       return handle_connect_reply(h.reply.tag);
756     });
757 }
758
759 seastar::future<>
760 SocketConnection::start_connect(const entity_addr_t& _peer_addr,
761                                 const entity_type_t& _peer_type)
762 {
763   ceph_assert(state == state_t::none);
764   ceph_assert(!socket);
765   peer_addr = _peer_addr;
766   peer_type = _peer_type;
767   messenger.register_conn(this);
768   state = state_t::connecting;
769   return seastar::connect(peer_addr.in4_addr())
770     .then([this](seastar::connected_socket fd) {
771       if (state == state_t::closing) {
772         fd.shutdown_input();
773         fd.shutdown_output();
774         throw std::system_error(make_error_code(error::connection_aborted));
775       }
776       socket.emplace(std::move(fd));
777       // read server's handshake header
778       return socket->read(server_header_size);
779     }).then([this] (bufferlist headerbl) {
780       auto p = headerbl.cbegin();
781       validate_banner(p);
782       entity_addr_t saddr, caddr;
783       ::decode(saddr, p);
784       ::decode(caddr, p);
785       ceph_assert(p.end());
786       validate_peer_addr(saddr, peer_addr);
787
788       if (my_addr != caddr) {
789         // take peer's address for me, but preserve my nonce
790         caddr.nonce = my_addr.nonce;
791         my_addr = caddr;
792       }
793       // encode/send client's handshake header
794       bufferlist bl;
795       bl.append(buffer::create_static(banner_size, banner));
796       ::encode(my_addr, bl, 0);
797       h.global_seq = messenger.get_global_seq();
798       return socket->write_flush(std::move(bl));
799     }).then([=] {
800       return seastar::do_until([=] { return state == state_t::open; },
801                                [=] { return repeat_connect(); });
802     }).then([this] {
803       // start background processing of tags
804       read_tags_until_next_message();
805     }).then_wrapped([this] (auto fut) {
806       // satisfy the handshake's promise
807       fut.forward_to(std::move(h.promise));
808     });
809 }
810
/// Server side of the handshake on an accepted socket.
///
/// Sends our banner, our address and the peer's address as seen from here.
/// It then reads back the client's banner and address, adopting the latter
/// as peer_addr unless it is a blank IP, and runs repeat_handle_connect()
/// until the connection is open. Background tag processing then starts. The
/// result of the whole sequence is forwarded to h.promise, whose future
/// initially backs send_ready.
seastar::future<>
SocketConnection::start_accept(seastar::connected_socket&& fd,
                               const entity_addr_t& _peer_addr)
{
  ceph_assert(state == state_t::none);
  ceph_assert(!socket);
  peer_addr = _peer_addr;
  socket.emplace(std::move(fd));
  messenger.accept_conn(this);
  state = state_t::accepting;
  // encode/send server's handshake header
  bufferlist bl;
  bl.append(buffer::create_static(banner_size, banner));
  ::encode(my_addr, bl, 0);
  ::encode(peer_addr, bl, 0);
  return socket->write_flush(std::move(bl))
    .then([this] {
      // read client's handshake header (banner + address); the connect
      // request itself is read by repeat_handle_connect()
      return socket->read(client_header_size);
    }).then([this] (bufferlist bl) {
      auto p = bl.cbegin();
      validate_banner(p);
      entity_addr_t addr;
      ::decode(addr, p);
      ceph_assert(p.end());
      // a client that reports a blank IP keeps the address we observed
      if (!addr.is_blank_ip()) {
        peer_addr = addr;
      }
    }).then([this] {
      return seastar::do_until([this] { return state == state_t::open; },
                               [this] { return repeat_handle_connect(); });
    }).then([this] {
      // start background processing of tags
      read_tags_until_next_message();
    }).then_wrapped([this] (auto fut) {
      // satisfy the handshake's promise
      fut.forward_to(std::move(h.promise));
    });
}
850
851 seastar::future<> SocketConnection::fault()
852 {
853   if (policy.lossy) {
854     messenger.unregister_conn(this);
855   }
856   if (h.backoff.count()) {
857     h.backoff += h.backoff;
858   } else {
859     h.backoff = conf.ms_initial_backoff;
860   }
861   if (h.backoff > conf.ms_max_backoff) {
862     h.backoff = conf.ms_max_backoff;
863   }
864   return seastar::sleep(h.backoff);
865 }