done = true;
t.join();
ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
- auto i = qp_conns.begin();
- while (i != qp_conns.end()) {
- delete i->second.first;
- ++i;
- }
- while (!dead_queue_pairs.empty()) {
- delete dead_queue_pairs.back();
- dead_queue_pairs.pop_back();
- }
+ assert(qp_conns.empty());
+ assert(dead_queue_pairs.empty());
+ tx_cc->ack_events();
rx_cc->ack_events();
+ delete tx_cq;
delete rx_cq;
+ delete tx_cc;
delete rx_cc;
delete async_handler;
}
RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
: cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
- w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s)
+ w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
+ tx_cc = ib->create_comp_channel(c);
+ assert(tx_cc);
rx_cc = ib->create_comp_channel(c);
assert(rx_cc);
+ tx_cq = ib->create_comp_queue(c, tx_cc);
+ assert(tx_cq);
rx_cq = ib->create_comp_queue(c, rx_cc);
assert(rx_cq);
plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
+ plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
+ plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
+ plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
+ plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
+
plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion");
plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion");
plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request");
std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
std::vector<ibv_wc> tx_cqe;
- RDMAWorker* worker;
- ldout(cct, 20) << __func__ << " going to poll rx cq:" << rx_cq << dendl;
+ ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
RDMAConnectedSocketImpl *conn = nullptr;
utime_t last_inactive = ceph_clock_now();
bool rearmed = false;
- int ret = 0;
+ int r = 0;
while (true) {
- int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
- if (!n) {
+ int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
+ if (tx_ret > 0) {
+ ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
+ << " responses."<< dendl;
+ handle_tx_event(wc, tx_ret);
+ }
+
+ int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
+ if (rx_ret > 0) {
+ ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
+ << " responses."<< dendl;
+ perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
+
+ Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+ for (int i = 0; i < rx_ret; ++i) {
+ ibv_wc* response = &wc[i];
+ Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
+
+ assert(wc[i].opcode == IBV_WC_RECV);
+
+ if (response->status == IBV_WC_SUCCESS) {
+ conn = get_conn_lockless(response->qp_num);
+ if (!conn) {
+ assert(ib->is_rx_buffer(chunk->buffer));
+ r = ib->post_chunk(chunk);
+ ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
+ assert(r == 0);
+ } else {
+ polled[conn].push_back(*response);
+ }
+ } else {
+ perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+ ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
+ << ") status(" << response->status << ":"
+ << ib->wc_status_to_string(response->status) << ")" << dendl;
+ assert(ib->is_rx_buffer(chunk->buffer));
+ r = ib->post_chunk(chunk);
+ if (r) {
+ ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
+ assert(r == 0);
+ }
+
+ conn = get_conn_lockless(response->qp_num);
+ if (conn && conn->is_connected())
+ conn->fault();
+ }
+ }
+
+ for (auto &&i : polled)
+ i.first->pass_wc(std::move(i.second));
+ polled.clear();
+ }
+
+ if (!tx_ret && !rx_ret) {
// NOTE: Has TX just transitioned to idle? We should do it when idle!
// It's now safe to delete queue pairs (see comment by declaration
// for dead_queue_pairs).
if (!rearmed) {
// Clean up cq events after rearm notify ensure no new incoming event
// arrived between polling and rearm
+ tx_cq->rearm_notify();
rx_cq->rearm_notify();
rearmed = true;
continue;
}
- struct pollfd channel_poll;
- channel_poll.fd = rx_cc->get_fd();
- channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
- channel_poll.revents = 0;
- int r = 0;
+ struct pollfd channel_poll[2];
+ channel_poll[0].fd = tx_cc->get_fd();
+ channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ channel_poll[0].revents = 0;
+ channel_poll[1].fd = rx_cc->get_fd();
+ channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ channel_poll[1].revents = 0;
+ r = 0;
perf_logger->set(l_msgr_rdma_polling, 0);
while (!done && r == 0) {
- r = poll(&channel_poll, 1, 1);
+ r = poll(channel_poll, 2, 1);
if (r < 0) {
r = -errno;
lderr(cct) << __func__ << " poll failed " << r << dendl;
ceph_abort();
}
}
+ if (r > 0 && tx_cc->get_cq_event())
+ ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
if (r > 0 && rx_cc->get_cq_event())
- ldout(cct, 20) << __func__ << " got cq event." << dendl;
+ ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
last_inactive = ceph_clock_now();
perf_logger->set(l_msgr_rdma_polling, 1);
rearmed = false;
}
- continue;
- }
-
- ldout(cct, 20) << __func__ << " pool completion queue got " << n
- << " responses."<< dendl;
- perf_logger->inc(l_msgr_rdma_rx_total_wc, n);
- Mutex::Locker l(lock);//make sure connected socket alive when pass wc
- for (int i = 0; i < n; ++i) {
- ibv_wc* response = &wc[i];
- Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-
- ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
-
- if (wc[i].opcode == IBV_WC_SEND) {
- tx_cqe.push_back(wc[i]);
- ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl;
- continue;
- }
-
- if (response->status != IBV_WC_SUCCESS) {
- perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
- ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
- << ") status(" << response->status << ":"
- << ib->wc_status_to_string(response->status) << ")" << dendl;
- if (ib->is_rx_buffer(chunk->buffer)) {
- ret = ib->post_chunk(chunk);
- if (ret) {
- ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(ret) << dendl;
- assert(ret == 0);
- }
- conn = get_conn_lockless(response->qp_num);
- if (conn && conn->is_connected())
- conn->fault();
- notify_pending_workers();
- } else if (ib->is_tx_buffer(chunk->buffer)) {
- tx_cqe.push_back(wc[i]);
- } else {
- ldout(cct, 0) << __func__ << " unknown chunk: " << chunk << dendl;
- }
- continue;
- }
-
- conn = get_conn_lockless(response->qp_num);
- if (!conn) {
- ret = ib->post_chunk(chunk);
- ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << ret << dendl;
- assert(ret == 0);
- continue;
- }
- polled[conn].push_back(*response);
- }
- for (auto &&i : polled)
- i.first->pass_wc(std::move(i.second));
- polled.clear();
- if (!tx_cqe.empty()) {
- worker = get_worker_from_list();
- if (worker == nullptr)
- worker = dynamic_cast<RDMAWorker*>(stack->get_worker());
- worker->pass_wc(std::move(tx_cqe));
- tx_cqe.clear();
}
}
}
void RDMADispatcher::notify_pending_workers() {
- {
- Mutex::Locker l(w_lock);
- if (pending_workers.empty())
- return ;
+ if (num_pending_workers) {
+ RDMAWorker *w = nullptr;
+ {
+ Mutex::Locker l(w_lock);
+ if (!pending_workers.empty()) {
+ w = pending_workers.front();
+ pending_workers.pop_front();
+ --num_pending_workers;
+ }
+ }
+ if (w)
+ w->notify_worker();
}
- pending_workers.front()->notify_worker();
}
int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
return fd;
}
-int RDMADispatcher::register_worker(RDMAWorker* w)
-{
- int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
- assert(fd >= 0);
- Mutex::Locker l(w_lock);
- workers[w] = fd;
- return fd;
-}
-
-void RDMADispatcher::pending_buffers(RDMAWorker* w)
-{
- Mutex::Locker l(w_lock);
- pending_workers.push_back(w);
-}
-
-RDMAWorker* RDMADispatcher::get_worker_from_list()
-{
- Mutex::Locker l(w_lock);
- if (pending_workers.empty())
- return nullptr;
- else {
- RDMAWorker* w = pending_workers.front();
- pending_workers.pop_front();
- return w;
- }
-}
-
RDMAConnectedSocketImpl* RDMADispatcher::get_conn_by_qp(uint32_t qp)
{
Mutex::Locker l(lock);
t = std::thread(&RDMADispatcher::polling, this);
}
+void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
+{
+ std::vector<Chunk*> tx_chunks;
+
+ for (int i = 0; i < n; ++i) {
+ ibv_wc* response = &cqe[i];
+ Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ ldout(cct, 25) << __func__ << " QP: " << response->qp_num
+ << " len: " << response->byte_len << " , addr:" << chunk
+ << " " << ib->wc_status_to_string(response->status) << dendl;
+
+ if (response->status != IBV_WC_SUCCESS) {
+ perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
+ if (response->status == IBV_WC_RETRY_EXC_ERR) {
+ ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+ perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
+ } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
+ ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
+ << response->qp_num << " should be down while this WR=" << response->wr_id
+ << " still in flight." << dendl;
+ perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
+ } else {
+ ldout(cct, 1) << __func__ << " send work request returned error for buffer("
+ << response->wr_id << ") status(" << response->status << "): "
+ << ib->wc_status_to_string(response->status) << dendl;
+ }
+
+ Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+
+ if (conn && conn->is_connected()) {
+ ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
+ conn->fault();
+ } else {
+ ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+ }
+ }
+
+ // FIXME: why not tx?
+ if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer))
+ tx_chunks.push_back(chunk);
+ else
+ ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
+ }
+
+ perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
+ post_tx_buffer(tx_chunks);
+}
+
+/**
+ * Add the given Chunks to the given free queue.
+ *
+ * \param[in] chunks
+ * The Chunks to enqueue.
+ * \return
+ * 0 if success or -1 for failure
+ */
+void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
+{
+ if (chunks.empty())
+ return ;
+
+ inflight -= chunks.size();
+ ib->get_memory_manager()->return_tx(chunks);
+ ldout(cct, 30) << __func__ << " release " << chunks.size()
+ << " chunks, inflight " << inflight << dendl;
+ notify_pending_workers();
+}
+
RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
: Worker(c, i), stack(nullptr), infiniband(NULL),
sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
- plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
- plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
- plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
- plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
-
plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
RDMAWorker::~RDMAWorker()
{
delete tx_handler;
- if (notify_fd >= 0)
- ::close(notify_fd);
}
void RDMAWorker::initialize()
{
if (!dispatcher) {
dispatcher = stack->get_dispatcher();
- notify_fd = dispatcher->register_worker(this);
- center.create_file_event(notify_fd, EVENT_READABLE, tx_handler);
memory_manager = infiniband->get_memory_manager();
}
}
-void RDMAWorker::notify_worker()
-{
- uint64_t i = 1;
- assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
-}
-
-void RDMAWorker::pass_wc(std::vector<ibv_wc> &&v)
-{
- Mutex::Locker l(lock);
- if (wc.empty())
- wc = std::move(v);
- else
- wc.insert(wc.end(), v.begin(), v.end());
- notify_worker();
-}
-
-void RDMAWorker::get_wc(std::vector<ibv_wc> &w)
-{
- Mutex::Locker l(lock);
- if (wc.empty())
- return ;
- w.swap(wc);
-}
-
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
return 0;
}
-int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
+int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
assert(center.in_thread());
int r = infiniband->get_tx_buffers(c, bytes);
- if (r > 0) {
- stack->get_dispatcher()->inflight += r;
- ldout(cct, 30) << __func__ << " reserve " << r << " chunks, inflight " << dispatcher->inflight << dendl;
+ assert(r >= 0);
+ size_t got = infiniband->get_memory_manager()->get_tx_chunk_size() * r;
+ ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
+ stack->get_dispatcher()->inflight += r;
+ if (got == bytes)
return r;
- }
- assert(r == 0);
if (o) {
if (pending_sent_conns.back() != o)
pending_sent_conns.push_back(o);
- dispatcher->pending_buffers(this);
+ dispatcher->make_pending_worker(this);
}
return r;
}
-/**
- * Add the given Chunks to the given free queue.
- *
- * \param[in] chunks
- * The Chunks to enqueue.
- * \return
- * 0 if success or -1 for failure
- */
-void RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
-{
- assert(center.in_thread());
- if (chunks.empty())
- return ;
-
- dispatcher->inflight -= chunks.size();
- memory_manager->return_tx(chunks);
- ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
+void RDMAWorker::handle_pending_message()
+{
+ ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
std::set<RDMAConnectedSocketImpl*> done;
while (!pending_sent_conns.empty()) {
RDMAConnectedSocketImpl *o = pending_sent_conns.front();
if (r < 0) {
if (r == -EAGAIN) {
pending_sent_conns.push_front(o);
- break;
+ dispatcher->make_pending_worker(this);
+ return ;
}
o->fault();
}
}
}
-}
-
-void RDMAWorker::handle_tx_event()
-{
- std::vector<Chunk*> tx_chunks;
- std::vector<ibv_wc> cqe;
- get_wc(cqe);
-
- for (size_t i = 0; i < cqe.size(); ++i) {
- ibv_wc* response = &cqe[i];
- Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
- ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
-
- if (response->status != IBV_WC_SUCCESS) {
- perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
- if (response->status == IBV_WC_RETRY_EXC_ERR) {
- ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
- perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
- } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
- ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
- << response->qp_num << " should be down while this WR=" << response->wr_id
- << " still in flight." << dendl;
- perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
- } else {
- ldout(cct, 1) << __func__ << " send work request returned error for buffer("
- << response->wr_id << ") status(" << response->status << "): "
- << infiniband->wc_status_to_string(response->status) << dendl;
- }
- RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
-
- if (conn && conn->is_connected()) {
- ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
- conn->fault();
- } else {
- ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
- }
- }
-
- // FIXME: why not tx?
- if (memory_manager->is_tx_buffer(chunk->buffer))
- tx_chunks.push_back(chunk);
- }
- perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());
- post_tx_buffer(tx_chunks);
-
- ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
dispatcher->notify_pending_workers();
}
-
RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
{
if (!global_infiniband)
l_msgr_rdma_polling,
l_msgr_rdma_inflight_tx_chunks,
+ l_msgr_rdma_tx_total_wc,
+ l_msgr_rdma_tx_total_wc_errors,
+ l_msgr_rdma_tx_wc_retry_errors,
+ l_msgr_rdma_tx_wc_wr_flush_errors,
+
l_msgr_rdma_rx_total_wc,
l_msgr_rdma_rx_total_wc_errors,
l_msgr_rdma_rx_fin,
std::thread t;
CephContext *cct;
Infiniband* ib;
- Infiniband::CompletionQueue* rx_cq; // common completion queue for all transmits
- Infiniband::CompletionChannel* rx_cc;
+ Infiniband::CompletionQueue* tx_cq;
+ Infiniband::CompletionQueue* rx_cq;
+ Infiniband::CompletionChannel *tx_cc, *rx_cc;
EventCallbackRef async_handler;
bool done = false;
- Mutex lock; // protect `qp_conns
- Mutex w_lock; // protect pending workers
+ Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
// qp_num -> InfRcConnection
// The main usage of `qp_conns` is looking up connection by qp_num,
// so the lifecycle of element in `qp_conns` is the lifecycle of qp.
/// save them in this vector and delete them at a safe time, when there are
/// no outstanding transmit buffers to be lost.
std::vector<QueuePair*> dead_queue_pairs;
- Mutex qp_lock;//for csi reuse qp
- ceph::unordered_map<RDMAWorker*, int> workers;;
+
+ std::atomic<uint64_t> num_pending_workers = {0};
+ Mutex w_lock; // protect pending workers
+ // fixme: lockfree
std::list<RDMAWorker*> pending_workers;
RDMAStack* stack;
void handle_async_event();
void polling();
int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
- int register_worker(RDMAWorker* w);
- void pending_buffers(RDMAWorker* w);
+ void make_pending_worker(RDMAWorker* w) {
+ Mutex::Locker l(w_lock);
+ if (pending_workers.back() != w) {
+ pending_workers.push_back(w);
+ ++num_pending_workers;
+ }
+ }
RDMAStack* get_stack() { return stack; }
- RDMAWorker* get_worker_from_list();
RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp);
RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
void erase_qpn(uint32_t qpn);
+ Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
void notify_pending_workers();
virtual void handle_pre_fork() override;
virtual void handle_post_fork() override;
+ void handle_tx_event(ibv_wc *cqe, int n);
+ void post_tx_buffer(std::vector<Chunk*> &chunks);
std::atomic<uint64_t> inflight = {0};
};
enum {
l_msgr_rdma_first = 95000,
- l_msgr_rdma_tx_total_wc,
- l_msgr_rdma_tx_total_wc_errors,
- l_msgr_rdma_tx_wc_retry_errors,
- l_msgr_rdma_tx_wc_wr_flush_errors,
-
l_msgr_rdma_tx_no_mem,
l_msgr_rdma_tx_parital_mem,
l_msgr_rdma_tx_failed,
MemoryManager *memory_manager;
std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
RDMADispatcher* dispatcher = nullptr;
- int notify_fd = -1;
Mutex lock;
- std::vector<ibv_wc> wc;
class C_handle_cq_tx : public EventCallback {
RDMAWorker *worker;
public:
C_handle_cq_tx(RDMAWorker *w): worker(w) {}
void do_request(int fd) {
- worker->handle_tx_event();
+ worker->handle_pending_message();
}
};
PerfCounters *perf_logger;
explicit RDMAWorker(CephContext *c, unsigned i);
virtual ~RDMAWorker();
- void notify_worker();
- void pass_wc(std::vector<ibv_wc> &&v);
- void get_wc(std::vector<ibv_wc> &w);
virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
virtual void initialize() override;
RDMAStack *get_stack() { return stack; }
- int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
- void post_tx_buffer(std::vector<Chunk*> &chunks);
+ int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
void remove_pending_conn(RDMAConnectedSocketImpl *o) {
assert(center.in_thread());
pending_sent_conns.remove(o);
}
- void handle_tx_event();
+ void handle_pending_message();
void set_ib(Infiniband* ib) { infiniband = ib; }
void set_stack(RDMAStack *s) { stack = s; }
+ void notify_worker() {
+ center.dispatch_event_external(tx_handler);
+ }
};
class RDMAConnectedSocketImpl : public ConnectedSocketImpl {