From 9247562e32435beb5a586e89fdcba6dd157d716e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 12 Jun 2018 14:06:33 -0500 Subject: [PATCH] msg/async: mark AsyncConnection with msgr2 flag Signed-off-by: Sage Weil --- src/msg/async/AsyncConnection.cc | 9 ++++++--- src/msg/async/AsyncConnection.h | 4 +++- src/msg/async/AsyncMessenger.cc | 9 ++++++--- src/msg/msg_types.h | 2 ++ 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 97d7e4400ef07..e2982b41db6b6 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -35,6 +35,7 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { return *_dout << "-- " << async_msgr->get_myaddrs() << " >> " << peer_addrs << " conn(" << this + << (msgr2 ? " msgr2" : " legacy") << " :" << port << " s=" << get_state_name(state) << " pgs=" << peer_global_seq @@ -118,8 +119,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) data.push_back(std::move(ptr)); } -AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, - Worker *w) +AsyncConnection::AsyncConnection( + CephContext *cct, AsyncMessenger *m, DispatchQueue *q, + Worker *w, bool m2) : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1), @@ -129,7 +131,8 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu recv_start(0), recv_end(0), last_active(ceph::coarse_mono_clock::now()), inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000), - msg_left(0), cur_msg_size(0), got_bad_auth(false), authorizer(NULL), replacing(false), + msg_left(0), cur_msg_size(0), got_bad_auth(false), authorizer(NULL), + msgr2(m2), replacing(false), is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), worker(w), center(&w->center) { diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 58a5c1c5bd480..d929805ce5f1e 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -183,7 +183,8 @@ class AsyncConnection : public Connection { } *delay_state; public: - AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w); + AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, + Worker *w, bool is_msgr2); ~AsyncConnection() override; void maybe_start_delay_thread(); @@ -350,6 +351,7 @@ class AsyncConnection : public Connection { bufferlist authorizer_buf; ceph_msg_connect_reply connect_reply; // Accepting state + bool msgr2 = false; entity_addr_t socket_addr; CryptoKey session_key; bool replacing; // when replacing process happened, we will reply connect diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 7c7f8e2ae3d6d..d255f198dd155 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -275,7 +275,8 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, stack = single->stack.get(); stack->start(); local_worker = stack->get_worker(); - local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker); + local_connection = new AsyncConnection(cct, this, &dispatch_queue, + local_worker, true); init_local_connection(); reap_handler = new C_handle_reap(this); unsigned processor_num = 1; @@ -536,7 +537,8 @@ void AsyncMessenger::wait() void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr) { lock.Lock(); - AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); + AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w, + addr.is_msgr2()); conn->accept(std::move(cli_socket), addr); accepting_conns.insert(conn); lock.Unlock(); @@ -553,7 +555,8 @@ AsyncConnectionRef AsyncMessenger::create_connect( // create connection Worker *w = stack->get_worker(); - AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); + AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w, + addrs.front().is_msgr2()); conn->connect(addrs, type); assert(!conns.count(addrs)); conns[addrs] = conn; diff --git a/src/msg/msg_types.h b/src/msg/msg_types.h index bbc64d58bd0e8..714a8ca9ae6c1 100644 --- a/src/msg/msg_types.h +++ b/src/msg/msg_types.h @@ -268,6 +268,8 @@ struct entity_addr_t { uint32_t get_type() const { return type; } void set_type(uint32_t t) { type = t; } + bool is_legacy() const { return type == TYPE_LEGACY; } + bool is_msgr2() const { return type == TYPE_MSGR2; } __u32 get_nonce() const { return nonce; } void set_nonce(__u32 n) { nonce = n; } -- 2.39.5