msg/async/rdma: add perf counters to RDMA backend 13484/head
author     Haomai Wang <haomai@xsky.com>
           Fri, 17 Feb 2017 16:23:37 +0000 (00:23 +0800)
committer  Haomai Wang <haomai@xsky.com>
           Sat, 18 Feb 2017 05:30:22 +0000 (13:30 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h

diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
index 1b3c541c2ec3d9d9224e4a05b6df66506751b666..baa37e7ce53313d72bfb662f1ce9d99181728e25 100644
--- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
+++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
@@ -35,11 +35,14 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
   my_msg.peer_qpn = 0;
   my_msg.gid = infiniband->get_gid();
   notify_fd = dispatcher->register_qp(qp, this);
+  dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+  dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
 }
 
 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
 {
   ldout(cct, 20) << __func__ << " destruct." << dendl;
+  dispatcher->perf_logger->dec(l_msgr_rdma_active_queue_pair);
   worker->remove_pending_conn(this);
   dispatcher->erase_qpn(my_msg.qpn);
   cleanup();
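
The two queue-pair counters deliberately differ in kind: created_queue_pair only ever increments, while active_queue_pair is incremented in the constructor and decremented in the destructor, so it reads as the number of currently live queue pairs. A minimal sketch of the same pattern outside this diff (the l_example_* indices and the ExampleQP type are hypothetical; inc()/dec() are the PerfCounters calls used above):

    enum { l_example_first = 90000, l_example_created_qp, l_example_active_qp, l_example_last };

    struct ExampleQP {
      PerfCounters *logger;
      explicit ExampleQP(PerfCounters *l) : logger(l) {
        logger->inc(l_example_created_qp);  // monotonic: total QPs ever created
        logger->inc(l_example_active_qp);   // gauge-like: QPs currently alive
      }
      ~ExampleQP() {
        logger->dec(l_example_active_qp);   // created stays put, active shrinks
      }
    };
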
@@ -193,8 +196,11 @@ void RDMAConnectedSocketImpl::handle_connection() {
   ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
   int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
   if (r < 0) {
-    if (r != -EAGAIN)
+    if (r != -EAGAIN) {
+      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+      ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
       fault();
+    }
     return;
   }
 
@@ -210,6 +216,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
     r = infiniband->send_msg(cct, tcp_fd, my_msg);
     if (r < 0) {
       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
+      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
       fault();
     }
   } else {
@@ -221,6 +228,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
       r = infiniband->send_msg(cct, tcp_fd, my_msg);
       if (r < 0) {
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
+        dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
         fault();
         return ;
       }
@@ -259,6 +267,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
     ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
     chunk->prepare_read(response->byte_len);
     if (response->byte_len == 0) {
+      dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
       if (connected) {
         error = ECONNRESET;
         assert(infiniband->post_chunk(chunk) == 0);
@@ -266,6 +275,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
       }
       break;
     }
+    worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
     //assert(response->byte_len);
     if (read == (ssize_t)len) {
       buffers.push_back(chunk);
@@ -280,6 +290,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
     }
   }
 
+  worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
   if (is_server && connected == 0) {
     ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
     connected = 1; //if so, we don't need the last handshake
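
Note the two-argument form of inc() in this read path: rx_bytes grows by each completion's byte_len inside the loop, while rx_chunks grows once per poll batch by cqe.size(). A consumer can combine the two; a hedged sketch, assuming the PerfCounters::get() accessor (the helper name is hypothetical):

    // Hypothetical helper: average received chunk size derived from the two counters.
    static double avg_rx_chunk_bytes(PerfCounters *p) {
      uint64_t chunks = p->get(l_msgr_rdma_rx_chunks);
      return chunks ? double(p->get(l_msgr_rdma_rx_bytes)) / chunks : 0.0;
    }
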
@@ -402,7 +413,8 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
 
   int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
   if (ret == 0) {
-    ldout(cct, 10) << __func__ << " not enough buffers in worker " << worker << dendl;
+    ldout(cct, 1) << __func__ << " not enough buffers in worker " << worker << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
     return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough
   }
   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
@@ -428,6 +440,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
   assert(total <= pending_bl.length());
   bufferlist swapped;
   if (total < pending_bl.length()) {
+    worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
     pending_bl.splice(total, pending_bl.length()-total, &swapped);
     pending_bl.swap(swapped);
   } else {
@@ -475,6 +488,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
       ldout(cct, 20) << __func__ << " send_inline." << dendl;
       }*/
 
+    worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
     if (pre_wr)
       pre_wr->next = &iswr[current_swr];
     pre_wr = &iswr[current_swr];
@@ -485,11 +499,13 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
 
   ibv_send_wr *bad_tx_work_request;
   if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
-    lderr(cct) << __func__ << " failed to send data"
-               << " (the peer is most probably not ready): "
-               << cpp_strerror(errno) << dendl;
+    ldout(cct, 1) << __func__ << " failed to send data"
+                  << " (the peer is most probably not ready): "
+                  << cpp_strerror(errno) << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
     return -errno;
   }
+  worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
   ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;
   return 0;
 }
@@ -503,9 +519,10 @@ void RDMAConnectedSocketImpl::fin() {
   wr.send_flags = IBV_SEND_SIGNALED;
   ibv_send_wr* bad_tx_work_request;
   if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
-    lderr(cct) << __func__ << " failed to send fin message:"
-               << " ibv_post_send failed (the peer is most probably not ready): "
-               << cpp_strerror(errno) << dendl;
+    ldout(cct, 1) << __func__ << " failed to send fin message:"
+                  << " ibv_post_send failed (the peer is most probably not ready): "
+                  << cpp_strerror(errno) << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
     return ;
   }
 }
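
Taken together, the send-side counters added in this file split into throughput (tx_bytes per scatter-gather element, tx_chunks per successful post), hard failures (tx_failed on a rejected ibv_post_send), and memory pressure (tx_no_mem when reserve_message_buffer returns nothing, tx_parital_mem when only part of the pending data could be buffered). A hedged consumer-side sketch, assuming the PerfCounters::get() accessor; the helper and the 10% threshold are purely illustrative:

    // Hypothetical check: flag send-path buffer pressure when reservations
    // frequently come back empty or short relative to chunks actually posted.
    static bool tx_buffer_pressure(PerfCounters *p) {
      uint64_t posted  = p->get(l_msgr_rdma_tx_chunks);
      uint64_t starved = p->get(l_msgr_rdma_tx_no_mem) + p->get(l_msgr_rdma_tx_parital_mem);
      return posted && starved * 10 > posted;
    }
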
diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc
index 16e996c8e2a82bf45b324ca820adbf3cd2e4bf2e..aa2c73795cc478069edfb6e65fdbffff67677dc8 100644
--- a/src/msg/async/rdma/RDMAStack.cc
+++ b/src/msg/async/rdma/RDMAStack.cc
@@ -55,13 +55,35 @@ RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
   assert(rx_cc);
   rx_cq = ib->create_comp_queue(c, rx_cc);
   assert(rx_cq);
+
+  PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
+
+  plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
+  plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
+
+  plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The total number of rx work completions");
+  plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The total number of rx error work completions");
+  plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work requests");
+
+  plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
+  plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");
+
+  plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");
+
+
+  plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Created queue pair number");
+  plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Active queue pair number");
+
+  perf_logger = plb.create_perf_counters();
+  cct->get_perfcounters_collection()->add(perf_logger);
+
   t = std::thread(&RDMADispatcher::polling, this);
   cct->register_fork_watcher(this);
 }
 
 void RDMADispatcher::handle_async_event()
 {
-  ldout(cct, 20) << __func__ << dendl;
+  ldout(cct, 30) << __func__ << dendl;
   while (1) {
     ibv_async_event async_event;
     if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
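
The constructor now owns a registered PerfCounters instance; the matching teardown is not part of this hunk, but the usual Ceph pattern (an assumption here, not shown in this diff) is for ~RDMADispatcher() to unregister and free it:

    // Sketch of the expected destructor counterpart to the registration above.
    cct->get_perfcounters_collection()->remove(perf_logger);
    delete perf_logger;
    perf_logger = nullptr;
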
@@ -70,9 +92,11 @@ void RDMADispatcher::handle_async_event()
                   << " " << cpp_strerror(errno) << ")" << dendl;
       return;
     }
+    perf_logger->inc(l_msgr_rdma_total_async_events);
     // FIXME: Currently we must ensure no other factor make QP in ERROR state,
     // otherwise this qp can't be deleted in current cleanup flow.
     if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+      perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
       uint64_t qpn = async_event.element.qp->qp_num;
       ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
                      << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
@@ -114,6 +138,7 @@ void RDMADispatcher::polling()
       // for dead_queue_pairs).
       // Additionally, don't delete qp while outstanding_buffers isn't empty,
       // because we need to check qp's state before sending
+      perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
       if (!inflight.load()) {
         Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
         while (!dead_queue_pairs.empty()) {
@@ -122,11 +147,11 @@ void RDMADispatcher::polling()
           dead_queue_pairs.pop_back();
         }
       }
-      handle_async_event();
       if (done)
         break;
 
       if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
+        handle_async_event();
         if (!rearmed) {
           // Clean up cq events after rearm notify ensure no new incoming event
           // arrived between polling and rearm
@@ -140,6 +165,7 @@ void RDMADispatcher::polling()
         channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
         channel_poll.revents = 0;
         int r = 0;
+        perf_logger->set(l_msgr_rdma_polling, 0);
         while (!done && r == 0) {
           r = poll(&channel_poll, 1, 1);
           if (r < 0) {
@@ -151,6 +177,7 @@ void RDMADispatcher::polling()
         if (r > 0 && rx_cc->get_cq_event())
           ldout(cct, 20) << __func__ << " got cq event." << dendl;
         last_inactive = ceph_clock_now();
+        perf_logger->set(l_msgr_rdma_polling, 1);
         rearmed = false;
       }
       continue;
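
The polling-loop counters use set() rather than inc(), so they read as gauges: l_msgr_rdma_inflight_tx_chunks is overwritten with the current value of the inflight atomic on each pass, and l_msgr_rdma_polling is driven like a 0/1 flag, cleared before the thread blocks in poll() and restored once it resumes busy-polling the completion queue. Since they behave as gauges rather than monotonic counters, PerfCountersBuilder::add_u64() would arguably be the closer fit for registering them, though this change registers every index with add_u64_counter(). Condensed from the hunks above (the surrounding loop is omitted):

    perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);  // snapshot of a level, not a total
    perf_logger->set(l_msgr_rdma_polling, 0);                    // about to block in poll()
    // ... poll(&channel_poll, 1, 1) until an event, an error or shutdown ...
    perf_logger->set(l_msgr_rdma_polling, 1);                    // busy-polling the rx CQ again
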
@@ -158,12 +185,14 @@ void RDMADispatcher::polling()
 
     ldout(cct, 20) << __func__ << " pool completion queue got " << n
                    << " responses."<< dendl;
+    perf_logger->inc(l_msgr_rdma_rx_total_wc, n);
     Mutex::Locker l(lock);//make sure connected socket alive when pass wc
     for (int i = 0; i < n; ++i) {
       ibv_wc* response = &wc[i];
       Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
 
       if (response->status != IBV_WC_SUCCESS) {
+        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
         ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
                       << ") status(" << response->status << ":"
                       << ib->wc_status_to_string(response->status) << dendl;
@@ -295,6 +324,27 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
   : Worker(c, i), stack(nullptr), infiniband(NULL),
     tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
 {
+  // initialize perf_logger
+  char name[128];
+  sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
+  PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
+
+  plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
+  plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
+  plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
+  plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
+
+  plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The number of times no tx buffer was available");
+  plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The number of times only a partial tx buffer was available");
+  plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of failed tx posts");
+
+  plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
+  plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
+  plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks received");
+  plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks received");
+
+  perf_logger = plb.create_perf_counters();
+  cct->get_perfcounters_collection()->add(perf_logger);
 }
 
 RDMAWorker::~RDMAWorker()
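
A small aside on the per-worker counter name: the 128-byte buffer easily fits "AsyncMessenger::RDMAWorker-" plus any unsigned value, but a bounded call keeps that reasoning explicit. A minimal alternative sketch with identical output (requires <cstdio>):

    char name[128];
    snprintf(name, sizeof(name), "AsyncMessenger::RDMAWorker-%u", id);
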
@@ -441,12 +491,15 @@ void RDMAWorker::handle_tx_event()
     ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
 
     if (response->status != IBV_WC_SUCCESS) {
+      perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
       if (response->status == IBV_WC_RETRY_EXC_ERR) {
         ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+        perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
       } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
         ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
                       << response->qp_num << " should be down while this WR=" << response->wr_id
                       << " still in flight." << dendl;
+        perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
       } else {
         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                       << response->wr_id << ") status(" << response->status << "): "
@@ -469,6 +522,7 @@ void RDMAWorker::handle_tx_event()
     }
   }
 
+  perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());
   post_tx_buffer(tx_chunks);
 
   ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
@@ -489,6 +543,7 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
     w->set_ib(global_infiniband);
     w->set_stack(this);
   }
+
   ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
 }
 
diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h
index b3c726d90d689b8a9559324fc408ae31847f86ad..7386b3ba92853d4737f8c85f363a69f1c6433065 100644
--- a/src/msg/async/rdma/RDMAStack.h
+++ b/src/msg/async/rdma/RDMAStack.h
@@ -34,6 +34,28 @@ class RDMAServerSocketImpl;
 class RDMAStack;
 class RDMAWorker;
 
+enum {
+  l_msgr_rdma_dispatcher_first = 94000,
+
+  l_msgr_rdma_polling,
+  l_msgr_rdma_inflight_tx_chunks,
+
+  l_msgr_rdma_rx_total_wc,
+  l_msgr_rdma_rx_total_wc_errors,
+  l_msgr_rdma_rx_fin,
+
+  l_msgr_rdma_handshake_errors,
+
+  l_msgr_rdma_total_async_events,
+  l_msgr_rdma_async_last_wqe_events,
+
+  l_msgr_rdma_created_queue_pair,
+  l_msgr_rdma_active_queue_pair,
+
+  l_msgr_rdma_dispatcher_last,
+};
+
+
 class RDMADispatcher : public CephContext::ForkWatcher {
   typedef Infiniband::MemoryManager::Chunk Chunk;
   typedef Infiniband::QueuePair QueuePair;
@@ -84,6 +106,8 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   };
 
  public:
+  PerfCounters *perf_logger;
+
   explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s);
   virtual ~RDMADispatcher();
   void handle_async_event();
@@ -105,6 +129,26 @@ class RDMADispatcher : public CephContext::ForkWatcher {
 };
 
 
+enum {
+  l_msgr_rdma_first = 95000,
+
+  l_msgr_rdma_tx_total_wc,
+  l_msgr_rdma_tx_total_wc_errors,
+  l_msgr_rdma_tx_wc_retry_errors,
+  l_msgr_rdma_tx_wc_wr_flush_errors,
+
+  l_msgr_rdma_tx_no_mem,
+  l_msgr_rdma_tx_parital_mem,
+  l_msgr_rdma_tx_failed,
+
+  l_msgr_rdma_tx_chunks,
+  l_msgr_rdma_tx_bytes,
+  l_msgr_rdma_rx_chunks,
+  l_msgr_rdma_rx_bytes,
+
+  l_msgr_rdma_last,
+};
+
 class RDMAWorker : public Worker {
   typedef Infiniband::CompletionQueue CompletionQueue;
   typedef Infiniband::CompletionChannel CompletionChannel;
@@ -132,6 +176,7 @@ class RDMAWorker : public Worker {
   };
 
  public:
+  PerfCounters *perf_logger;
   explicit RDMAWorker(CephContext *c, unsigned i);
   virtual ~RDMAWorker();
   void notify();
@@ -245,6 +290,7 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
   RDMADispatcher *dispatcher;
+  PerfCounters *perf_counter;
 
  public:
   explicit RDMAStack(CephContext *cct, const string &t);