From: Haomai Wang Date: Thu, 28 May 2015 16:59:50 +0000 (+0800) Subject: AsyncConnection: Avoid "lock" acquire in message normal send flow X-Git-Tag: v9.0.2~23^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=80c3150d820a729636f79b9aa767fa0c88280aa1;p=ceph.git AsyncConnection: Avoid "lock" acquire in message normal send flow Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 6d2b8e89b4c4..26d0481d4543 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -174,9 +174,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p) : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0), - out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), - sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(0), - open_write(false), lock("AsyncConnection::lock"), keepalive(false), recv_buf(NULL), + out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1), + write_lock("AsyncConnection::write_lock"), can_write(0), + open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false), is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c) @@ -837,18 +837,19 @@ void AsyncConnection::process() // side queueing because messages can't be renumbered, but the (kernel) client will // occasionally pull a message out of the sent queue to send elsewhere. in that case // it doesn't matter if we "got" it or not. - if (message->get_seq() <= in_seq) { + uint64_t cur_seq = in_seq.read(); + if (message->get_seq() <= cur_seq) { ldout(async_msgr->cct,0) << __func__ << " got old message " - << message->get_seq() << " <= " << in_seq << " " << message << " " << *message + << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message << ", discarding" << dendl; message->put(); if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message) assert(0 == "old msgs despite reconnect_seq feature"); break; } - if (message->get_seq() > in_seq + 1) { + if (message->get_seq() > cur_seq + 1) { ldout(async_msgr->cct, 0) << __func__ << " missed message? skipped from seq " - << in_seq << " to " << message->get_seq() << dendl; + << cur_seq << " to " << message->get_seq() << dendl; if (async_msgr->cct->_conf->ms_die_on_skipped_message) assert(0 == "skipped incoming seq"); } @@ -856,13 +857,13 @@ void AsyncConnection::process() message->set_connection(this); // note last received message. - in_seq = message->get_seq(); + in_seq.set(message->get_seq()); ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq() << " " << message << " " << *message << dendl; // if send_message always successfully send, it may have no // opportunity to send seq ack. 10 is a experience value. - if (in_seq > in_seq_acked + 10) { + if (ack_left.inc() > 10) { center->dispatch_event_external(write_handler); } @@ -1242,7 +1243,8 @@ int AsyncConnection::_process_connection() //} bufferlist bl; - bl.append((char*)&in_seq, sizeof(in_seq)); + uint64_t s = in_seq.read(); + bl.append((char*)&s, sizeof(s)); r = try_send(bl); if (r == 0) { state = STATE_CONNECTING_READY; @@ -1810,7 +1812,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = " - << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl; + << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl; int next_state; @@ -1823,7 +1825,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a next_state = STATE_ACCEPTING_READY; discard_requeued_up_to(0); is_reset_from_peer = false; - in_seq = 0; + in_seq.set(0); } // send READY reply @@ -1847,8 +1849,9 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (reply.authorizer_len) reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); + uint64_t s = in_seq.read(); if (reply.tag == CEPH_MSGR_TAG_SEQ) - reply_bl.append((char*)&in_seq, sizeof(in_seq)); + reply_bl.append((char*)&s, sizeof(s)); lock.Unlock(); // Because "replacing" will prevent other connections preempt this addr, @@ -2148,6 +2151,7 @@ void AsyncConnection::fault() void AsyncConnection::was_session_reset() { ldout(async_msgr->cct,10) << __func__ << " started" << dendl; + assert(lock.is_locked()); Mutex::Locker l(write_lock); discard_out_queue(); @@ -2157,9 +2161,10 @@ void AsyncConnection::was_session_reset() ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl; } - in_seq = 0; + in_seq.set(0); connect_seq = 0; - in_seq_acked = 0; + // it's safe to directly set 0, double locked + ack_left.set(0); once_ready = false; can_write = 0; } @@ -2324,8 +2329,8 @@ void AsyncConnection::handle_ack(uint64_t seq) void AsyncConnection::send_keepalive() { ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; - Mutex::Locker l(lock); - if (state != STATE_CLOSED) { + Mutex::Locker l(write_lock); + if (can_write != 2) { keepalive = true; center->dispatch_event_external(write_handler); } @@ -2367,12 +2372,11 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) void AsyncConnection::handle_write() { ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; - Mutex::Locker l(lock); bufferlist bl; int r = 0; write_lock.Lock(); - if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) { + if (can_write == 1) { if (keepalive) { _send_keepalive_or_ack(); keepalive = false; @@ -2399,13 +2403,14 @@ void AsyncConnection::handle_write() } } - if (in_seq > in_seq_acked) { + uint64_t left = ack_left.read(); + if (left) { ceph_le64 s; - s = in_seq; + s = in_seq.read(); bl.append(CEPH_MSGR_TAG_ACK); bl.append((char*)&s, sizeof(s)); - ldout(async_msgr->cct, 10) << __func__ << " try send msg ack" << dendl; - in_seq_acked = s; + ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; + ack_left.sub(ack_left); r = _try_send(bl); } else if (is_queued()) { r = _try_send(bl); @@ -2415,23 +2420,42 @@ void AsyncConnection::handle_write() ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; goto fail; } - } else if (state == STATE_STANDBY && !policy.server && is_queued()) { - ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) - << " policy.server is false" << dendl; - _connect(); - } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) { - r = _try_send(bl); - if (r < 0) { - ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; - goto fail; + write_lock.Unlock(); + } else { + write_lock.Unlock(); + + if (async_msgr->cct->_conf->ms_inject_internal_delays) { + if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { + ldout(msgr->cct, 10) << __func__ << " sleep for " + << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } } + + lock.Lock(); + write_lock.Lock(); + if (state == STATE_STANDBY && !policy.server && is_queued()) { + ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) + << " policy.server is false" << dendl; + _connect(); + } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) { + r = _try_send(bl); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; + write_lock.Unlock(); + goto fail; + } + } + write_lock.Unlock(); + lock.Unlock(); } - write_lock.Unlock(); return ; fail: - write_lock.Unlock(); fault(); + lock.Unlock(); } void AsyncConnection::wakeup_from(uint64_t id) diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 11850856c129..1f5c0edc0c2b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -234,7 +234,6 @@ class AsyncConnection : public Connection { int global_seq; __u32 connect_seq, peer_global_seq; atomic_t out_seq; - uint64_t in_seq, in_seq_acked; int state; int state_after_send; int sd; @@ -242,12 +241,14 @@ class AsyncConnection : public Connection { Messenger::Policy policy; Mutex write_lock; + uint64_t in_seq, in_seq_acked; int can_write; // 0. can't send 1. can send_message 2. connection is closed bool open_write; map > > out_q; // priority queue for outbound msgs list > sent; // the first bufferlist need to inject seq list local_messages; // local deliver bufferlist outcoming_bl; + bool keepalive; Mutex lock; utime_t backoff; // backoff time @@ -258,7 +259,6 @@ class AsyncConnection : public Connection { EventCallbackRef connect_handler; EventCallbackRef local_deliver_handler; EventCallbackRef wakeup_handler; - bool keepalive; struct iovec msgvec[IOV_MAX]; char *recv_buf; uint32_t recv_max_prefetch; @@ -277,6 +277,9 @@ class AsyncConnection : public Connection { bufferlist::iterator data_blp; bufferlist front, middle, data; ceph_msg_connect connect_msg; + // used to accumulate the difference between `in_seq` and `in_seq_acked`, why + // we have this field because of lock separation + int ack_left; // Connecting state bool got_bad_auth; AuthAuthorizer *authorizer;