]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async: mark AsyncConnection with msgr2 flag
authorSage Weil <sage@redhat.com>
Tue, 12 Jun 2018 19:06:33 +0000 (14:06 -0500)
committerSage Weil <sage@redhat.com>
Tue, 3 Jul 2018 18:01:24 +0000 (13:01 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/msg_types.h

index 97d7e4400ef07287e3ce8997ed64c81e2dd66f6b..e2982b41db6b603ec1286de020946c46830a3e02 100644 (file)
@@ -35,6 +35,7 @@
 ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
   return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
                << peer_addrs << " conn(" << this
+               << (msgr2 ? " msgr2" : " legacy")
                 << " :" << port
                 << " s=" << get_state_name(state)
                 << " pgs=" << peer_global_seq
@@ -118,8 +119,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
   data.push_back(std::move(ptr));
 }
 
-AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
-                                 Worker *w)
+AsyncConnection::AsyncConnection(
+  CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
+  Worker *w, bool m2)
   : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
     logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
     state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
@@ -129,7 +131,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
     recv_start(0), recv_end(0),
     last_active(ceph::coarse_mono_clock::now()),
     inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
-    msg_left(0), cur_msg_size(0), got_bad_auth(false), authorizer(NULL), replacing(false),
+    msg_left(0), cur_msg_size(0), got_bad_auth(false), authorizer(NULL),
+    msgr2(m2), replacing(false),
     is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
     worker(w), center(&w->center)
 {
index 58a5c1c5bd4803ea9e0d0d8adc800b3ca43ccc9b..d929805ce5f1e001f63ebfeec6113cdc35f85a3c 100644 (file)
@@ -183,7 +183,8 @@ class AsyncConnection : public Connection {
   } *delay_state;
 
  public:
-  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
+  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
+                 Worker *w, bool is_msgr2);
   ~AsyncConnection() override;
   void maybe_start_delay_thread();
 
@@ -350,6 +351,7 @@ class AsyncConnection : public Connection {
   bufferlist authorizer_buf;
   ceph_msg_connect_reply connect_reply;
   // Accepting state
+  bool msgr2 = false;
   entity_addr_t socket_addr;
   CryptoKey session_key;
   bool replacing;    // when replacing process happened, we will reply connect
index 7c7f8e2ae3d6d4c8358f7a67bd9fa96bcacbe1c6..d255f198dd1553329df286ae2103d2a4d5c3503c 100644 (file)
@@ -275,7 +275,8 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
   stack = single->stack.get();
   stack->start();
   local_worker = stack->get_worker();
-  local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
+  local_connection = new AsyncConnection(cct, this, &dispatch_queue,
+                                        local_worker, true);
   init_local_connection();
   reap_handler = new C_handle_reap(this);
   unsigned processor_num = 1;
@@ -536,7 +537,8 @@ void AsyncMessenger::wait()
 void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
 {
   lock.Lock();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+                                               addr.is_msgr2());
   conn->accept(std::move(cli_socket), addr);
   accepting_conns.insert(conn);
   lock.Unlock();
@@ -553,7 +555,8 @@ AsyncConnectionRef AsyncMessenger::create_connect(
 
   // create connection
   Worker *w = stack->get_worker();
-  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
+  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+                                               addrs.front().is_msgr2());
   conn->connect(addrs, type);
   assert(!conns.count(addrs));
   conns[addrs] = conn;
index bbc64d58bd0e8f5c5a096953d232335d32db536f..714a8ca9ae6c16e60c70333a765c5a4c9e4373b6 100644 (file)
@@ -268,6 +268,8 @@ struct entity_addr_t {
 
   uint32_t get_type() const { return type; }
   void set_type(uint32_t t) { type = t; }
+  bool is_legacy() const { return type == TYPE_LEGACY; }
+  bool is_msgr2() const { return type == TYPE_MSGR2; }
 
   __u32 get_nonce() const { return nonce; }
   void set_nonce(__u32 n) { nonce = n; }