]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async: s/atomic_t/atomic<>/
authorKefu Chai <kchai@redhat.com>
Thu, 8 Jun 2017 03:40:00 +0000 (11:40 +0800)
committerKefu Chai <kchai@redhat.com>
Thu, 8 Jun 2017 03:51:06 +0000 (11:51 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 5418abaa0ea860d92e0cbc75928852f57b5e3470..86a5c6a5c8bb1f858fb86fde1f04e4b8ff102394 100644 (file)
@@ -122,7 +122,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
                                  Worker *w)
   : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
     logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
-    out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
+    state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
     dispatch_queue(q), can_write(WriteStatus::NOWRITE),
     keepalive(false), recv_buf(NULL),
     recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
@@ -728,7 +728,7 @@ void AsyncConnection::process()
           // side queueing because messages can't be renumbered, but the (kernel) client will
           // occasionally pull a message out of the sent queue to send elsewhere.  in that case
           // it doesn't matter if we "got" it or not.
-          uint64_t cur_seq = in_seq.read();
+          uint64_t cur_seq = in_seq;
           if (message->get_seq() <= cur_seq) {
             ldout(async_msgr->cct,0) << __func__ << " got old message "
                     << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message
@@ -760,13 +760,13 @@ void AsyncConnection::process()
 #endif
 
           // note last received message.
-          in_seq.set(message->get_seq());
+          in_seq = message->get_seq();
          ldout(async_msgr->cct, 5) << " rx " << message->get_source() << " seq "
                                     << message->get_seq() << " " << message
                                    << " " << *message << dendl;
 
           if (!policy.lossy) {
-            ack_left.inc();
+            ack_left++;
             need_dispatch_writer = true;
           }
           state = STATE_OPEN;
@@ -1135,7 +1135,7 @@ ssize_t AsyncConnection::_process_connection()
 
         newly_acked_seq = *((uint64_t*)state_buffer);
         ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
-                            << " vs out_seq " << out_seq.read() << dendl;
+                            << " vs out_seq " << out_seq << dendl;
         discard_requeued_up_to(newly_acked_seq);
         //while (newly_acked_seq > out_seq.read()) {
         //  Message *m = _get_next_outgoing(NULL);
@@ -1148,7 +1148,7 @@ ssize_t AsyncConnection::_process_connection()
         //}
 
         bufferlist bl;
-        uint64_t s = in_seq.read();
+        uint64_t s = in_seq;
         bl.append((char*)&s, sizeof(s));
         r = try_send(bl);
         if (r == 0) {
@@ -1767,7 +1767,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   connect_seq = connect.connect_seq + 1;
   peer_global_seq = connect.global_seq;
   ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
-                             << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl;
+                             << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
 
   int next_state;
 
@@ -1780,7 +1780,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     next_state = STATE_ACCEPTING_READY;
     discard_requeued_up_to(0);
     is_reset_from_peer = false;
-    in_seq.set(0);
+    in_seq = 0;
   }
 
   // send READY reply
@@ -1805,7 +1805,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
 
   if (reply.tag == CEPH_MSGR_TAG_SEQ) {
-    uint64_t s = in_seq.read();
+    uint64_t s = in_seq;
     reply_bl.append((char*)&s, sizeof(s));
   }
 
@@ -1964,7 +1964,7 @@ void AsyncConnection::requeue_sent()
     ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend "
                                << " (" << m->get_seq() << ")" << dendl;
     rq.push_front(make_pair(bufferlist(), m));
-    out_seq.dec();
+    out_seq--;
   }
 }
 
@@ -1983,7 +1983,7 @@ void AsyncConnection::discard_requeued_up_to(uint64_t seq)
                          << " <= " << seq << ", discarding" << dendl;
     p.second->put();
     rq.pop_front();
-    out_seq.inc();
+    out_seq++;
   }
   if (rq.empty())
     out_q.erase(CEPH_MSG_PRIO_HIGHEST);
@@ -2019,11 +2019,11 @@ int AsyncConnection::randomize_out_seq()
     int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq));
     rand_seq &= SEQ_MASK;
     lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl;
-    out_seq.set(rand_seq);
+    out_seq = rand_seq;
     return seq_error;
   } else {
     // previously, seq #'s always started at 0.
-    out_seq.set(0);
+    out_seq = 0;
     return 0;
   }
 }
@@ -2122,13 +2122,13 @@ void AsyncConnection::was_session_reset()
   dispatch_queue->queue_remote_reset(this);
 
   if (randomize_out_seq()) {
-    ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
+    ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
   }
 
-  in_seq.set(0);
+  in_seq = 0;
   connect_seq = 0;
   // it's safe to directly set 0, double locked
-  ack_left.set(0);
+  ack_left = 0;
   once_ready = false;
   can_write = WriteStatus::NOWRITE;
 }
@@ -2182,7 +2182,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
 {
   FUNCTRACE();
   assert(center->in_thread());
-  m->set_seq(out_seq.inc());
+  m->set_seq(++out_seq);
 
   if (msgr->crcflags & MSG_CRC_HEADER)
     m->calc_header_crc();
@@ -2463,15 +2463,15 @@ void AsyncConnection::handle_write()
     } while (can_write == WriteStatus::CANWRITE);
     write_lock.unlock();
 
-    uint64_t left = ack_left.read();
+    uint64_t left = ack_left;
     if (left) {
       ceph_le64 s;
-      s = in_seq.read();
+      s = in_seq;
       outcoming_bl.append(CEPH_MSGR_TAG_ACK);
       outcoming_bl.append((char*)&s, sizeof(s));
       ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
-      ack_left.sub(left);
-      left = ack_left.read();
+      ack_left -= left;
+      left = ack_left;
       r = _try_send(left);
     } else if (is_queued()) {
       r = _try_send();
index 8f212ae8f29988c9d50429aa0fc3839207bfbba9..005b7c13ab29af3f1d618075c362902fa0d3fdf9 100644 (file)
@@ -294,8 +294,8 @@ class AsyncConnection : public Connection {
   PerfCounters *logger;
   int global_seq;
   __u32 connect_seq, peer_global_seq;
-  atomic64_t out_seq;
-  atomic64_t ack_left, in_seq;
+  std::atomic<uint64_t> out_seq{0};
+  std::atomic<uint64_t> ack_left{0}, in_seq{0};
   int state;
   int state_after_send;
   ConnectedSocket cs;