From: Kefu Chai Date: Thu, 8 Jun 2017 03:40:00 +0000 (+0800) Subject: msg/async: s/atomic_t/atomic<>/ X-Git-Tag: v12.1.0~209^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f6d6759983b4c370a119b54e19c12ece9cd2265e;p=ceph.git msg/async: s/atomic_t/atomic<>/ Signed-off-by: Kefu Chai --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 5418abaa0ea8..86a5c6a5c8bb 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -122,7 +122,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu Worker *w) : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0), - out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1), + state(STATE_NONE), state_after_send(STATE_NONE), port(-1), dispatch_queue(q), can_write(WriteStatus::NOWRITE), keepalive(false), recv_buf(NULL), recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), @@ -728,7 +728,7 @@ void AsyncConnection::process() // side queueing because messages can't be renumbered, but the (kernel) client will // occasionally pull a message out of the sent queue to send elsewhere. in that case // it doesn't matter if we "got" it or not. - uint64_t cur_seq = in_seq.read(); + uint64_t cur_seq = in_seq; if (message->get_seq() <= cur_seq) { ldout(async_msgr->cct,0) << __func__ << " got old message " << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message @@ -760,13 +760,13 @@ void AsyncConnection::process() #endif // note last received message. - in_seq.set(message->get_seq()); + in_seq = message->get_seq(); ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq " << message->get_seq() << " " << message << " " << *message << dendl; if (!policy.lossy) { - ack_left.inc(); + ack_left++; need_dispatch_writer = true; } state = STATE_OPEN; @@ -1135,7 +1135,7 @@ ssize_t AsyncConnection::_process_connection() newly_acked_seq = *((uint64_t*)state_buffer); ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq - << " vs out_seq " << out_seq.read() << dendl; + << " vs out_seq " << out_seq << dendl; discard_requeued_up_to(newly_acked_seq); //while (newly_acked_seq > out_seq.read()) { // Message *m = _get_next_outgoing(NULL); @@ -1148,7 +1148,7 @@ ssize_t AsyncConnection::_process_connection() //} bufferlist bl; - uint64_t s = in_seq.read(); + uint64_t s = in_seq; bl.append((char*)&s, sizeof(s)); r = try_send(bl); if (r == 0) { @@ -1767,7 +1767,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = " - << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl; + << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl; int next_state; @@ -1780,7 +1780,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis next_state = STATE_ACCEPTING_READY; discard_requeued_up_to(0); is_reset_from_peer = false; - in_seq.set(0); + in_seq = 0; } // send READY reply @@ -1805,7 +1805,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); if (reply.tag == CEPH_MSGR_TAG_SEQ) { - uint64_t s = in_seq.read(); + uint64_t s = in_seq; reply_bl.append((char*)&s, sizeof(s)); } @@ -1964,7 +1964,7 @@ void AsyncConnection::requeue_sent() ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend " << " (" << m->get_seq() << ")" << dendl; rq.push_front(make_pair(bufferlist(), m)); - out_seq.dec(); + out_seq--; } } @@ -1983,7 +1983,7 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq) << " <= " << seq << ", discarding" << dendl; p.second->put(); rq.pop_front(); - out_seq.inc(); + out_seq++; } if (rq.empty()) out_q.erase(CEPH_MSG_PRIO_HIGHEST); @@ -2019,11 +2019,11 @@ int AsyncConnection::randomize_out_seq() int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq)); rand_seq &= SEQ_MASK; lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl; - out_seq.set(rand_seq); + out_seq = rand_seq; return seq_error; } else { // previously, seq #'s always started at 0. - out_seq.set(0); + out_seq = 0; return 0; } } @@ -2122,13 +2122,13 @@ void AsyncConnection::was_session_reset() dispatch_queue->queue_remote_reset(this); if (randomize_out_seq()) { - ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl; + ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; } - in_seq.set(0); + in_seq = 0; connect_seq = 0; // it's safe to directly set 0, double locked - ack_left.set(0); + ack_left = 0; once_ready = false; can_write = WriteStatus::NOWRITE; } @@ -2182,7 +2182,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) { FUNCTRACE(); assert(center->in_thread()); - m->set_seq(out_seq.inc()); + m->set_seq(++out_seq); if (msgr->crcflags & MSG_CRC_HEADER) m->calc_header_crc(); @@ -2463,15 +2463,15 @@ void AsyncConnection::handle_write() } while (can_write == WriteStatus::CANWRITE); write_lock.unlock(); - uint64_t left = ack_left.read(); + uint64_t left = ack_left; if (left) { ceph_le64 s; - s = in_seq.read(); + s = in_seq; outcoming_bl.append(CEPH_MSGR_TAG_ACK); outcoming_bl.append((char*)&s, sizeof(s)); ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; - ack_left.sub(left); - left = ack_left.read(); + ack_left -= left; + left = ack_left; r = _try_send(left); } else if (is_queued()) { r = _try_send(); diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 8f212ae8f299..005b7c13ab29 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -294,8 +294,8 @@ class AsyncConnection : public Connection { PerfCounters *logger; int global_seq; __u32 connect_seq, peer_global_seq; - atomic64_t out_seq; - atomic64_t ack_left, in_seq; + std::atomic out_seq{0}; + std::atomic ack_left{0}, in_seq{0}; int state; int state_after_send; ConnectedSocket cs;