AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
: Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
- out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0),
- sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(0),
- open_write(false), lock("AsyncConnection::lock"), keepalive(false), recv_buf(NULL),
+ out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1),
+ write_lock("AsyncConnection::write_lock"), can_write(0),
+ open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
// side queueing because messages can't be renumbered, but the (kernel) client will
// occasionally pull a message out of the sent queue to send elsewhere. in that case
// it doesn't matter if we "got" it or not.
- if (message->get_seq() <= in_seq) {
+ uint64_t cur_seq = in_seq.read();
+ if (message->get_seq() <= cur_seq) {
ldout(async_msgr->cct,0) << __func__ << " got old message "
- << message->get_seq() << " <= " << in_seq << " " << message << " " << *message
+ << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message
<< ", discarding" << dendl;
message->put();
if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message)
assert(0 == "old msgs despite reconnect_seq feature");
break;
}
- if (message->get_seq() > in_seq + 1) {
+ if (message->get_seq() > cur_seq + 1) {
ldout(async_msgr->cct, 0) << __func__ << " missed message? skipped from seq "
- << in_seq << " to " << message->get_seq() << dendl;
+ << cur_seq << " to " << message->get_seq() << dendl;
if (async_msgr->cct->_conf->ms_die_on_skipped_message)
assert(0 == "skipped incoming seq");
}
message->set_connection(this);
// note last received message.
- in_seq = message->get_seq();
+ in_seq.set(message->get_seq());
ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq()
<< " " << message << " " << *message << dendl;
// if send_message always successfully send, it may have no
// opportunity to send seq ack. 10 is a experience value.
- if (in_seq > in_seq_acked + 10) {
+ if (ack_left.inc() > 10) {
center->dispatch_event_external(write_handler);
}
//}
bufferlist bl;
- bl.append((char*)&in_seq, sizeof(in_seq));
+ uint64_t s = in_seq.read();
+ bl.append((char*)&s, sizeof(s));
r = try_send(bl);
if (r == 0) {
state = STATE_CONNECTING_READY;
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
- << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
+ << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl;
int next_state;
next_state = STATE_ACCEPTING_READY;
discard_requeued_up_to(0);
is_reset_from_peer = false;
- in_seq = 0;
+ in_seq.set(0);
}
// send READY reply
if (reply.authorizer_len)
reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
+ uint64_t s = in_seq.read();
if (reply.tag == CEPH_MSGR_TAG_SEQ)
- reply_bl.append((char*)&in_seq, sizeof(in_seq));
+ reply_bl.append((char*)&s, sizeof(s));
lock.Unlock();
// Because "replacing" will prevent other connections preempt this addr,
void AsyncConnection::was_session_reset()
{
ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
+ assert(lock.is_locked()); // precondition: the caller must already hold `lock`
Mutex::Locker l(write_lock); // additionally take write_lock before touching the outgoing-side state
discard_out_queue();
ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
}
- in_seq = 0;
+ in_seq.set(0);
connect_seq = 0;
- in_seq_acked = 0;
+ // Safe to reset ack_left directly to 0: `lock` is asserted held above and write_lock is taken in this function.
+ ack_left.set(0);
once_ready = false;
can_write = 0;
}
void AsyncConnection::send_keepalive()
{
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
- Mutex::Locker l(lock);
- if (state != STATE_CLOSED) {
+ Mutex::Locker l(write_lock);
+ if (can_write != 2) {
keepalive = true;
center->dispatch_event_external(write_handler);
}
void AsyncConnection::handle_write()
{
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
- Mutex::Locker l(lock);
bufferlist bl;
int r = 0;
write_lock.Lock();
- if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
+ if (can_write == 1) {
if (keepalive) {
_send_keepalive_or_ack();
keepalive = false;
}
}
- if (in_seq > in_seq_acked) {
+ uint64_t left = ack_left.read();
+ if (left) {
ceph_le64 s;
- s = in_seq;
+ s = in_seq.read();
bl.append(CEPH_MSGR_TAG_ACK);
bl.append((char*)&s, sizeof(s));
- ldout(async_msgr->cct, 10) << __func__ << " try send msg ack" << dendl;
- in_seq_acked = s;
+ ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
+ ack_left.sub(ack_left);
r = _try_send(bl);
} else if (is_queued()) {
r = _try_send(bl);
ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
goto fail;
}
- } else if (state == STATE_STANDBY && !policy.server && is_queued()) {
- ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
- << " policy.server is false" << dendl;
- _connect();
- } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
- r = _try_send(bl);
- if (r < 0) {
- ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
- goto fail;
+ write_lock.Unlock();
+ } else {
+ write_lock.Unlock();
+
+ if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+ if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(msgr->cct, 10) << __func__ << " sleep for "
+ << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+ utime_t t;
+ t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
}
+
+ lock.Lock();
+ write_lock.Lock();
+ if (state == STATE_STANDBY && !policy.server && is_queued()) {
+ ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
+ << " policy.server is false" << dendl;
+ _connect();
+ } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
+ r = _try_send(bl);
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
+ write_lock.Unlock();
+ goto fail;
+ }
+ }
+ write_lock.Unlock();
+ lock.Unlock();
}
- write_lock.Unlock();
return ;
fail:
- write_lock.Unlock();
fault();
+ lock.Unlock();
}
void AsyncConnection::wakeup_from(uint64_t id)