From a75ac0ea465cdc89e63caaa62f13afc305fc832f Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 13 Jan 2015 22:18:02 +0800 Subject: [PATCH] AsyncConnection: Add ms_inject_* to AsyncConnection Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 130 +++++++++++++++++++++++-------- 1 file changed, 97 insertions(+), 33 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index b588c4fe6e82d..b7d3b374aff94 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -31,12 +31,12 @@ #define dout_prefix _conn_prefix(_dout) ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this - << " sd=" << sd << " :" << port - << " s=" << get_state_name(state) - << " pgs=" << peer_global_seq - << " cs=" << connect_seq - << " l=" << policy.lossy - << ")."; + << " sd=" << sd << " :" << port + << " s=" << get_state_name(state) + << " pgs=" << peer_global_seq + << " cs=" << connect_seq + << " l=" << policy.lossy + << ")."; } const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512; @@ -204,12 +204,12 @@ int AsyncConnection::read_bulk(int fd, char *buf, int len) if (errno == EAGAIN || errno == EINTR) { nread = 0; } else { - ldout(async_msgr->cct, 1) << __func__ << " Reading from fd=" << fd + ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << fd << " : "<< strerror(errno) << dendl; return -1; } } else if (nread == 0) { - ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor " + ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor " << fd << dendl; return -1; } @@ -289,6 +289,13 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) return -EINTR; } + if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { + ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl; + ::shutdown(sd, SHUT_RDWR); + } + } + uint64_t sent = 0; list::const_iterator pb = outcoming_bl.buffers().begin(); uint64_t left_pbrs = outcoming_bl.buffers().size(); @@ -362,6 +369,14 @@ int AsyncConnection::read_until(uint64_t len, char *p) assert(len); ldout(async_msgr->cct, 20) << __func__ << " len is " << len << " state_offset is " << state_offset << dendl; + + if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { + ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl; + ::shutdown(sd, SHUT_RDWR); + } + } + int r = 0; uint64_t left = len - state_offset; if (recv_end > recv_start) { @@ -567,7 +582,7 @@ void AsyncConnection::process() // verify header crc if (header_crc != header.crc) { - ldout(async_msgr->cct,0) << __func__ << "reader got bad header crc " + ldout(async_msgr->cct,0) << __func__ << " reader got bad header crc " << header_crc << " != " << header.crc << dendl; goto fail; } @@ -764,7 +779,7 @@ void AsyncConnection::process() // if (session_security.get() == NULL) { - ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl; + ldout(async_msgr->cct, 10) << __func__ << " no session security set" << dendl; } else { if (session_security->check_message_signature(message)) { ldout(async_msgr->cct, 0) << __func__ << "Signature check failed" << dendl; @@ -1076,7 +1091,7 @@ int AsyncConnection::_process_connection() r = _try_send(bl); if (r == 0) { state = STATE_CONNECTING_WAIT_CONNECT_REPLY; - ldout(async_msgr->cct,20) << __func__ << "connect wrote (self +) cseq, waiting for reply" << dendl; + ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl; } else if (r > 0) { state = STATE_WAIT_SEND; state_after_send = STATE_CONNECTING_WAIT_CONNECT_REPLY; @@ -1194,9 +1209,9 @@ int AsyncConnection::_process_connection() assert(connect_seq == connect_reply.connect_seq); backoff = utime_t(); set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features); - ldout(async_msgr->cct, 10) << __func__ << "connect success " << connect_seq - << ", lossy = " << policy.lossy << ", features " - << get_features() << dendl; + ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq + << ", lossy = " << policy.lossy << ", features " + << get_features() << dendl; // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the // connection. PLR @@ -1452,11 +1467,11 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co } if (reply.tag == CEPH_MSGR_TAG_SEQ) { - ldout(async_msgr->cct, 10) << __func__ << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; + ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; state = STATE_CONNECTING_WAIT_ACK_SEQ; } if (reply.tag == CEPH_MSGR_TAG_READY) { - ldout(async_msgr->cct, 10) << __func__ << "got CEPH_MSGR_TAG_READY " << dendl; + ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl; state = STATE_CONNECTING_READY; } @@ -1480,7 +1495,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a reply.protocol_version = async_msgr->get_proto_version(peer_type, false); // mismatch? - ldout(async_msgr->cct,10) << __func__ << "accept my proto " << reply.protocol_version + ldout(async_msgr->cct, 10) << __func__ << " accept my proto " << reply.protocol_version << ", their proto " << connect.protocol_version << dendl; if (connect.protocol_version != reply.protocol_version) { return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER, connect, reply, authorizer_reply); @@ -1504,7 +1519,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features; if (feat_missing) { - ldout(async_msgr->cct, 1) << __func__ << "peer missing required features " + ldout(async_msgr->cct, 1) << __func__ << " peer missing required features " << std::hex << feat_missing << std::dec << dendl; return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply); } @@ -1518,11 +1533,20 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } // We've verified the authorizer for this AsyncConnection, so set up the session security structure. PLR - ldout(async_msgr->cct, 10) << __func__ << " accept: setting up session_security." << dendl; + ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl; // existing? lock.Unlock(); AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr); + + if (async_msgr->cct->_conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << __func__ << " sleep for " + << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + lock.Lock(); if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down, state=" @@ -1551,7 +1575,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a goto replace; } - ldout(async_msgr->cct, 0) << __func__ << "accept connect_seq " << connect.connect_seq + ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq << " vs existing " << existing->connect_seq << " state " << existing->state << dendl; @@ -1566,7 +1590,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (connect.connect_seq < existing->connect_seq) { // old attempt, or we sent READY but they didn't get it. - ldout(async_msgr->cct, 10) << __func__ << "accept existing " << existing << ".cseq " + ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing << ".cseq " << existing->connect_seq << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; reply.connect_seq = existing->connect_seq + 1; @@ -1610,7 +1634,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a assert(connect.global_seq >= existing->peer_global_seq); if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other existing->connect_seq == 0) { - ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq " + ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " << connect.connect_seq << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); @@ -1623,12 +1647,12 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } // existing else if (!replacing && connect.connect_seq > 0) { // we reset, and they are opening a new session - ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq " + ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply); } else { // new session - ldout(async_msgr->cct,10) << __func__ << "accept new session" << dendl; + ldout(async_msgr->cct, 10) << __func__ << " accept new session" << dendl; existing = NULL; goto open; } @@ -1642,6 +1666,14 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl; + if (async_msgr->cct->_conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << __func__ << " sleep for " + << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + // There is no possible that existing connection will acquire this lock existing->lock.Lock(); @@ -1744,17 +1776,26 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a // if replacing, this con is alreadly accepted. lock.Unlock(); r = async_msgr->accept_conn(this); + + if (async_msgr->cct->_conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << __func__ << " sleep for " + << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; + utime_t t; + t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + lock.Lock(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr << " just fail later one(this)" << dendl; - goto fail; + goto fail_registered; } if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) { ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down, state=" << get_state_name(state) << dendl; assert(state == STATE_CLOSED); - goto fail; + goto fail_registered; } // notify @@ -1763,7 +1804,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a r = _try_send(reply_bl); if (r < 0) - goto fail; + goto fail_registered; if (r == 0) { state = next_state; @@ -1775,13 +1816,26 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a return 0; + fail_registered: + ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl; + + if (async_msgr->cct->_conf->ms_inject_internal_delays) { + ldout(async_msgr->cct, 10) << __func__ << " sleep for " + << async_msgr->cct->_conf->ms_inject_internal_delays + << dendl; + utime_t t; + t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + fail: + ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl; return -1; } void AsyncConnection::_connect() { - ldout(async_msgr->cct, 10) << __func__ << " " << connect_seq << dendl; + ldout(async_msgr->cct, 10) << __func__ << " csq=" << connect_seq << dendl; state = STATE_CONNECTING; stopping.set(0); @@ -1792,7 +1846,7 @@ void AsyncConnection::_connect() void AsyncConnection::accept(int incoming) { - ldout(async_msgr->cct, 10) << __func__ << " " << incoming << dendl; + ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl; assert(sd < 0); sd = incoming; @@ -1877,7 +1931,7 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq) */ void AsyncConnection::discard_out_queue() { - ldout(async_msgr->cct, 10) << __func__ << " " << dendl; + ldout(async_msgr->cct, 10) << __func__ << " started" << dendl; for (list::iterator p = sent.begin(); p != sent.end(); ++p) { ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl; @@ -1900,7 +1954,7 @@ int AsyncConnection::randomize_out_seq() // here. We'll check it on the call. PLR int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq)); out_seq &= SEQ_MASK; - lsubdout(async_msgr->cct, ms, 10) << __func__ << "randomize_out_seq " << out_seq << dendl; + lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << out_seq << dendl; return seq_error; } else { // previously, seq #'s always started at 0. @@ -1970,13 +2024,13 @@ void AsyncConnection::fault() void AsyncConnection::was_session_reset() { - ldout(async_msgr->cct,10) << __func__ << "was_session_reset" << dendl; + ldout(async_msgr->cct,10) << __func__ << " started" << dendl; discard_out_queue(); center->dispatch_event_external(remote_reset_handler); if (randomize_out_seq()) { - lsubdout(async_msgr->cct,ms,15) << __func__ << " Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; + lsubdout(async_msgr->cct,ms,15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; } in_seq = 0; @@ -1993,6 +2047,16 @@ void AsyncConnection::_stop() center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); async_msgr->unregister_conn(this); + + if (async_msgr->cct->_conf->ms_inject_internal_delays) { + ldout(msgr->cct, 10) << __func__ << " sleep for " + << async_msgr->cct->_conf->ms_inject_internal_delays + << dendl; + utime_t t; + t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); + t.sleep(); + } + shutdown_socket(); discard_out_queue(); open_write = false; -- 2.39.5