]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: msgr2: send entity type and peer_address in Tag::HELLO frame
authorRicardo Dias <rdias@suse.com>
Wed, 30 Jan 2019 22:15:29 +0000 (22:15 +0000)
committerSage Weil <sage@redhat.com>
Thu, 7 Feb 2019 12:12:43 +0000 (06:12 -0600)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/AsyncConnection.cc
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index d2eaa558745bb4c1f4fff36de94a357e46c4545c..1ed18db114d66ccb4f689d0661531ec9c3483b3c 100644 (file)
@@ -386,7 +386,7 @@ void AsyncConnection::process() {
       ssize_t r = cs.is_connected();
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " reconnect failed to "
-                                 << target_addr << dendl;
+                                  << target_addr << dendl;
         if (r == -ECONNREFUSED) {
           ldout(async_msgr->cct, 2)
               << __func__ << " connection refused!" << dendl;
@@ -479,6 +479,7 @@ void AsyncConnection::accept(ConnectedSocket socket,
   std::lock_guard<std::mutex> l(lock);
   cs = std::move(socket);
   socket_addr = listen_addr;
+  target_addr = peer_addr; // until we know better
   state = STATE_ACCEPTING;
   protocol->accept();
   // rescheduler connection in order to avoid lock dep
index 49613bfbcbbcfc945c6fd9b2a3ff5679136b95e9..cb3745d6b46d152d8daae9154eea305c0ed09b9b 100644 (file)
@@ -138,17 +138,20 @@ protected:
   // this tuple is only used when decoding values from a payload buffer
   std::tuple<Args...> _values;
 
-  // required only when signing and encryting payload, otherwise is null
+  // required for using econding/decoding features or when signing and
+  // encryting payload, otherwise is null
   ProtocolV2 *protocol;
 
+  uint64_t features;
+
   template <typename T>
   inline void _encode_payload_each(T &t) {
     if constexpr (std::is_same<T, bufferlist const>()) {
       this->payload.claim_append((bufferlist &)t);
     } else if constexpr (std::is_same<T, std::vector<uint32_t> const>()) {
-      encode((uint32_t)t.size(), this->payload, -1ll);
+      encode((uint32_t)t.size(), this->payload, features);
       for (const auto &elem : t) {
-        encode(elem, this->payload, 0);
+        encode(elem, this->payload, features);
       }
     } else if constexpr (std::is_same<T, ceph_msg_header2 const>()) {
       this->payload.append((char *)&t, sizeof(t));
@@ -157,7 +160,7 @@ protected:
       protocol->sign_payload(this->payload);
       protocol->encrypt_payload(this->payload);
     } else {
-      encode(t, this->payload, -1ll);
+      encode(t, this->payload, features);
     }
   }
 
@@ -198,12 +201,12 @@ protected:
   }
 
 public:
-  PayloadFrame(const Args &... args) : protocol(nullptr) {
+  PayloadFrame(const Args &... args) : protocol(nullptr), features(0) {
     (_encode_payload_each(args), ...);
   }
 
   PayloadFrame(ProtocolV2 *protocol, const Args &... args)
-      : protocol(protocol) {
+      : protocol(protocol), features(protocol->connection_features) {
     (_encode_payload_each(args), ...);
   }
 
@@ -218,6 +221,16 @@ public:
   }
 };
 
+struct HelloFrame : public PayloadFrame<HelloFrame,
+                                        uint8_t,          // entity type
+                                        entity_addr_t> {  // peer_addr
+  const ProtocolV2::Tag tag = ProtocolV2::Tag::HELLO;
+  using PayloadFrame::PayloadFrame;
+
+  inline uint8_t &entity_type() { return get_val<0>(); }
+  inline entity_addr_t &peer_addr() { return get_val<1>(); }
+};
+
 struct AuthRequestFrame
     : public PayloadFrame<AuthRequestFrame, uint32_t, uint32_t, bufferlist> {
   const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST;
@@ -299,19 +312,18 @@ struct ClientIdentFrame
 
 struct ServerIdentFrame
     : public SignedEncryptedFrame<ServerIdentFrame, entity_addrvec_t,
-                                  entity_addr_t, int64_t, uint64_t, uint64_t,
+                                  int64_t, uint64_t, uint64_t,
                                   uint64_t, uint64_t, uint64_t> {
   const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT;
   using SignedEncryptedFrame::SignedEncryptedFrame;
 
   inline entity_addrvec_t &addrs() { return get_val<0>(); }
-  inline entity_addr_t &peer_addr() { return get_val<1>(); }
-  inline int64_t &gid() { return get_val<2>(); }
-  inline uint64_t &global_seq() { return get_val<3>(); }
-  inline uint64_t &supported_features() { return get_val<4>(); }
-  inline uint64_t &required_features() { return get_val<5>(); }
-  inline uint64_t &flags() { return get_val<6>(); }
-  inline uint64_t &cookie() { return get_val<7>(); }
+  inline int64_t &gid() { return get_val<1>(); }
+  inline uint64_t &global_seq() { return get_val<2>(); }
+  inline uint64_t &supported_features() { return get_val<3>(); }
+  inline uint64_t &required_features() { return get_val<4>(); }
+  inline uint64_t &flags() { return get_val<5>(); }
+  inline uint64_t &cookie() { return get_val<6>(); }
 };
 
 struct ReconnectFrame
@@ -1204,38 +1216,24 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) {
   ldout(cct, 20) << __func__ << dendl;
   bannerExchangeCallback = callback;
 
-  uint8_t type = messenger->get_mytype();
-  __le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
-  __le64 required_features = CEPH_MSGR2_REQUIRED_FEATURES;
-
-  size_t banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
-  __le16 banner_payload_len = sizeof(uint8_t) + 2 * sizeof(__le64);
-  size_t banner_len = banner_prefix_len + sizeof(__le16) + banner_payload_len;
-  char banner[banner_len];
-  uint8_t offset = 0;
-  memcpy(banner, CEPH_BANNER_V2_PREFIX, banner_prefix_len);
-  offset += banner_prefix_len;
-  memcpy(banner + offset, (void *)&banner_payload_len, sizeof(__le16));
-  offset += sizeof(__le16);
-  memcpy(banner + offset, (void *)&type, sizeof(uint8_t));
-  offset += sizeof(uint8_t);
-  memcpy(banner + offset, (void *)&supported_features, sizeof(__le64));
-  offset += sizeof(__le64);
-  memcpy(banner + offset, (void *)&required_features, sizeof(__le64));
+  bufferlist banner_payload;
+  encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+  encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
 
   bufferlist bl;
-  bl.append(banner, banner_len);
+  bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
+  encode((uint16_t)banner_payload.length(), bl, 0);
+  bl.claim_append(banner_payload);
 
   return WRITE(bl, "banner", _wait_for_peer_banner);
 }
 
 CtPtr ProtocolV2::_wait_for_peer_banner() {
-  unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16) +
-                        sizeof(uint8_t) + 2 * sizeof(__le64);
-  return READ(banner_len, _banner_exchange_handle_peer_banner);
+  unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
+  return READ(banner_len, _handle_peer_banner);
 }
 
-CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) {
+CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
@@ -1256,58 +1254,56 @@ CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) {
     return _fault();
   }
 
-  uint8_t peer_type = 0;
-  __le64 peer_supported_features;
-  __le64 peer_required_features;
+  uint16_t payload_len;
+  bufferlist bl;
+  bl.push_back(
+      buffer::create_static(sizeof(__le16), buffer + banner_prefix_len));
+  auto ti = bl.cbegin();
+  try {
+    decode(payload_len, ti);
+  } catch (const buffer::error &e) {
+    lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
+    return _fault();
+  }
 
-  uint8_t offset = banner_prefix_len;
-  __le16 banner_payload_len = *(__le16 *)(buffer + offset);
+  ceph_assert(payload_len <= 4096);  // if we need more then we need to increase
+                                     // temp_buffer size as well
 
-  // V2 banner len check
-  if (banner_payload_len != (sizeof(uint8_t) + 2 * sizeof(__le64))) {
-    lderr(cct) << __func__ << " bad banner length: " << banner_payload_len
-               << dendl;
+  next_payload_len = payload_len;
+  return READ(next_payload_len, _handle_peer_banner_payload);
+}
+
+CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) {
+  ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
+                  << " (" << cpp_strerror(r) << ")" << dendl;
     return _fault();
   }
-  offset += sizeof(__le16);
-
-  peer_type = *(uint8_t *)(buffer + offset);
-  offset += sizeof(uint8_t);
-  peer_supported_features = *(__le64 *)(buffer + offset);
-  offset += sizeof(__le64);
-  peer_required_features = *(__le64 *)(buffer + offset);
 
-  if (connection->get_peer_type() == -1) {
-    connection->set_peer_type(peer_type);
+  uint64_t peer_supported_features;
+  uint64_t peer_required_features;
 
-    ceph_assert(state == ACCEPTING);
-    connection->policy = messenger->get_policy(peer_type);
-    ldout(cct, 10) << __func__ << " accept of host_type " << (int)peer_type
-                   << ", policy.lossy=" << connection->policy.lossy
-                   << " policy.server=" << connection->policy.server
-                   << " policy.standby=" << connection->policy.standby
-                   << " policy.resetcheck=" << connection->policy.resetcheck
-                   << dendl;
-  } else {
-    if (connection->get_peer_type() != peer_type) {
-      ldout(cct, 1) << __func__ << " connection peer type does not match what"
-                    << " peer advertises " << connection->get_peer_type()
-                    << " != " << (int)peer_type << dendl;
-      stop();
-      connection->dispatch_queue->queue_reset(connection);
-      return nullptr;
-    }
+  bufferlist bl;
+  bl.push_back(buffer::create_static(next_payload_len, buffer));
+  auto ti = bl.cbegin();
+  try {
+    decode(peer_supported_features, ti);
+    decode(peer_required_features, ti);
+  } catch (const buffer::error &e) {
+    lderr(cct) << __func__ << " decode banner payload failed " << dendl;
+    return _fault();
   }
 
-  ldout(cct, 1) << __func__ << " peer_type=" << (int)peer_type
-                << " supported=" << std::hex << peer_supported_features
-                << " required=" << std::hex << peer_required_features
-                << std::dec << dendl;
+  ldout(cct, 1) << __func__ << " supported=" << std::hex
+                << peer_supported_features << " required=" << std::hex
+                << peer_required_features << std::dec << dendl;
 
   // Check feature bit compatibility
 
-  __le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
-  __le64 required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+  uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
+  uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
 
   if ((required_features & peer_supported_features) != required_features) {
     ldout(cct, 1) << __func__ << " peer does not support all required features"
@@ -1328,15 +1324,43 @@ CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) {
   }
 
   this->peer_required_features = peer_required_features;
+  if (this->peer_required_features == 0) {
+    this->connection_features = CEPH_FEATURE_MSG_ADDR2;
+  }
+
+  HelloFrame hello(this, messenger->get_mytype(), connection->target_addr);
+  return WRITE(hello.get_buffer(), "hello frame", read_frame);
+}
+
+CtPtr ProtocolV2::handle_hello(char *payload, uint32_t length) {
+  ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
+
+  HelloFrame hello(payload, length);
 
-  if (cct->_conf->ms_inject_internal_delays &&
-      cct->_conf->ms_inject_socket_failures) {
-    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
-      ldout(cct, 10) << __func__ << " sleep for "
-                     << cct->_conf->ms_inject_internal_delays << dendl;
-      utime_t t;
-      t.set_from_double(cct->_conf->ms_inject_internal_delays);
-      t.sleep();
+  ldout(cct, 5) << __func__ << " received hello:"
+                << " peer_type=" << (int)hello.entity_type()
+                << " peer_addr_for_me=" << hello.peer_addr() << dendl;
+
+  if (connection->get_peer_type() == -1) {
+    connection->set_peer_type(hello.entity_type());
+
+    ceph_assert(state == ACCEPTING);
+    connection->policy = messenger->get_policy(hello.entity_type());
+    ldout(cct, 10) << __func__ << " accept of host_type "
+                   << (int)hello.entity_type()
+                   << ", policy.lossy=" << connection->policy.lossy
+                   << " policy.server=" << connection->policy.server
+                   << " policy.standby=" << connection->policy.standby
+                   << " policy.resetcheck=" << connection->policy.resetcheck
+                   << dendl;
+  } else {
+    if (connection->get_peer_type() != hello.entity_type()) {
+      ldout(cct, 1) << __func__ << " connection peer type does not match what"
+                    << " peer advertises " << connection->get_peer_type()
+                    << " != " << (int)hello.entity_type() << dendl;
+      stop();
+      connection->dispatch_queue->queue_reset(connection);
+      return nullptr;
     }
   }
 
@@ -1385,6 +1409,7 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) {
                  << " tag=" << static_cast<uint32_t>(next_tag) << dendl;
 
   switch (next_tag) {
+    case Tag::HELLO:
     case Tag::AUTH_REQUEST:
     case Tag::AUTH_BAD_METHOD:
     case Tag::AUTH_BAD_AUTH:
@@ -1427,6 +1452,8 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
   }
 
   switch (next_tag) {
+    case Tag::HELLO:
+      return handle_hello(buffer, next_payload_len);
     case Tag::AUTH_REQUEST:
       return handle_auth_request(buffer, next_payload_len);
     case Tag::AUTH_BAD_METHOD:
@@ -2208,30 +2235,30 @@ CtPtr ProtocolV2::send_client_ident() {
       messenger->get_myaddrs().front().is_blank_ip()) {
     sockaddr_storage ss;
     socklen_t len = sizeof(ss);
-    getsockname(connection->cs.fd(), (sockaddr*)&ss, &len);
-    ldout(cct,1) << __func__ << " getsockname reveals I am " << (sockaddr*)&ss
-                << " when talking to " << connection->target_addr << dendl;
+    getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
+    ldout(cct, 1) << __func__ << " getsockname reveals I am " << (sockaddr *)&ss
+                  << " when talking to " << connection->target_addr << dendl;
     entity_addr_t a;
     a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
-    a.set_sockaddr((sockaddr*)&ss);
+    a.set_sockaddr((sockaddr *)&ss);
     a.set_port(0);
     connection->lock.unlock();
     messenger->learned_addr(a);
     if (cct->_conf->ms_inject_internal_delays &&
-       cct->_conf->ms_inject_socket_failures) {
+        cct->_conf->ms_inject_socket_failures) {
       if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
-       ldout(cct, 10) << __func__ << " sleep for "
-                      << cct->_conf->ms_inject_internal_delays << dendl;
-      utime_t t;
-      t.set_from_double(cct->_conf->ms_inject_internal_delays);
-      t.sleep();
+        ldout(cct, 10) << __func__ << " sleep for "
+                       << cct->_conf->ms_inject_internal_delays << dendl;
+        utime_t t;
+        t.set_from_double(cct->_conf->ms_inject_internal_delays);
+        t.sleep();
       }
     }
     connection->lock.lock();
     if (state != CONNECTING) {
       ldout(cct, 1) << __func__
-                   << " state changed while learned_addr, mark_down or "
-                   << " replacing must be happened just now" << dendl;
+                    << " state changed while learned_addr, mark_down or "
+                    << " replacing must be happened just now" << dendl;
       return nullptr;
     }
   }
@@ -2352,7 +2379,6 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) {
   ServerIdentFrame server_ident(this, payload, length);
   ldout(cct, 5) << __func__ << " received server identification:"
                 << " addrs=" << server_ident.addrs()
-                << " my_addr=" << server_ident.peer_addr()
                 << " gid=" << server_ident.gid()
                 << " global_seq=" << server_ident.global_seq()
                 << " features_supported=" << std::hex
@@ -2572,6 +2598,7 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
   if (client_ident.addrs().empty()) {
     return _fault();  // a v2 peer should never do this
   }
+
   connection->set_peer_addrs(client_ident.addrs());
   connection->target_addr = connection->_infer_target_addr(client_ident.addrs());
 
@@ -2967,13 +2994,12 @@ CtPtr ProtocolV2::send_server_ident() {
 
   uint64_t gs = messenger->get_global_seq();
   ServerIdentFrame server_ident(
-      this, messenger->get_myaddrs(), connection->target_addr,
-      messenger->get_myname().num(), gs, connection->policy.features_supported,
+      this, messenger->get_myaddrs(), messenger->get_myname().num(), gs,
+      connection->policy.features_supported,
       connection->policy.features_required, flags, cookie);
 
   ldout(cct, 5) << __func__ << " sending identification:"
                 << " addrs=" << messenger->get_myaddrs()
-                << " target_addr=" << connection->target_addr
                 << " gid=" << messenger->get_myname().num()
                 << " global_seq=" << gs << " features_supported=" << std::hex
                 << connection->policy.features_supported
index d6721ffcf3dc49b516530067e8116488fa678a0a..5ac7f72c7006e7ebe0bcd085096556b68f54e417 100644 (file)
@@ -47,7 +47,8 @@ private:
 
 public:
   enum class Tag : uint32_t {
-    AUTH_REQUEST = 1,
+    HELLO = 1,
+    AUTH_REQUEST,
     AUTH_BAD_METHOD,
     AUTH_BAD_AUTH,
     AUTH_MORE,
@@ -81,7 +82,6 @@ private:
   std::shared_ptr<AuthSessionHandler> session_security;
   std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge;
   uint64_t auth_flags;
-  uint64_t connection_features;
   uint64_t cookie;
   uint64_t global_seq;
   uint64_t connect_seq;
@@ -142,12 +142,14 @@ private:
   void handle_message_ack(uint64_t seq);
 
   CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
-  READ_HANDLER_CONTINUATION_DECL(ProtocolV2,
-                                 _banner_exchange_handle_peer_banner);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner);
+  READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner_payload);
 
   Ct<ProtocolV2> *_banner_exchange(Ct<ProtocolV2> *callback);
   Ct<ProtocolV2> *_wait_for_peer_banner();
-  Ct<ProtocolV2> *_banner_exchange_handle_peer_banner(char *buffer, int r);
+  Ct<ProtocolV2> *_handle_peer_banner(char *buffer, int r);
+  Ct<ProtocolV2> *_handle_peer_banner_payload(char *buffer, int r);
+  Ct<ProtocolV2> *handle_hello(char *payload, uint32_t length);
 
   CONTINUATION_DECL(ProtocolV2, read_frame);
   READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length_and_tag);
@@ -191,6 +193,8 @@ private:
   Ct<ProtocolV2> *handle_message_ack(char *payload, uint32_t length);
 
 public:
+  uint64_t connection_features;
+
   ProtocolV2(AsyncConnection *connection);
   virtual ~ProtocolV2();