From 175b3798d580066b4bdc0214e39f3c6e3fd07243 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sun, 10 Apr 2016 15:21:55 +0800 Subject: [PATCH] AsyncConnection: avoid is_connected require connection's lock Fixes: http://tracker.ceph.com/issues/15440 Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 32 ++++++++++++++++---------------- src/msg/async/AsyncConnection.h | 9 +++++---- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 611e49890b1f0..7ae8187184535 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -178,7 +178,7 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p) : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1), - write_lock("AsyncConnection::write_lock"), can_write(NOWRITE), + write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false), @@ -1349,7 +1349,7 @@ ssize_t AsyncConnection::_process_connection() // message may in queue between last _try_send and connection ready // write event may already notify and we need to force scheduler again write_lock.Lock(); - can_write = CANWRITE; + can_write = WriteStatus::CANWRITE; if (is_queued()) center->dispatch_event_external(write_handler); write_lock.Unlock(); @@ -1511,7 +1511,7 @@ ssize_t AsyncConnection::_process_connection() state = STATE_OPEN; memset(&connect_msg, 0, sizeof(connect_msg)); write_lock.Lock(); - can_write = CANWRITE; + can_write = WriteStatus::CANWRITE; if (is_queued()) center->dispatch_event_external(write_handler); write_lock.Unlock(); @@ -1816,7 +1816,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl; existing->_stop(); } else { - assert(can_write == NOWRITE); + assert(can_write == WriteStatus::NOWRITE); existing->write_lock.Lock(true); // queue a reset on the new connection, which we're dumping for the old center->dispatch_event_external(reset_handler); @@ -1842,7 +1842,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->requeue_sent(); swap(existing->sd, sd); - existing->can_write = NOWRITE; + existing->can_write = WriteStatus::NOWRITE; existing->open_write = false; existing->replacing = true; existing->state_offset = 0; @@ -2011,7 +2011,7 @@ int AsyncConnection::send_message(Message *m) if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; Mutex::Locker l(write_lock); - if (can_write != CLOSED) { + if (can_write != WriteStatus::CLOSED) { local_messages.push_back(m); center->dispatch_event_external(local_deliver_handler); } else { @@ -2037,14 +2037,14 @@ int AsyncConnection::send_message(Message *m) Mutex::Locker l(write_lock); // "features" changes will change the payload encoding - if (can_fast_prepare && (can_write == NOWRITE || get_features() != f)) { + if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) { // ensure the correctness of message encoding bl.clear(); m->get_payload().clear(); - ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer, can_write=" << can_write << " previous " + ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous " << f << " != " << get_features() << dendl; } - if (!is_queued() && can_write == CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) { + if (!is_queued() && can_write == WriteStatus::CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) { if (!can_fast_prepare) prepare_send_message(get_features(), m, bl); logger->inc(l_msgr_send_messages_inline); @@ -2053,7 +2053,7 @@ int AsyncConnection::send_message(Message *m) // we want to handle fault within internal thread center->dispatch_event_external(write_handler); } - } else if (can_write == CLOSED) { + } else if (can_write == WriteStatus::CLOSED) { ldout(async_msgr->cct, 10) << __func__ << " connection closed." << " Drop message " << m << dendl; m->put(); @@ -2163,7 +2163,7 @@ void AsyncConnection::fault() ::close(sd); sd = -1; } - can_write = NOWRITE; + can_write = WriteStatus::NOWRITE; open_write = false; // requeue sent items @@ -2238,7 +2238,7 @@ void AsyncConnection::was_session_reset() // it's safe to directly set 0, double locked ack_left.set(0); once_ready = false; - can_write = NOWRITE; + can_write = WriteStatus::NOWRITE; } void AsyncConnection::_stop() @@ -2257,7 +2257,7 @@ void AsyncConnection::_stop() state = STATE_CLOSED; open_write = false; - can_write = CLOSED; + can_write = WriteStatus::CLOSED; state_offset = 0; if (sd >= 0) { shutdown_socket(); @@ -2293,7 +2293,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) { - assert(can_write == CANWRITE); + assert(can_write == WriteStatus::CANWRITE); m->set_seq(out_seq.inc()); if (!policy.lossy) { @@ -2412,7 +2412,7 @@ void AsyncConnection::send_keepalive() { ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; Mutex::Locker l(write_lock); - if (can_write != CLOSED) { + if (can_write != WriteStatus::CLOSED) { keepalive = true; center->dispatch_event_external(write_handler); } @@ -2454,7 +2454,7 @@ void AsyncConnection::handle_write() ssize_t r = 0; write_lock.Lock(); - if (can_write == CANWRITE) { + if (can_write == WriteStatus::CANWRITE) { if (keepalive) { _send_keepalive_or_ack(); keepalive = false; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 4a76a7c1b2411..608cccee84895 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -17,6 +17,7 @@ #ifndef CEPH_MSG_ASYNCCONNECTION_H #define CEPH_MSG_ASYNCCONNECTION_H +#include #include #include #include @@ -130,8 +131,7 @@ class AsyncConnection : public Connection { ostream& _conn_prefix(std::ostream *_dout); bool is_connected() override { - Mutex::Locker l(lock); - return state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE; + return can_write.load() == WriteStatus::CANWRITE; } // Only call when AsyncConnection first construct @@ -240,11 +240,12 @@ class AsyncConnection : public Connection { Messenger::Policy policy; Mutex write_lock; - enum { + enum class WriteStatus { NOWRITE, CANWRITE, CLOSED - } can_write; + }; + std::atomic can_write; bool open_write; map > > out_q; // priority queue for outbound msgs list sent; // the first bufferlist need to inject seq -- 2.39.5