: cct(cct), connected(0), error(0), infiniband(ib),
dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
is_server(false), con_handler(new C_handle_connection(this)),
- active(false)
+ active(false), pending(false)
{
qp = infiniband->create_queue_pair(
cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted");
+ plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");
perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
return r;
if (o) {
- if (pending_sent_conns.back() != o)
+ if (!o->is_pending()) {
pending_sent_conns.push_back(o);
+ perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
+ o->set_pending(1);
+ }
dispatcher->make_pending_worker(this);
}
return r;
void RDMAWorker::handle_pending_message()
{
ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
- std::set<RDMAConnectedSocketImpl*> done;
while (!pending_sent_conns.empty()) {
RDMAConnectedSocketImpl *o = pending_sent_conns.front();
pending_sent_conns.pop_front();
- if (!done.count(o)) {
- done.insert(o);
- ssize_t r = o->submit(false);
- ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
- if (r < 0) {
- if (r == -EAGAIN) {
- pending_sent_conns.push_back(o);
- dispatcher->make_pending_worker(this);
- return ;
- }
- o->fault();
+ ssize_t r = o->submit(false);
+ ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
+ if (r < 0) {
+ if (r == -EAGAIN) {
+ pending_sent_conns.push_back(o);
+ dispatcher->make_pending_worker(this);
+ return ;
}
+ o->fault();
}
+ o->set_pending(0);
+ perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
}
-
dispatcher->notify_pending_workers();
}
l_msgr_rdma_tx_bytes,
l_msgr_rdma_rx_chunks,
l_msgr_rdma_rx_bytes,
+ l_msgr_rdma_pending_sent_conns,
l_msgr_rdma_last,
};
EventCallbackRef con_handler;
int tcp_fd = -1;
bool active;// qp is active ?
+ bool pending;
void notify();
ssize_t read_buffers(char* buf, size_t len);
void cleanup();
void set_accept_fd(int sd);
int try_connect(const entity_addr_t&, const SocketOptions &opt);
-
+ bool is_pending() {return pending;}
+ void set_pending(bool val) {pending = val;}
class C_handle_connection : public EventCallback {
RDMAConnectedSocketImpl *csi;
bool active;