ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
return *_dout << "-- " << async_msgr->get_myaddrs() << " >> "
<< peer_addrs << " conn(" << this
+ << (msgr2 ? " msgr2" : " legacy")
<< " :" << port
<< " s=" << get_state_name(state)
<< " pgs=" << peer_global_seq
data.push_back(std::move(ptr));
}
-AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
- Worker *w)
+AsyncConnection::AsyncConnection(
+ CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
+ Worker *w, bool m2)
: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
recv_start(0), recv_end(0),
last_active(ceph::coarse_mono_clock::now()),
inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
- msg_left(0), cur_msg_size(0), got_bad_auth(false), authorizer(NULL), replacing(false),
+ msg_left(0), cur_msg_size(0), got_bad_auth(false), authorizer(NULL),
+ msgr2(m2), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
worker(w), center(&w->center)
{
} *delay_state;
public:
- AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
+ AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
+ Worker *w, bool is_msgr2);
~AsyncConnection() override;
void maybe_start_delay_thread();
bufferlist authorizer_buf;
ceph_msg_connect_reply connect_reply;
// Accepting state
+ bool msgr2 = false;
entity_addr_t socket_addr;
CryptoKey session_key;
bool replacing; // when replacing process happened, we will reply connect
stack = single->stack.get();
stack->start();
local_worker = stack->get_worker();
- local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
+ local_connection = new AsyncConnection(cct, this, &dispatch_queue,
+ local_worker, true);
init_local_connection();
reap_handler = new C_handle_reap(this);
unsigned processor_num = 1;
void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
{
lock.Lock();
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+ addr.is_msgr2());
conn->accept(std::move(cli_socket), addr);
accepting_conns.insert(conn);
lock.Unlock();
// create connection
Worker *w = stack->get_worker();
- AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
+ addrs.front().is_msgr2());
conn->connect(addrs, type);
assert(!conns.count(addrs));
conns[addrs] = conn;
uint32_t get_type() const { return type; }
void set_type(uint32_t t) { type = t; }
+ bool is_legacy() const { return type == TYPE_LEGACY; }
+ bool is_msgr2() const { return type == TYPE_MSGR2; }
__u32 get_nonce() const { return nonce; }
void set_nonce(__u32 n) { nonce = n; }