Worker *w)
: Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
- out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
+ state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
dispatch_queue(q), can_write(WriteStatus::NOWRITE),
keepalive(false), recv_buf(NULL),
recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
// side queueing because messages can't be renumbered, but the (kernel) client will
// occasionally pull a message out of the sent queue to send elsewhere. in that case
// it doesn't matter if we "got" it or not.
- uint64_t cur_seq = in_seq.read();
+ uint64_t cur_seq = in_seq;
if (message->get_seq() <= cur_seq) {
ldout(async_msgr->cct,0) << __func__ << " got old message "
<< message->get_seq() << " <= " << cur_seq << " " << message << " " << *message
#endif
// note last received message.
- in_seq.set(message->get_seq());
+ in_seq = message->get_seq();
ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq "
<< message->get_seq() << " " << message
<< " " << *message << dendl;
if (!policy.lossy) {
- ack_left.inc();
+ ack_left++;
need_dispatch_writer = true;
}
state = STATE_OPEN;
newly_acked_seq = *((uint64_t*)state_buffer);
ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
- << " vs out_seq " << out_seq.read() << dendl;
+ << " vs out_seq " << out_seq << dendl;
discard_requeued_up_to(newly_acked_seq);
//while (newly_acked_seq > out_seq.read()) {
// Message *m = _get_next_outgoing(NULL);
//}
bufferlist bl;
- uint64_t s = in_seq.read();
+ uint64_t s = in_seq;
bl.append((char*)&s, sizeof(s));
r = try_send(bl);
if (r == 0) {
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
- << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl;
+ << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
int next_state;
next_state = STATE_ACCEPTING_READY;
discard_requeued_up_to(0);
is_reset_from_peer = false;
- in_seq.set(0);
+ in_seq = 0;
}
// send READY reply
reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
if (reply.tag == CEPH_MSGR_TAG_SEQ) {
- uint64_t s = in_seq.read();
+ uint64_t s = in_seq;
reply_bl.append((char*)&s, sizeof(s));
}
ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
<< " (" << m->get_seq() << ")" << dendl;
rq.push_front(make_pair(bufferlist(), m));
- out_seq.dec();
+ out_seq--;
}
}
<< " <= " << seq << ", discarding" << dendl;
p.second->put();
rq.pop_front();
- out_seq.inc();
+ out_seq++;
}
if (rq.empty())
out_q.erase(CEPH_MSG_PRIO_HIGHEST);
int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
rand_seq &= SEQ_MASK;
lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
- out_seq.set(rand_seq);
+ out_seq = rand_seq;
return seq_error;
} else {
// previously, seq #'s always started at 0.
- out_seq.set(0);
+ out_seq = 0;
return 0;
}
}
dispatch_queue->queue_remote_reset(this);
if (randomize_out_seq()) {
- ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
+ ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
}
- in_seq.set(0);
+ in_seq = 0;
connect_seq = 0;
// it's safe to directly set 0, double locked
- ack_left.set(0);
+ ack_left = 0;
once_ready = false;
can_write = WriteStatus::NOWRITE;
}
{
FUNCTRACE();
assert(center->in_thread());
- m->set_seq(out_seq.inc());
+ m->set_seq(++out_seq);
if (msgr->crcflags & MSG_CRC_HEADER)
m->calc_header_crc();
} while (can_write == WriteStatus::CANWRITE);
write_lock.unlock();
- uint64_t left = ack_left.read();
+ uint64_t left = ack_left;
if (left) {
ceph_le64 s;
- s = in_seq.read();
+ s = in_seq;
outcoming_bl.append(CEPH_MSGR_TAG_ACK);
outcoming_bl.append((char*)&s, sizeof(s));
ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
- ack_left.sub(left);
- left = ack_left.read();
+ ack_left -= left;
+ left = ack_left;
r = _try_send(left);
} else if (is_queued()) {
r = _try_send();