5ebb9232be3ee20827d0a9a0dd1b5bfe97c2e973
[ceph.git] / src / crimson / net / SocketConnection.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2017 Red Hat, Inc
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "SocketConnection.h"
16
17 #include <algorithm>
18 #include <seastar/core/shared_future.hh>
19 #include <seastar/core/sleep.hh>
20 #include <seastar/net/packet.hh>
21
22 #include "include/msgr.h"
23 #include "include/random.h"
24 #include "auth/Auth.h"
25 #include "auth/AuthSessionHandler.h"
26
27 #include "crimson/common/log.h"
28 #include "Config.h"
29 #include "Errors.h"
30 #include "SocketMessenger.h"
31
32 using namespace ceph::net;
33
34 template <typename T>
35 seastar::net::packet make_static_packet(const T& value) {
36     return { reinterpret_cast<const char*>(&value), sizeof(value) };
37 }
38
39 namespace {
40   seastar::logger& logger() {
41     return ceph::get_logger(ceph_subsys_ms);
42   }
43 }
44
45 SocketConnection::SocketConnection(SocketMessenger& messenger,
46                                    const entity_addr_t& my_addr)
47   : Connection(my_addr),
48     messenger(messenger),
49     send_ready(h.promise.get_future())
50 {
51 }
52
53 SocketConnection::~SocketConnection()
54 {
55   // errors were reported to callers of send()
56   ceph_assert(send_ready.available());
57   send_ready.ignore_ready_future();
58 }
59
60 ceph::net::Messenger*
61 SocketConnection::get_messenger() const {
62   return &messenger;
63 }
64
65 bool SocketConnection::is_connected()
66 {
67   return !send_ready.failed();
68 }
69
70 void SocketConnection::read_tags_until_next_message()
71 {
72   seastar::repeat([this] {
73       // read the next tag
74       return socket->read_exactly(1)
75         .then([this] (auto buf) {
76           if (buf.empty()) {
77             throw std::system_error(make_error_code(error::read_eof));
78           }
79           switch (buf[0]) {
80           case CEPH_MSGR_TAG_MSG:
81             // stop looping and notify read_header()
82             return seastar::make_ready_future<stop_t>(stop_t::yes);
83           case CEPH_MSGR_TAG_ACK:
84             return handle_ack();
85           case CEPH_MSGR_TAG_KEEPALIVE:
86             break;
87           case CEPH_MSGR_TAG_KEEPALIVE2:
88             return handle_keepalive2()
89               .then([this] { return stop_t::no; });
90           case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
91             return handle_keepalive2_ack()
92               .then([this] { return stop_t::no; });
93           case CEPH_MSGR_TAG_CLOSE:
94             std::cout << "close" << std::endl;
95             break;
96           }
97           return seastar::make_ready_future<stop_t>(stop_t::no);
98         });
99     }).handle_exception_type([this] (const std::system_error& e) {
100       if (e.code() == error::read_eof) {
101         close();
102       }
103       throw e;
104     }).then_wrapped([this] (auto fut) {
105       // satisfy the message promise
106       fut.forward_to(std::move(on_message));
107     });
108 }
109
110 seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
111 {
112   return socket->read_exactly(sizeof(ceph_le64))
113     .then([this] (auto buf) {
114       auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
115       discard_up_to(&sent, *seq);
116       return stop_t::no;
117     });
118 }
119
120 void SocketConnection::discard_up_to(std::queue<MessageRef>* queue,
121                                      seq_num_t seq)
122 {
123   while (!queue->empty() &&
124          queue->front()->get_seq() < seq) {
125     queue->pop();
126   }
127 }
128
129 void SocketConnection::requeue_sent()
130 {
131   out_seq -= sent.size();
132   while (!sent.empty()) {
133     auto m = sent.front();
134     sent.pop();
135     out_q.push(std::move(m));
136   }
137 }
138
139 seastar::future<> SocketConnection::maybe_throttle()
140 {
141   if (!policy.throttler_bytes) {
142     return seastar::now();
143   }
144   const auto to_read = (m.header.front_len +
145                         m.header.middle_len +
146                         m.header.data_len);
147   return policy.throttler_bytes->get(to_read);
148 }
149
150 seastar::future<MessageRef> SocketConnection::do_read_message()
151 {
152   return on_message.get_future()
153     .then([this] {
154       on_message = seastar::promise<>{};
155       // read header
156       return socket->read(sizeof(m.header));
157     }).then([this] (bufferlist bl) {
158       // throttle the traffic, maybe
159       auto p = bl.cbegin();
160       ::decode(m.header, p);
161       return maybe_throttle();
162     }).then([this] {
163       // read front
164       return socket->read(m.header.front_len);
165     }).then([this] (bufferlist bl) {
166       m.front = std::move(bl);
167       // read middle
168       return socket->read(m.header.middle_len);
169     }).then([this] (bufferlist bl) {
170       m.middle = std::move(bl);
171       // read data
172       return socket->read(m.header.data_len);
173     }).then([this] (bufferlist bl) {
174       m.data = std::move(bl);
175       // read footer
176       return socket->read(sizeof(m.footer));
177     }).then([this] (bufferlist bl) {
178       // resume background processing of tags
179       read_tags_until_next_message();
180
181       auto p = bl.cbegin();
182       ::decode(m.footer, p);
183       auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
184                                   m.front, m.middle, m.data, nullptr);
185       // TODO: set time stamps
186       msg->set_byte_throttler(policy.throttler_bytes);
187       constexpr bool add_ref = false; // Message starts with 1 ref
188       return MessageRef{msg, add_ref};
189     });
190 }
191
192 seastar::future<MessageRef> SocketConnection::read_message()
193 {
194   return seastar::repeat_until_value([this] {
195       return do_read_message()
196         .then([this] (MessageRef msg) -> std::optional<MessageRef> {
197           if (!update_rx_seq(msg->get_seq())) {
198             // skip this request and read the next
199             return {};
200           }
201           return msg;
202         });
203     });
204 }
205
206 bool SocketConnection::update_rx_seq(seq_num_t seq)
207 {
208   if (seq <= in_seq) {
209     if (HAVE_FEATURE(features, RECONNECT_SEQ) &&
210         conf.ms_die_on_old_message) {
211       ceph_abort_msg("old msgs despite reconnect_seq feature");
212     }
213     return false;
214   } else if (seq > in_seq + 1) {
215     if (conf.ms_die_on_skipped_message) {
216       ceph_abort_msg("skipped incoming seq");
217     }
218     return false;
219   } else {
220     in_seq = seq;
221     return true;
222   }
223 }
224
225 seastar::future<> SocketConnection::write_message(MessageRef msg)
226 {
227   msg->set_seq(++out_seq);
228   msg->encode(features, messenger.get_crc_flags());
229   bufferlist bl;
230   bl.append(CEPH_MSGR_TAG_MSG);
231   auto& header = msg->get_header();
232   bl.append((const char*)&header, sizeof(header));
233   bl.append(msg->get_payload());
234   bl.append(msg->get_middle());
235   bl.append(msg->get_data());
236   auto& footer = msg->get_footer();
237   if (HAVE_FEATURE(features, MSG_AUTH)) {
238     bl.append((const char*)&footer, sizeof(footer));
239   } else {
240     ceph_msg_footer_old old_footer;
241     if (messenger.get_crc_flags() & MSG_CRC_HEADER) {
242       old_footer.front_crc = footer.front_crc;
243       old_footer.middle_crc = footer.middle_crc;
244     } else {
245       old_footer.front_crc = old_footer.middle_crc = 0;
246     }
247     if (messenger.get_crc_flags() & MSG_CRC_DATA) {
248       old_footer.data_crc = footer.data_crc;
249     } else {
250       old_footer.data_crc = 0;
251     }
252     old_footer.flags = footer.flags;
253     bl.append((const char*)&old_footer, sizeof(old_footer));
254   }
255   // write as a seastar::net::packet
256   return socket->write_flush(std::move(bl))
257     .then([this, msg = std::move(msg)] {
258       if (!policy.lossy) {
259         sent.push(std::move(msg));
260       }
261     });
262 }
263
264 seastar::future<> SocketConnection::send(MessageRef msg)
265 {
266   // chain the message after the last message is sent
267   seastar::shared_future<> f = send_ready.then(
268     [this, msg = std::move(msg)] {
269       return write_message(std::move(msg));
270     });
271
272   // chain any later messages after this one completes
273   send_ready = f.get_future();
274   // allow the caller to wait on the same future
275   return f.get_future();
276 }
277
278 seastar::future<> SocketConnection::keepalive()
279 {
280   seastar::shared_future<> f = send_ready.then([this] {
281       k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
282         ceph::coarse_real_clock::now());
283       return socket->write_flush(make_static_packet(k.req));
284     });
285   send_ready = f.get_future();
286   return f.get_future();
287 }
288
289 seastar::future<> SocketConnection::close()
290 {
291   if (state == state_t::closing) {
292     // already closing
293     assert(close_ready.valid());
294     return close_ready.get_future();
295   }
296
297   // unregister_conn() drops a reference, so hold another until completion
298   auto cleanup = [conn = SocketConnectionRef(this)] {};
299
300   if (state == state_t::accepting) {
301     messenger.unaccept_conn(this);
302   } else if (state >= state_t::connecting && state < state_t::closing) {
303     messenger.unregister_conn(this);
304   } else {
305     // cannot happen
306     ceph_assert(false);
307   }
308
309   // close_ready become valid only after state is state_t::closing
310   assert(!close_ready.valid());
311
312   if (socket) {
313     close_ready = socket->close().finally(std::move(cleanup));
314   } else {
315     ceph_assert(state == state_t::connecting);
316     close_ready = seastar::now();
317   }
318   state = state_t::closing;
319   return close_ready.get_future();
320 }
321
322 // handshake
323
324 /// store the banner in a non-const string for buffer::create_static()
325 static char banner[] = CEPH_BANNER;
326 constexpr size_t banner_size = sizeof(CEPH_BANNER)-1;
327
328 constexpr size_t client_header_size = banner_size + sizeof(ceph_entity_addr);
329 constexpr size_t server_header_size = banner_size + 2 * sizeof(ceph_entity_addr);
330
331 WRITE_RAW_ENCODER(ceph_msg_connect);
332 WRITE_RAW_ENCODER(ceph_msg_connect_reply);
333
334 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect& c)
335 {
336   return out << "connect{features=" << std::hex << c.features << std::dec
337       << " host_type=" << c.host_type
338       << " global_seq=" << c.global_seq
339       << " connect_seq=" << c.connect_seq
340       << " protocol_version=" << c.protocol_version
341       << " authorizer_protocol=" << c.authorizer_protocol
342       << " authorizer_len=" << c.authorizer_len
343       << " flags=" << std::hex << static_cast<uint16_t>(c.flags) << std::dec << '}';
344 }
345
346 std::ostream& operator<<(std::ostream& out, const ceph_msg_connect_reply& r)
347 {
348   return out << "connect_reply{tag=" << static_cast<uint16_t>(r.tag)
349       << " features=" << std::hex << r.features << std::dec
350       << " global_seq=" << r.global_seq
351       << " connect_seq=" << r.connect_seq
352       << " protocol_version=" << r.protocol_version
353       << " authorizer_len=" << r.authorizer_len
354       << " flags=" << std::hex << static_cast<uint16_t>(r.flags) << std::dec << '}';
355 }
356
357 // check that the buffer starts with a valid banner without requiring it to
358 // be contiguous in memory
359 static void validate_banner(bufferlist::const_iterator& p)
360 {
361   auto b = std::cbegin(banner);
362   auto end = b + banner_size;
363   while (b != end) {
364     const char *buf{nullptr};
365     auto remaining = std::distance(b, end);
366     auto len = p.get_ptr_and_advance(remaining, &buf);
367     if (!std::equal(buf, buf + len, b)) {
368       throw std::system_error(make_error_code(error::bad_connect_banner));
369     }
370     b += len;
371   }
372 }
373
374 // make sure that we agree with the peer about its address
375 static void validate_peer_addr(const entity_addr_t& addr,
376                                const entity_addr_t& expected)
377 {
378   if (addr == expected) {
379     return;
380   }
381   // ok if server bound anonymously, as long as port/nonce match
382   if (addr.is_blank_ip() &&
383       addr.get_port() == expected.get_port() &&
384       addr.get_nonce() == expected.get_nonce()) {
385     return;
386   } else {
387     throw std::system_error(make_error_code(error::bad_peer_address));
388   }
389 }
390
391 /// return a static bufferptr to the given object
392 template <typename T>
393 bufferptr create_static(T& obj)
394 {
395   return buffer::create_static(sizeof(obj), reinterpret_cast<char*>(&obj));
396 }
397
398 bool SocketConnection::require_auth_feature() const
399 {
400   if (h.connect.authorizer_protocol != CEPH_AUTH_CEPHX) {
401     return false;
402   }
403   if (conf.cephx_require_signatures) {
404     return true;
405   }
406   if (h.connect.host_type == CEPH_ENTITY_TYPE_OSD ||
407       h.connect.host_type == CEPH_ENTITY_TYPE_MDS) {
408     return conf.cephx_cluster_require_signatures;
409   } else {
410     return conf.cephx_service_require_signatures;
411   }
412 }
413
414 uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool connect) const
415 {
416   constexpr entity_type_t my_type = CEPH_ENTITY_TYPE_OSD;
417   // see also OSD.h, unlike other connection of simple/async messenger,
418   // crimson msgr is only used by osd
419   constexpr uint32_t CEPH_OSD_PROTOCOL = 10;
420   if (peer_type == my_type) {
421     // internal
422     return CEPH_OSD_PROTOCOL;
423   } else {
424     // public
425     switch (connect ? peer_type : my_type) {
426       case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
427       case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
428       case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
429       default: return 0;
430     }
431   }
432 }
433
434 seastar::future<seastar::stop_iteration>
435 SocketConnection::repeat_handle_connect()
436 {
437   return socket->read(sizeof(h.connect))
438     .then([this](bufferlist bl) {
439       auto p = bl.cbegin();
440       ::decode(h.connect, p);
441       peer_type = h.connect.host_type;
442       return socket->read(h.connect.authorizer_len);
443     }).then([this] (bufferlist authorizer) {
444       if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
445         return seastar::make_ready_future<msgr_tag_t, bufferlist>(
446             CEPH_MSGR_TAG_BADPROTOVER, bufferlist{});
447       }
448       if (require_auth_feature()) {
449         policy.features_required |= CEPH_FEATURE_MSG_AUTH;
450       }
451       if (auto feat_missing = policy.features_required & ~(uint64_t)h.connect.features;
452           feat_missing != 0) {
453         return seastar::make_ready_future<msgr_tag_t, bufferlist>(
454             CEPH_MSGR_TAG_FEATURES, bufferlist{});
455       }
456       return messenger.verify_authorizer(peer_type,
457                                          h.connect.authorizer_protocol,
458                                          authorizer);
459     }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
460       memset(&h.reply, 0, sizeof(h.reply));
461       if (tag) {
462         return send_connect_reply(tag, std::move(authorizer_reply));
463       }
464       if (auto existing = messenger.lookup_conn(peer_addr); existing) {
465         return handle_connect_with_existing(existing, std::move(authorizer_reply));
466       } else if (h.connect.connect_seq > 0) {
467         return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION,
468                                   std::move(authorizer_reply));
469       }
470       h.connect_seq = h.connect.connect_seq + 1;
471       h.peer_global_seq = h.connect.global_seq;
472       set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features);
473       // TODO: cct
474       return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
475     });
476 }
477
478 seastar::future<seastar::stop_iteration>
479 SocketConnection::send_connect_reply(msgr_tag_t tag,
480                                      bufferlist&& authorizer_reply)
481 {
482   h.reply.tag = tag;
483   h.reply.features = static_cast<uint64_t>((h.connect.features &
484                                             policy.features_supported) |
485                                            policy.features_required);
486   h.reply.authorizer_len = authorizer_reply.length();
487   return socket->write(make_static_packet(h.reply))
488     .then([this, reply=std::move(authorizer_reply)]() mutable {
489       return socket->write_flush(std::move(reply));
490     }).then([] {
491       return stop_t::no;
492     });
493 }
494
495 seastar::future<seastar::stop_iteration>
496 SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
497                                            bufferlist&& authorizer_reply)
498 {
499   h.global_seq = messenger.get_global_seq();
500   h.reply.tag = tag;
501   h.reply.features = policy.features_supported;
502   h.reply.global_seq = h.global_seq;
503   h.reply.connect_seq = h.connect_seq;
504   h.reply.flags = 0;
505   if (policy.lossy) {
506     h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
507   }
508   h.reply.authorizer_len = authorizer_reply.length();
509   return socket->write(make_static_packet(h.reply))
510     .then([this, reply=std::move(authorizer_reply)]() mutable {
511       if (reply.length()) {
512         return socket->write(std::move(reply));
513       } else {
514         return seastar::now();
515       }
516     }).then([this] {
517       if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
518         return socket->write_flush(make_static_packet(in_seq))
519           .then([this] {
520             return socket->read_exactly(sizeof(seq_num_t));
521           }).then([this] (auto buf) {
522             auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
523             discard_up_to(&out_q, *acked_seq);
524           });
525       } else {
526         return socket->flush();
527       }
528     }).then([this] {
529       messenger.register_conn(this);
530       messenger.unaccept_conn(this);
531       return stop_t::yes;
532     });
533 }
534
535 seastar::future<>
536 SocketConnection::handle_keepalive2()
537 {
538   return socket->read_exactly(sizeof(ceph_timespec))
539     .then([this] (auto buf) {
540       k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
541       std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
542       return socket->write_flush(make_static_packet(k.ack));
543     });
544 }
545
546 seastar::future<>
547 SocketConnection::handle_keepalive2_ack()
548 {
549   return socket->read_exactly(sizeof(ceph_timespec))
550     .then([this] (auto buf) {
551       auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
552       k.ack_stamp = *t;
553       std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
554     });
555 }
556
557 seastar::future<seastar::stop_iteration>
558 SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
559 {
560   if (h.connect.global_seq < existing->peer_global_seq()) {
561     h.reply.global_seq = existing->peer_global_seq();
562     return send_connect_reply(CEPH_MSGR_TAG_RETRY_GLOBAL);
563   } else if (existing->is_lossy()) {
564     return replace_existing(existing, std::move(authorizer_reply));
565   } else if (h.connect.connect_seq == 0 && existing->connect_seq() > 0) {
566     return replace_existing(existing, std::move(authorizer_reply), true);
567   } else if (h.connect.connect_seq < existing->connect_seq()) {
568     // old attempt, or we sent READY but they didn't get it.
569     h.reply.connect_seq = existing->connect_seq() + 1;
570     return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
571   } else if (h.connect.connect_seq == existing->connect_seq()) {
572     // if the existing connection successfully opened, and/or
573     // subsequently went to standby, then the peer should bump
574     // their connect_seq and retry: this is not a connection race
575     // we need to resolve here.
576     if (existing->get_state() == state_t::open ||
577         existing->get_state() == state_t::standby) {
578       if (policy.resetcheck && existing->connect_seq() == 0) {
579         return replace_existing(existing, std::move(authorizer_reply));
580       } else {
581         h.reply.connect_seq = existing->connect_seq() + 1;
582         return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION);
583       }
584     } else if (peer_addr < my_addr ||
585                existing->is_server_side()) {
586       // incoming wins
587       return replace_existing(existing, std::move(authorizer_reply));
588     } else {
589       return send_connect_reply(CEPH_MSGR_TAG_WAIT);
590     }
591   } else if (policy.resetcheck &&
592              existing->connect_seq() == 0) {
593     return send_connect_reply(CEPH_MSGR_TAG_RESETSESSION);
594   } else {
595     return replace_existing(existing, std::move(authorizer_reply));
596   }
597 }
598
599 seastar::future<seastar::stop_iteration>
600 SocketConnection::replace_existing(SocketConnectionRef existing,
601                                    bufferlist&& authorizer_reply,
602                                    bool is_reset_from_peer)
603 {
604   msgr_tag_t reply_tag;
605   if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
606       !is_reset_from_peer) {
607     reply_tag = CEPH_MSGR_TAG_SEQ;
608   } else {
609     reply_tag = CEPH_MSGR_TAG_READY;
610   }
611   messenger.unregister_conn(existing);
612   if (!existing->is_lossy()) {
613     // reset the in_seq if this is a hard reset from peer,
614     // otherwise we respect our original connection's value
615     in_seq = is_reset_from_peer ? 0 : existing->rx_seq_num();
616     // steal outgoing queue and out_seq
617     existing->requeue_sent();
618     std::tie(out_seq, out_q) = existing->get_out_queue();
619   }
620   return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
621 }
622
623 seastar::future<seastar::stop_iteration>
624 SocketConnection::handle_connect_reply(msgr_tag_t tag)
625 {
626   switch (tag) {
627   case CEPH_MSGR_TAG_FEATURES:
628     logger().error("{} connect protocol feature mispatch", __func__);
629     throw std::system_error(make_error_code(error::negotiation_failure));
630   case CEPH_MSGR_TAG_BADPROTOVER:
631     logger().error("{} connect protocol version mispatch", __func__);
632     throw std::system_error(make_error_code(error::negotiation_failure));
633   case CEPH_MSGR_TAG_BADAUTHORIZER:
634     if (h.got_bad_auth) {
635       logger().error("{} got bad authorizer", __func__);
636       throw std::system_error(make_error_code(error::negotiation_failure));
637     }
638     h.got_bad_auth = true;
639     // try harder
640     return messenger.get_authorizer(peer_type, true)
641       .then([this](auto&& auth) {
642         h.authorizer = std::move(auth);
643         return stop_t::no;
644       });
645   case CEPH_MSGR_TAG_RESETSESSION:
646     reset_session();
647     return seastar::make_ready_future<stop_t>(stop_t::no);
648   case CEPH_MSGR_TAG_RETRY_GLOBAL:
649     h.global_seq = messenger.get_global_seq(h.reply.global_seq);
650     return seastar::make_ready_future<stop_t>(stop_t::no);
651   case CEPH_MSGR_TAG_RETRY_SESSION:
652     ceph_assert(h.reply.connect_seq > h.connect_seq);
653     h.connect_seq = h.reply.connect_seq;
654     return seastar::make_ready_future<stop_t>(stop_t::no);
655   case CEPH_MSGR_TAG_WAIT:
656     // TODO: state wait
657     throw std::system_error(make_error_code(error::negotiation_failure));
658   case CEPH_MSGR_TAG_SEQ:
659     break;
660   case CEPH_MSGR_TAG_READY:
661     break;
662   }
663   if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
664       missing) {
665     logger().error("{} missing required features", __func__);
666     throw std::system_error(make_error_code(error::negotiation_failure));
667   }
668   if (tag == CEPH_MSGR_TAG_SEQ) {
669     return socket->read_exactly(sizeof(seq_num_t))
670       .then([this] (auto buf) {
671         auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
672         discard_up_to(&out_q, *acked_seq);
673         return socket->write_flush(make_static_packet(in_seq));
674       }).then([this] {
675         return handle_connect_reply(CEPH_MSGR_TAG_READY);
676       });
677   }
678   if (tag == CEPH_MSGR_TAG_READY) {
679     // hooray!
680     h.peer_global_seq = h.reply.global_seq;
681     policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
682     state = state_t::open;
683     h.connect_seq++;
684     h.backoff = 0ms;
685     set_features(h.reply.features & h.connect.features);
686     if (h.authorizer) {
687       session_security.reset(
688           get_auth_session_handler(nullptr,
689                                    h.authorizer->protocol,
690                                    h.authorizer->session_key,
691                                    features));
692     }
693     h.authorizer.reset();
694     return seastar::make_ready_future<stop_t>(stop_t::yes);
695   } else {
696     // unknown tag
697     logger().error("{} got unknown tag", __func__, int(tag));
698     throw std::system_error(make_error_code(error::negotiation_failure));
699   }
700 }
701
702 void SocketConnection::reset_session()
703 {
704   decltype(out_q){}.swap(out_q);
705   decltype(sent){}.swap(sent);
706   in_seq = 0;
707   h.connect_seq = 0;
708   if (HAVE_FEATURE(features, MSG_AUTH)) {
709     // Set out_seq to a random value, so CRC won't be predictable.
710     // Constant to limit starting sequence number to 2^31.  Nothing special
711     // about it, just a big number.
712     constexpr uint64_t SEQ_MASK = 0x7fffffff;
713     out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
714   } else {
715     // previously, seq #'s always started at 0.
716     out_seq = 0;
717   }
718 }
719
720 seastar::future<seastar::stop_iteration>
721 SocketConnection::repeat_connect()
722 {
723   // encode ceph_msg_connect
724   memset(&h.connect, 0, sizeof(h.connect));
725   h.connect.features = policy.features_supported;
726   h.connect.host_type = messenger.get_myname().type();
727   h.connect.global_seq = h.global_seq;
728   h.connect.connect_seq = h.connect_seq;
729   h.connect.protocol_version = get_proto_version(peer_type, true);
730   // this is fyi, actually, server decides!
731   h.connect.flags = policy.lossy ? CEPH_MSG_CONNECT_LOSSY : 0;
732
733   return messenger.get_authorizer(peer_type, false)
734     .then([this](auto&& auth) {
735       h.authorizer = std::move(auth);
736       bufferlist bl;
737       if (h.authorizer) {
738         h.connect.authorizer_protocol = h.authorizer->protocol;
739         h.connect.authorizer_len = h.authorizer->bl.length();
740         bl.append(create_static(h.connect));
741         bl.append(h.authorizer->bl);
742       } else {
743         h.connect.authorizer_protocol = 0;
744         h.connect.authorizer_len = 0;
745         bl.append(create_static(h.connect));
746       };
747       return socket->write_flush(std::move(bl));
748     }).then([this] {
749      // read the reply
750       return socket->read(sizeof(h.reply));
751     }).then([this] (bufferlist bl) {
752       auto p = bl.cbegin();
753       ::decode(h.reply, p);
754       ceph_assert(p.end());
755       return socket->read(h.reply.authorizer_len);
756     }).then([this] (bufferlist bl) {
757       if (h.authorizer) {
758         auto reply = bl.cbegin();
759         if (!h.authorizer->verify_reply(reply)) {
760           logger().error("{} authorizer failed to verify reply", __func__);
761           throw std::system_error(make_error_code(error::negotiation_failure));
762         }
763       }
764       return handle_connect_reply(h.reply.tag);
765     });
766 }
767
768 seastar::future<>
769 SocketConnection::start_connect(const entity_addr_t& _peer_addr,
770                                 const entity_type_t& _peer_type)
771 {
772   ceph_assert(state == state_t::none);
773   ceph_assert(!socket);
774   peer_addr = _peer_addr;
775   peer_type = _peer_type;
776   messenger.register_conn(this);
777   state = state_t::connecting;
778   return seastar::connect(peer_addr.in4_addr())
779     .then([this](seastar::connected_socket fd) {
780       if (state == state_t::closing) {
781         fd.shutdown_input();
782         fd.shutdown_output();
783         throw std::system_error(make_error_code(error::connection_aborted));
784       }
785       socket.emplace(std::move(fd));
786       // read server's handshake header
787       return socket->read(server_header_size);
788     }).then([this] (bufferlist headerbl) {
789       auto p = headerbl.cbegin();
790       validate_banner(p);
791       entity_addr_t saddr, caddr;
792       ::decode(saddr, p);
793       ::decode(caddr, p);
794       ceph_assert(p.end());
795       validate_peer_addr(saddr, peer_addr);
796
797       if (my_addr != caddr) {
798         // take peer's address for me, but preserve my nonce
799         caddr.nonce = my_addr.nonce;
800         my_addr = caddr;
801       }
802       // encode/send client's handshake header
803       bufferlist bl;
804       bl.append(buffer::create_static(banner_size, banner));
805       ::encode(my_addr, bl, 0);
806       h.global_seq = messenger.get_global_seq();
807       return socket->write_flush(std::move(bl));
808     }).then([=] {
809       return seastar::repeat([this] {
810         return repeat_connect();
811       });
812     }).then([this] {
813       state = state_t::open;
814       // start background processing of tags
815       read_tags_until_next_message();
816     }).then_wrapped([this] (auto fut) {
817       // satisfy the handshake's promise
818       fut.forward_to(std::move(h.promise));
819     });
820 }
821
822 seastar::future<>
823 SocketConnection::start_accept(seastar::connected_socket&& fd,
824                                const entity_addr_t& _peer_addr)
825 {
826   ceph_assert(state == state_t::none);
827   ceph_assert(!socket);
828   peer_addr = _peer_addr;
829   socket.emplace(std::move(fd));
830   messenger.accept_conn(this);
831   state = state_t::accepting;
832   // encode/send server's handshake header
833   bufferlist bl;
834   bl.append(buffer::create_static(banner_size, banner));
835   ::encode(my_addr, bl, 0);
836   ::encode(peer_addr, bl, 0);
837   return socket->write_flush(std::move(bl))
838     .then([this] {
839       // read client's handshake header and connect request
840       return socket->read(client_header_size);
841     }).then([this] (bufferlist bl) {
842       auto p = bl.cbegin();
843       validate_banner(p);
844       entity_addr_t addr;
845       ::decode(addr, p);
846       ceph_assert(p.end());
847       if (!addr.is_blank_ip()) {
848         peer_addr = addr;
849       }
850     }).then([this] {
851       return seastar::repeat([this] {
852         return repeat_handle_connect();
853       });
854     }).then([this] {
855       state = state_t::open;
856       // start background processing of tags
857       read_tags_until_next_message();
858     }).then_wrapped([this] (auto fut) {
859       // satisfy the handshake's promise
860       fut.forward_to(std::move(h.promise));
861     });
862 }
863
864 seastar::future<> SocketConnection::fault()
865 {
866   if (policy.lossy) {
867     messenger.unregister_conn(this);
868   }
869   if (h.backoff.count()) {
870     h.backoff += h.backoff;
871   } else {
872     h.backoff = conf.ms_initial_backoff;
873   }
874   if (h.backoff > conf.ms_max_backoff) {
875     h.backoff = conf.ms_max_backoff;
876   }
877   return seastar::sleep(h.backoff);
878 }