my_msg.peer_qpn = 0;
my_msg.gid = infiniband->get_gid();
notify_fd = dispatcher->register_qp(qp, this);
+ dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+ dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
}
RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
{
ldout(cct, 20) << __func__ << " destruct." << dendl;
+ dispatcher->perf_logger->dec(l_msgr_rdma_active_queue_pair);
worker->remove_pending_conn(this);
dispatcher->erase_qpn(my_msg.qpn);
cleanup();
ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
if (r < 0) {
- if (r != -EAGAIN)
+ if (r != -EAGAIN) {
+ dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+ ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
fault();
+ }
return;
}
r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
+ dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
}
} else {
r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
+ dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
return ;
}
ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
chunk->prepare_read(response->byte_len);
if (response->byte_len == 0) {
+ dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
if (connected) {
error = ECONNRESET;
assert(infiniband->post_chunk(chunk) == 0);
}
break;
}
+ worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
//assert(response->byte_len);
if (read == (ssize_t)len) {
buffers.push_back(chunk);
}
}
+ worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
if (is_server && connected == 0) {
ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
connected = 1; //if so, we don't need the last handshake
int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
if (ret == 0) {
- ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl;
+ ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough
}
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
assert(total <= pending_bl.length());
bufferlist swapped;
if (total < pending_bl.length()) {
+ worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
pending_bl.splice(total, pending_bl.length()-total, &swapped);
pending_bl.swap(swapped);
} else {
ldout(cct, 20) << __func__ << " send_inline." << dendl;
}*/
+ worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
if (pre_wr)
pre_wr->next = &iswr[current_swr];
pre_wr = &iswr[current_swr];
ibv_send_wr *bad_tx_work_request;
if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
- lderr(cct) << __func__ << " failed to send data"
- << " (most probably should be peer not ready): "
- << cpp_strerror(errno) << dendl;
+ ldout(cct, 1) << __func__ << " failed to send data"
+ << " (most probably should be peer not ready): "
+ << cpp_strerror(errno) << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return -errno;
}
+ worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;
return 0;
}
wr.send_flags = IBV_SEND_SIGNALED;
ibv_send_wr* bad_tx_work_request;
if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
- lderr(cct) << __func__ << " failed to send message="
- << " ibv_post_send failed(most probably should be peer not ready): "
- << cpp_strerror(errno) << dendl;
+ ldout(cct, 1) << __func__ << " failed to send message="
+ << " ibv_post_send failed(most probably should be peer not ready): "
+ << cpp_strerror(errno) << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return ;
}
}
assert(rx_cc);
rx_cq = ib->create_comp_queue(c, rx_cc);
assert(rx_cq);
+
+ PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
+
+ plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
+ plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
+
+ plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion");
+ plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion");
+ plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request");
+
+ plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
+ plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");
+
+ plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");
+
+
+ plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Created queue pair number");
+ plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Active queue pair number");
+
+ perf_logger = plb.create_perf_counters();
+ cct->get_perfcounters_collection()->add(perf_logger);
+
t = std::thread(&RDMADispatcher::polling, this);
cct->register_fork_watcher(this);
}
void RDMADispatcher::handle_async_event()
{
- ldout(cct, 20) << __func__ << dendl;
+ ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
<< " " << cpp_strerror(errno) << ")" << dendl;
return;
}
+ perf_logger->inc(l_msgr_rdma_total_async_events);
// FIXME: Currently we must ensure no other factor make QP in ERROR state,
// otherwise this qp can't be deleted in current cleanup flow.
if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+ perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
uint64_t qpn = async_event.element.qp->qp_num;
ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
<< " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
// for dead_queue_pairs).
// Additionally, don't delete qp while outstanding_buffers isn't empty,
// because we need to check qp's state before sending
+ perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
if (!inflight.load()) {
Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
while (!dead_queue_pairs.empty()) {
dead_queue_pairs.pop_back();
}
}
- handle_async_event();
if (done)
break;
if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
+ handle_async_event();
if (!rearmed) {
// Clean up cq events after rearm notify ensure no new incoming event
// arrived between polling and rearm
channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
channel_poll.revents = 0;
int r = 0;
+ perf_logger->set(l_msgr_rdma_polling, 0);
while (!done && r == 0) {
r = poll(&channel_poll, 1, 1);
if (r < 0) {
if (r > 0 && rx_cc->get_cq_event())
ldout(cct, 20) << __func__ << " got cq event." << dendl;
last_inactive = ceph_clock_now();
+ perf_logger->set(l_msgr_rdma_polling, 1);
rearmed = false;
}
continue;
ldout(cct, 20) << __func__ << " pool completion queue got " << n
<< " responses."<< dendl;
+ perf_logger->inc(l_msgr_rdma_rx_total_wc, n);
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
for (int i = 0; i < n; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
if (response->status != IBV_WC_SUCCESS) {
+ perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< ib->wc_status_to_string(response->status) << dendl;
: Worker(c, i), stack(nullptr), infiniband(NULL),
tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
{
+ // initialize perf_logger
+ char name[128];
+ sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
+ PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
+
+ plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
+ plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
+ plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
+ plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
+
+ plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
+ plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffer");
+ plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
+
+ plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
+ plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
+ plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted");
+ plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted");
+
+ perf_logger = plb.create_perf_counters();
+ cct->get_perfcounters_collection()->add(perf_logger);
}
RDMAWorker::~RDMAWorker()
ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
if (response->status != IBV_WC_SUCCESS) {
+ perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
if (response->status == IBV_WC_RETRY_EXC_ERR) {
ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+ perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
} else if (response->status == IBV_WC_WR_FLUSH_ERR) {
ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
<< response->qp_num << " should be down while this WR=" << response->wr_id
<< " still in flight." << dendl;
+ perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
} else {
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
}
}
+ perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());
post_tx_buffer(tx_chunks);
ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
w->set_ib(global_infiniband);
w->set_stack(this);
}
+
ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
}
class RDMAStack;
class RDMAWorker;
+// Perf counter indices for RDMADispatcher ("AsyncMessenger::RDMADispatcher").
+// Values must stay in (l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
+// the two sentinels bound the range handed to PerfCountersBuilder.
+enum {
+  l_msgr_rdma_dispatcher_first = 94000,
+
+  l_msgr_rdma_polling,
+  l_msgr_rdma_inflight_tx_chunks,
+
+  l_msgr_rdma_rx_total_wc,
+  l_msgr_rdma_rx_total_wc_errors,
+  l_msgr_rdma_rx_fin,
+
+  l_msgr_rdma_handshake_errors,
+
+  l_msgr_rdma_total_async_events,
+  l_msgr_rdma_async_last_wqe_events,
+
+  l_msgr_rdma_created_queue_pair,
+  l_msgr_rdma_active_queue_pair,
+
+  l_msgr_rdma_dispatcher_last,
+};
+
+
class RDMADispatcher : public CephContext::ForkWatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;
};
public:
+ PerfCounters *perf_logger;
+
explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s);
virtual ~RDMADispatcher();
void handle_async_event();
};
+// Perf counter indices for RDMAWorker ("AsyncMessenger::RDMAWorker-<id>").
+// Values must stay in (l_msgr_rdma_first, l_msgr_rdma_last); the sentinels
+// bound the range handed to PerfCountersBuilder.
+// NOTE(review): l_msgr_rdma_tx_parital_mem spelling matches the exported
+// counter name "tx_parital_mem" — renaming both would change admin-socket output.
+enum {
+  l_msgr_rdma_first = 95000,
+
+  l_msgr_rdma_tx_total_wc,
+  l_msgr_rdma_tx_total_wc_errors,
+  l_msgr_rdma_tx_wc_retry_errors,
+  l_msgr_rdma_tx_wc_wr_flush_errors,
+
+  l_msgr_rdma_tx_no_mem,
+  l_msgr_rdma_tx_parital_mem,
+  l_msgr_rdma_tx_failed,
+
+  l_msgr_rdma_tx_chunks,
+  l_msgr_rdma_tx_bytes,
+  l_msgr_rdma_rx_chunks,
+  l_msgr_rdma_rx_bytes,
+
+  l_msgr_rdma_last,
+};
+
class RDMAWorker : public Worker {
typedef Infiniband::CompletionQueue CompletionQueue;
typedef Infiniband::CompletionChannel CompletionChannel;
};
public:
+ PerfCounters *perf_logger;
explicit RDMAWorker(CephContext *c, unsigned i);
virtual ~RDMAWorker();
void notify();
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
RDMADispatcher *dispatcher;
+ PerfCounters *perf_counter;
public:
explicit RDMAStack(CephContext *cct, const string &t);