#define dout_prefix _conn_prefix(_dout)
ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
- << " sd=" << sd << " :" << port
- << " s=" << get_state_name(state)
- << " pgs=" << peer_global_seq
- << " cs=" << connect_seq
- << " l=" << policy.lossy
- << ").";
+ << " sd=" << sd << " :" << port
+ << " s=" << get_state_name(state)
+ << " pgs=" << peer_global_seq
+ << " cs=" << connect_seq
+ << " l=" << policy.lossy
+ << ").";
}
const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;
if (errno == EAGAIN || errno == EINTR) {
nread = 0;
} else {
- ldout(async_msgr->cct, 1) << __func__ << " Reading from fd=" << fd
+ ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << fd
<< " : "<< strerror(errno) << dendl;
return -1;
}
} else if (nread == 0) {
- ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor "
+ ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
<< fd << dendl;
return -1;
}
return -EINTR;
}
+ if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+ if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
+ ::shutdown(sd, SHUT_RDWR);
+ }
+ }
+
uint64_t sent = 0;
list<bufferptr>::const_iterator pb = outcoming_bl.buffers().begin();
uint64_t left_pbrs = outcoming_bl.buffers().size();
assert(len);
ldout(async_msgr->cct, 20) << __func__ << " len is " << len << " state_offset is "
<< state_offset << dendl;
+
+ if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+ if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
+ ::shutdown(sd, SHUT_RDWR);
+ }
+ }
+
int r = 0;
uint64_t left = len - state_offset;
if (recv_end > recv_start) {
// verify header crc
if (header_crc != header.crc) {
- ldout(async_msgr->cct,0) << __func__ << "reader got bad header crc "
+ ldout(async_msgr->cct,0) << __func__ << " reader got bad header crc "
<< header_crc << " != " << header.crc << dendl;
goto fail;
}
//
if (session_security.get() == NULL) {
- ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " no session security set" << dendl;
} else {
if (session_security->check_message_signature(message)) {
ldout(async_msgr->cct, 0) << __func__ << "Signature check failed" << dendl;
r = _try_send(bl);
if (r == 0) {
state = STATE_CONNECTING_WAIT_CONNECT_REPLY;
- ldout(async_msgr->cct,20) << __func__ << "connect wrote (self +) cseq, waiting for reply" << dendl;
+ ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl;
} else if (r > 0) {
state = STATE_WAIT_SEND;
state_after_send = STATE_CONNECTING_WAIT_CONNECT_REPLY;
assert(connect_seq == connect_reply.connect_seq);
backoff = utime_t();
set_features((uint64_t)connect_reply.features & (uint64_t)connect_msg.features);
- ldout(async_msgr->cct, 10) << __func__ << "connect success " << connect_seq
- << ", lossy = " << policy.lossy << ", features "
- << get_features() << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " connect success " << connect_seq
+ << ", lossy = " << policy.lossy << ", features "
+ << get_features() << dendl;
// If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
// connection. PLR
}
if (reply.tag == CEPH_MSGR_TAG_SEQ) {
- ldout(async_msgr->cct, 10) << __func__ << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
state = STATE_CONNECTING_WAIT_ACK_SEQ;
}
if (reply.tag == CEPH_MSGR_TAG_READY) {
- ldout(async_msgr->cct, 10) << __func__ << "got CEPH_MSGR_TAG_READY " << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " got CEPH_MSGR_TAG_READY " << dendl;
state = STATE_CONNECTING_READY;
}
reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
// mismatch?
- ldout(async_msgr->cct,10) << __func__ << "accept my proto " << reply.protocol_version
+ ldout(async_msgr->cct, 10) << __func__ << " accept my proto " << reply.protocol_version
<< ", their proto " << connect.protocol_version << dendl;
if (connect.protocol_version != reply.protocol_version) {
return _reply_accept(CEPH_MSGR_TAG_BADPROTOVER, connect, reply, authorizer_reply);
}
uint64_t feat_missing = policy.features_required & ~(uint64_t)connect.features;
if (feat_missing) {
- ldout(async_msgr->cct, 1) << __func__ << "peer missing required features "
+ ldout(async_msgr->cct, 1) << __func__ << " peer missing required features "
<< std::hex << feat_missing << std::dec << dendl;
return _reply_accept(CEPH_MSGR_TAG_FEATURES, connect, reply, authorizer_reply);
}
}
// We've verified the authorizer for this AsyncConnection, so set up the session security structure. PLR
- ldout(async_msgr->cct, 10) << __func__ << " accept: setting up session_security." << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " accept setting up session_security." << dendl;
// existing?
lock.Unlock();
AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
+
+ if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+ ldout(msgr->cct, 10) << __func__ << " sleep for "
+ << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+ utime_t t;
+ t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
+
lock.Lock();
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept, it must be mark_down, state="
goto replace;
}
- ldout(async_msgr->cct, 0) << __func__ << "accept connect_seq " << connect.connect_seq
+ ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq
<< " vs existing " << existing->connect_seq
<< " state " << existing->state << dendl;
if (connect.connect_seq < existing->connect_seq) {
// old attempt, or we sent READY but they didn't get it.
- ldout(async_msgr->cct, 10) << __func__ << "accept existing " << existing << ".cseq "
+ ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing << ".cseq "
<< existing->connect_seq << " > " << connect.connect_seq
<< ", RETRY_SESSION" << dendl;
reply.connect_seq = existing->connect_seq + 1;
assert(connect.global_seq >= existing->peer_global_seq);
if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
existing->connect_seq == 0) {
- ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq "
+ ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
<< connect.connect_seq << ", " << existing << ".cseq = "
<< existing->connect_seq << "), sending RESETSESSION" << dendl;
return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
} // existing
else if (!replacing && connect.connect_seq > 0) {
// we reset, and they are opening a new session
- ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq "
+ ldout(async_msgr->cct, 0) << __func__ << " accept we reset (peer sent cseq "
<< connect.connect_seq << "), sending RESETSESSION" << dendl;
return _reply_accept(CEPH_MSGR_TAG_RESETSESSION, connect, reply, authorizer_reply);
} else {
// new session
- ldout(async_msgr->cct,10) << __func__ << "accept new session" << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " accept new session" << dendl;
existing = NULL;
goto open;
}
}
ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
+ if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+ ldout(msgr->cct, 10) << __func__ << " sleep for "
+ << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+ utime_t t;
+ t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
+
// There is no possible that existing connection will acquire this lock
existing->lock.Lock();
// if replacing, this con is alreadly accepted.
lock.Unlock();
r = async_msgr->accept_conn(this);
+
+ if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+ ldout(msgr->cct, 10) << __func__ << " sleep for "
+ << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+ utime_t t;
+ t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
+
lock.Lock();
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
<< " just fail later one(this)" << dendl;
- goto fail;
+ goto fail_registered;
}
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
ldout(async_msgr->cct, 1) << __func__ << " state changed while accept_conn, it must be mark_down, state="
<< get_state_name(state) << dendl;
assert(state == STATE_CLOSED);
- goto fail;
+ goto fail_registered;
}
// notify
r = _try_send(reply_bl);
if (r < 0)
- goto fail;
+ goto fail_registered;
if (r == 0) {
state = next_state;
return 0;
+ fail_registered:
+ ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl;
+
+ if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+ ldout(async_msgr->cct, 10) << __func__ << " sleep for "
+ << async_msgr->cct->_conf->ms_inject_internal_delays
+ << dendl;
+ utime_t t;
+ t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
+
fail:
+ ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl;
return -1;
}
void AsyncConnection::_connect()
{
- ldout(async_msgr->cct, 10) << __func__ << " " << connect_seq << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " csq=" << connect_seq << dendl;
state = STATE_CONNECTING;
stopping.set(0);
void AsyncConnection::accept(int incoming)
{
- ldout(async_msgr->cct, 10) << __func__ << " " << incoming << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl;
assert(sd < 0);
sd = incoming;
*/
void AsyncConnection::discard_out_queue()
{
- ldout(async_msgr->cct, 10) << __func__ << " " << dendl;
+ ldout(async_msgr->cct, 10) << __func__ << " started" << dendl;
for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl;
// here. We'll check it on the call. PLR
int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
out_seq &= SEQ_MASK;
- lsubdout(async_msgr->cct, ms, 10) << __func__ << "randomize_out_seq " << out_seq << dendl;
+ lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << out_seq << dendl;
return seq_error;
} else {
// previously, seq #'s always started at 0.
void AsyncConnection::was_session_reset()
{
- ldout(async_msgr->cct,10) << __func__ << "was_session_reset" << dendl;
+ ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
discard_out_queue();
center->dispatch_event_external(remote_reset_handler);
if (randomize_out_seq()) {
- lsubdout(async_msgr->cct,ms,15) << __func__ << " Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
+ lsubdout(async_msgr->cct,ms,15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
}
in_seq = 0;
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
async_msgr->unregister_conn(this);
+
+ if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+ ldout(msgr->cct, 10) << __func__ << " sleep for "
+ << async_msgr->cct->_conf->ms_inject_internal_delays
+ << dendl;
+ utime_t t;
+ t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
+
shutdown_socket();
discard_out_queue();
open_write = false;