From: Haomai Wang Date: Thu, 13 Oct 2016 11:20:30 +0000 (+0800) Subject: msg/async/rdma: use RDMADispatch to poll rx channel X-Git-Tag: v11.1.0~416^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=955c78adca4942c0eaf183ff7decc4b7a37bcaab;p=ceph.git msg/async/rdma: use RDMADispatch to poll rx channel Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/rdma/Infiniband.cc b/src/msg/async/rdma/Infiniband.cc index fb34fecf9a75..eac08c6d290d 100644 --- a/src/msg/async/rdma/Infiniband.cc +++ b/src/msg/async/rdma/Infiniband.cc @@ -17,7 +17,6 @@ #include "Infiniband.h" #include "common/errno.h" #include "common/debug.h" -#include "RDMAStack.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix @@ -61,7 +60,7 @@ Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(ne } } -Infiniband::Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c), stack(s) +Infiniband::Infiniband(CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c) { device = device_list.get_device(device_name.c_str()); assert(device); @@ -124,7 +123,7 @@ ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_s * QueuePair on success or NULL if init fails * See QueuePair::QueuePair for parameter documentation. */ -Infiniband::QueuePair* Infiniband::create_queue_pair(ibv_qp_type type) +Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *c, ibv_qp_type type) { Infiniband::CompletionChannel* cc = create_comp_channel(); if (!cc) @@ -137,8 +136,7 @@ Infiniband::QueuePair* Infiniband::create_queue_pair(ibv_qp_type type) return NULL; } - RDMAWorker* w = static_cast(stack->get_worker()); - Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, w->get_tx_cq(), cq, max_send_wr, max_recv_wr); + Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, c, cq, max_send_wr, max_recv_wr); if (qp->init()) { delete cc; delete cq; @@ -208,6 +206,39 @@ int Infiniband::QueuePair::init() return 0; } +/** + * Change RC QueuePair into the ERROR state. This is necessary modify + * the Queue Pair into the Error state and poll all of the relevant + * Work Completions prior to destroying a Queue Pair. + * Since destroying a Queue Pair does not guarantee that its Work + * Completions are removed from the CQ upon destruction. Even if the + * Work Completions are already in the CQ, it might not be possible to + * retrieve them. If the Queue Pair is associated with an SRQ, it is + * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED + * + * \return + * -errno if the QueuePair can't switch to ERROR + * 0 for success. + */ +int Infiniband::QueuePair::to_dead() +{ + if (dead) + return 0; + ibv_qp_attr qpa; + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_ERR; + + int mask = IBV_QP_STATE; + int ret = ibv_modify_qp(qp, &qpa, mask); + if (ret) { + lderr(infiniband.cct) << __func__ << " failed to transition to ERROR state: " + << cpp_strerror(errno) << dendl; + return -errno; + } + dead = true; + return ret; +} + int Infiniband::post_chunk(Chunk* chunk) { ibv_sge isge; @@ -284,7 +315,8 @@ Infiniband::QueuePair::QueuePair( initial_psn(0), max_send_wr(max_send_wr), max_recv_wr(max_recv_wr), - q_key(q_key) + q_key(q_key), + dead(false) { initial_psn = lrand48() & 0xffffff; if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) { diff --git a/src/msg/async/rdma/Infiniband.h b/src/msg/async/rdma/Infiniband.h index 22dee795de44..3ed5f85d3945 100644 --- a/src/msg/async/rdma/Infiniband.h +++ b/src/msg/async/rdma/Infiniband.h @@ -284,10 +284,9 @@ class Infiniband { return 0; } - int take_back(Chunk* ck) { + void take_back(Chunk* ck) { Mutex::Locker l(lock); free_chunks.push_back(ck); - return 0; } int get_buffers(std::vector &chunks, size_t bytes) { @@ -312,7 +311,7 @@ class Infiniband { uint32_t chunk_size; Mutex lock; std::vector free_chunks; - set all_chunks; + std::set all_chunks; char* base; }; @@ -357,9 +356,11 @@ class Infiniband { send = new Cluster(*this, size); send->add(tx_num); } - int return_tx(Chunk* c) { - c->clear(); - return send->take_back(c); + void return_tx(std::vector &chunks) { + for (auto c : chunks) { + c->clear(); + send->take_back(c); + } } int get_send_buffers(std::vector &c, size_t bytes) { @@ -396,8 +397,7 @@ class Infiniband { public: NetHandler net; - RDMAStack* stack; - explicit Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name); + explicit Infiniband(CephContext *c, const std::string &device_name); /** * Destroy an Infiniband object. @@ -459,11 +459,6 @@ class Infiniband { class QueuePair { public: QueuePair(Infiniband& infiniband, ibv_qp_type type,int ib_physical_port, ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0); - // exists solely as superclass constructor for MockQueuePair derivative - explicit QueuePair(Infiniband& infiniband): - infiniband(infiniband), type(IBV_QPT_RC), ctxt(NULL), ib_physical_port(-1), - pd(NULL), srq(NULL), qp(NULL), txcq(NULL), rxcq(NULL), - initial_psn(-1) {} ~QueuePair(); int init(); @@ -551,9 +546,9 @@ class Infiniband { ibv_qp* get_qp() const { return qp; } Infiniband::CompletionQueue* get_tx_cq() const { return txcq; } Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; } - int to_reset(); int to_dead(); - int get_fd() { return fd; } + bool is_dead() const { return dead; } + private: Infiniband& infiniband; // Infiniband to which this QP belongs ibv_qp_type type; // QP type (IBV_QPT_RC, etc.) @@ -568,13 +563,13 @@ class Infiniband { uint32_t max_send_wr; uint32_t max_recv_wr; uint32_t q_key; - int fd; + bool dead; }; public: typedef MemoryManager::Cluster Cluster; typedef MemoryManager::Chunk Chunk; - QueuePair* create_queue_pair(ibv_qp_type type); + QueuePair* create_queue_pair(CompletionQueue *w, ibv_qp_type type); ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge); int post_chunk(Chunk* chunk); int post_channel_cluster(); @@ -592,6 +587,7 @@ class Infiniband { ibv_gid get_gid() { return device->get_gid(); } MemoryManager* get_memory_manager() { return memory_manager; } Device* get_device() { return device; } + int get_async_fd() { return device->ctxt->async_fd; } static const char* wc_status_to_string(int status); }; diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index 3e93ebaa2553..c6594f30824f 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -101,57 +101,39 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) { ldout(cct, 20) << __func__ << " need to read bytes: " << len << " buffers size: " << buffers.size() << dendl; + if (error) + return -error; ssize_t read = 0; if (!buffers.empty()) read = read_buffers(buf,len); - static const int MAX_COMPLETIONS = 16; - ibv_wc wc[MAX_COMPLETIONS]; + std::vector cqe; + get_wc(cqe); + if (cqe.empty()) + return read == 0 ? -EAGAIN : read; - bool rearmed = false; - int n; - again: - n = rx_cq->poll_cq(MAX_COMPLETIONS, wc); - ldout(cct, 20) << __func__ << " poll completion queue got " << n << " responses."<< dendl; - for (int i = 0; i < n; ++i) { - ibv_wc* response = &wc[i]; - ldout(cct, 20) << __func__ << " cqe " << response->byte_len << " bytes." << dendl; + ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses."<< dendl; + for (size_t i = 0; i < cqe.size(); ++i) { + ibv_wc* response = &cqe[i]; + assert(response->status == IBV_WC_SUCCESS); + ldout(cct, 20) << __func__ << " cqe " << response->byte_len << " bytes." << dendl; Chunk* chunk = reinterpret_cast(response->wr_id); chunk->prepare_read(response->byte_len); - if (!response->byte_len) { - wait_close = true; - return 0; - } - if (response->status != IBV_WC_SUCCESS) { - lderr(cct) << __func__ << " poll cqe failed! " << " number: " << n << ", status: "<< response->status << cpp_strerror(errno) << dendl; - assert(0); + assert(!response->byte_len); + if (read == (ssize_t)len) { + buffers.push_back(chunk); + ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; + } else if (read + response->byte_len > (ssize_t)len) { + read += chunk->read(buf+read, (ssize_t)len-read); + buffers.push_back(chunk); + ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; } else { - if (read == (ssize_t)len) { - buffers.push_back(chunk); - ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; - } else if (read + response->byte_len > (ssize_t)len) { - read += chunk->read(buf+read, (ssize_t)len-read); - buffers.push_back(chunk); - ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; - } else { - read += chunk->read(buf+read, response->byte_len); - assert(infiniband->post_chunk(chunk) == 0); - } + read += chunk->read(buf+read, response->byte_len); + assert(infiniband->post_chunk(chunk) == 0); } } - if (n) - goto again; - if (!rearmed) { - rx_cq->rearm_notify(); - rearmed = true; - // Clean up cq events after rearm notify ensure no new incoming event - // arrived between polling and rearm - goto again; - } - if (read == 0) - return -EAGAIN; - return read; + return read == 0 ? -EAGAIN : read; } ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) @@ -180,57 +162,44 @@ ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) { - ssize_t size = 0; + if (error) + return -error; static const int MAX_COMPLETIONS = 16; ibv_wc wc[MAX_COMPLETIONS]; - - bool rearmed = false; - int n; - again: - n = rx_cq->poll_cq(MAX_COMPLETIONS, wc); - ldout(cct, 20) << __func__ << " pool completion queue got " << n << " responses."<< dendl; + ssize_t size; ibv_wc* response; Chunk* chunk; bool loaded = false; auto iter = buffers.begin(); - if(iter != buffers.end()) { + if (iter != buffers.end()) { chunk = *iter; - if (chunk->bound == 0) { - wait_close = true; - return 0; - } auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); buffers.erase(iter); loaded = true; size = chunk->bound; } - for (int i = 0; i < n; ++i) { + std::vector cqe; + get_wc(cqe); + if (cqe.empty()) + return size == 0 ? -EAGAIN : size; + + ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; + + for (size_t i = 0; i < cqe.size(); ++i) { response = &wc[i]; chunk = reinterpret_cast(response->wr_id); chunk->prepare_read(response->byte_len); if(!loaded && i == 0) { - if (chunk->bound == 0) { - wait_close = true; - return 0; - } auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); size = chunk->bound; continue; } buffers.push_back(chunk); + iter++; } - if (n) - goto again; - if (!rearmed) { - rx_cq->rearm_notify(); - rearmed = true; - // Clean up cq events after rearm notify ensure no new incoming event - // arrived between polling and rearm - goto again; - } if (size == 0) return -EAGAIN; return size; @@ -238,18 +207,33 @@ ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) { + if (error) + return -error; size_t bytes = bl.length(); if (!bytes) return 0; - vector tx_buffers; - if (infiniband->get_tx_buffers(tx_buffers, bytes) < 0) { + pending_bl.claim_append(bl); + ssize_t r = submit(more); + if (r < 0 && r != -EAGAIN) + return r; + return bytes; +} + +ssize_t RDMAConnectedSocketImpl::submit(bool more) +{ + if (error) + return -error; + std::vector tx_buffers; + size_t bytes = pending_bl.length(); + if (worker->reserve_message_buffer(this, tx_buffers, bytes) < 0) { ldout(cct, 10) << __func__ << " no enough buffers" << dendl; - return 0; + pending_bl.claim_append(pending_bl); + return -EAGAIN; } ldout(cct, 20) << __func__ << " prepare " << bytes << " bytes, tx buffer count: " << tx_buffers.size() << dendl; vector::iterator current_buffer = tx_buffers.begin(); - list::const_iterator it = bl.buffers().begin(); - while (it != bl.buffers().end()) { + list::const_iterator it = pending_bl.buffers().begin(); + while (it != pending_bl.buffers().end()) { const uintptr_t addr = reinterpret_cast(it->c_str()); uint32_t copied = 0; // ldout(cct, 20) << __func__ << " app_buffer: " << addr << " length: " << it->length() << dendl; @@ -270,7 +254,7 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) return r; ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; - bl.clear(); + pending_bl.clear(); return bytes; } @@ -318,22 +302,3 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) } return 0; } - -void RDMAConnectedSocketImpl::fin() { - //ibv_sge list; - //memset(&list, 0, sizeof(list)); - ibv_send_wr wr; - memset(&wr, 0, sizeof(wr)); - wr.wr_id = reinterpret_cast(this); - wr.num_sge = 0; - //wr.sg_list = &list; - wr.opcode = IBV_WR_SEND; - wr.send_flags = IBV_SEND_SIGNALED; - ibv_send_wr* bad_tx_work_request; - if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { - lderr(cct) << __func__ << " failed to send FIN" - << "(most probably should be peer not ready): " - << cpp_strerror(errno) << dendl; - return ; - } -} diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index 0a148f9f6e7b..6f0c7de8e60f 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -78,8 +78,7 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt ldout(cct, 1) << __func__ << " recv msg not whole." << dendl; continue; } else { - //RDMAWorker* w = static_cast(infiniband->stack->get_worker()); - server = new RDMAConnectedSocketImpl(cct, infiniband, NULL, msg); + server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, worker, msg); msg = server->get_my_msg(); r = infiniband->send_udp_msg(server_setup_socket, msg, addr); server->activate(); diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index ca92fae38743..104fa3bf1700 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -14,8 +14,8 @@ * */ -#include "RDMAStack.h" #include "include/str_list.h" +#include "RDMAStack.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix @@ -23,9 +23,14 @@ static Infiniband* global_infiniband; +RDMAWorker::RDMAWorker(CephContext *c, unsigned i) + : Worker(c, i), stack(nullptr), infiniband(NULL), + tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL) +{} + int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) { - auto p = new RDMAServerSocketImpl(cct, infiniband, sa); + auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa); int r = p->listen(sa, opt); if (r < 0) { delete p; @@ -38,7 +43,7 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { - RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, this); + RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this); entity_addr_t sa; memcpy(&sa, &addr, sizeof(addr)); @@ -46,7 +51,7 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co ldout(cct, 20) << __func__ << " connecting to " << sa.get_sockaddr() << " : " << sa.get_port() << dendl; ldout(cct, 20) << __func__ << " my syn msg : < " << msg.qpn << ", " << msg.psn << ", " << msg.lid << ">"<< dendl; - client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0); + int client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0); if (client_setup_socket == -1) { lderr(cct) << __func__ << " failed to create client socket: " << strerror(errno) << dendl; return -errno; @@ -77,6 +82,7 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co assert(!r); std::unique_ptr csi(p); *socket = ConnectedSocket(std::move(csi)); + ::close(client_setup_socket); return 0; } @@ -85,7 +91,8 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) { if (!global_infiniband) global_infiniband = new Infiniband( - this, cct, cct->_conf->ms_async_rdma_device_name); + cct, cct->_conf->ms_async_rdma_device_name); + dispatcher = new RDMADispatcher(cct, global_infiniband); } void RDMAWorker::initialize() @@ -97,6 +104,45 @@ void RDMAWorker::initialize() memory_manager = infiniband->get_memory_manager(); } +int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) +{ + int r = infiniband->get_tx_buffers(c, bytes); + if (r == 0) { + stack->get_dispatcher()->inflight += c.size(); + return r; + } + + if (pending_sent_conns.back() != o) + pending_sent_conns.push_back(o); + return 0; +} + +/** + * Add the given Chunks to the given free queue. + * + * \param[in] chunks + * The Chunks to enqueue. + * \return + * 0 if success or -1 for failure + */ +int RDMAWorker::post_tx_buffer(std::vector &chunks) +{ + stack->get_dispatcher()->inflight -= chunks.size(); + infiniband->get_memory_manager()->return_tx(chunks); + while (!pending_sent_conns.empty()) { + RDMAConnectedSocketImpl *o = pending_sent_conns.front(); + ssize_t r = o->submit(false); + ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl; + if (r < 0) { + if (r == -EAGAIN) + break; + o->fault(); + } + pending_sent_conns.pop_front(); + } + return 0; +} + void RDMAWorker::handle_tx_event() { ldout(cct, 20) << __func__ << dendl; @@ -105,6 +151,8 @@ void RDMAWorker::handle_tx_event() static const int MAX_COMPLETIONS = 16; ibv_wc wc[MAX_COMPLETIONS]; + std::vector tx_chunks; + tx_chunks.reserve(MAX_COMPLETIONS); bool rearmed = false; int n; @@ -129,17 +177,23 @@ void RDMAWorker::handle_tx_event() << response->wr_id << ") status(" << response->status << "): " << infiniband->wc_status_to_string(response->status) << dendl; } - assert(0); + RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num); + if (conn) { + conn->fault(); + } else { + ldout(cct, 0) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl; + } } - if (memory_manager->is_tx_chunk(chunk)) - infiniband->get_memory_manager()->return_tx(chunk); - else - ldout(cct, 20) << __func__ << " chunk belongs to none " << dendl; + assert(memory_manager->is_tx_chunk(chunk)); + tx_chunks.push_back(chunk); } - if (n) + if (n) { + post_tx_buffer(tx_chunks); + tx_chunks.clear(); goto again; + } if (!rearmed) { tx_cq->rearm_notify(); @@ -148,5 +202,113 @@ void RDMAWorker::handle_tx_event() // arrived between polling and rearm goto again; } + ldout(cct, 20) << __func__ << " leaving handle_tx_event. " << dendl; } + +RDMADispatcher::~RDMADispatcher() +{ + done = true; + t.join(); + assert(qp_conns.empty()); + while (!dead_queue_pairs.empty()) { + delete dead_queue_pairs.back(); + dead_queue_pairs.pop_back(); + } + + rx_cc->ack_events(); + delete rx_cq; + delete rx_cc; + delete async_handler; +} + +void RDMADispatcher::handle_async_event() +{ + ldout(cct, 20) << __func__ << dendl; + while (1) { + ibv_async_event async_event; + if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) { + if (errno != EAGAIN) + lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno + << " " << cpp_strerror(errno) << ")" << dendl; + return; + } + // FIXME: Currently we must ensure no other factor make QP in ERROR state, + // otherwise this qp can't be deleted in current cleanup flow. + if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) { + uint64_t qpn = async_event.element.qp->qp_num; + ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp + << " evt: " << ibv_event_type_str(async_event.event_type) << dendl; + RDMAConnectedSocketImpl *conn = get_conn_by_qp(qpn); + if (!conn) { + ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl; + } else { + ldout(cct, 0) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl; + conn->fault(); + erase_qpn(qpn); + } + } else { + ldout(cct, 0) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt + << " evt: " << ibv_event_type_str(async_event.event_type) + << dendl; + } + ibv_ack_async_event(&async_event); + } +} + + +void RDMADispatcher::polling() +{ + static int MAX_COMPLETIONS = 32; + ibv_wc wc[MAX_COMPLETIONS]; + + std::map > polled; + int i, n; + while (!done) { + n = rx_cq->poll_cq(MAX_COMPLETIONS, wc); + if (!n) { + // NOTE: Has TX just transitioned to idle? We should do it when idle! + // It's now safe to delete queue pairs (see comment by declaration + // for dead_queue_pairs). + // Additionally, don't delete qp while outstanding_buffers isn't empty, + // because we need to check qp's state before sending + if (!inflight.load()) { + while (!dead_queue_pairs.empty()) { + ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl; + delete dead_queue_pairs.back(); + dead_queue_pairs.pop_back(); + } + } + handle_async_event(); + continue; + } + + ldout(cct, 20) << __func__ << " pool completion queue got " << n + << " responses."<< dendl; + for (i = 0; i < n; ++i) { + ibv_wc* response = &wc[i]; + Chunk* chunk = reinterpret_cast(response->wr_id); + ldout(cct, 20) << __func__ << " got chunk=" << response->wr_id << " qp:" << wc[i].qp_num << dendl; + + if (response->status != IBV_WC_SUCCESS) { + lderr(cct) << __func__ << " work request returned error for buffer(" << response->wr_id + << ") status(" << response->status << ":" + << ib->wc_status_to_string(response->status) << dendl; + ib->post_chunk(chunk); + continue; + } + + RDMAConnectedSocketImpl *conn = get_conn_by_qp(response->qp_num); + if (!conn) { + // discard buffer + ldout(cct, 0) << __func__ << " missing qp_num " << response->qp_num << ", discard bd " + << chunk << dendl; + continue; + } + polled[conn].push_back(*response); + } + for (auto &&i : polled) + i.first->pass_wc(std::move(i.second)); + polled.clear(); + } +} diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index 2ff1c0d79296..a1f815858519 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -17,27 +17,119 @@ #ifndef CEPH_MSG_RDMASTACK_H #define CEPH_MSG_RDMASTACK_H +#include + +#include +#include +#include + #include "common/ceph_context.h" #include "common/debug.h" #include "common/errno.h" #include "msg/async/Stack.h" -#include #include "Infiniband.h" class RDMAConnectedSocketImpl; +class RDMAStack; + +class RDMADispatcher { + typedef Infiniband::MemoryManager::Chunk Chunk; + typedef Infiniband::QueuePair QueuePair; + + std::thread t; + CephContext *cct; + Infiniband* ib; + Infiniband::CompletionQueue* rx_cq; // common completion queue for all transmits + Infiniband::CompletionChannel* rx_cc; + EventCallbackRef async_handler; + bool done = false; + Mutex lock; // protect `qp_conns + // qp_num -> InfRcConnection + // The main usage of `qp_conns` is looking up connection by qp_num, + // so the lifecycle of element in `qp_conns` is the lifecycle of qp. + //// make qp queue into dead state + /** + * 1. Connection call mark_down + * 2. Move the Queue Pair into the Error state(QueuePair::to_dead) + * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED(handle_async_event) + * 4. Wait for CQ to be empty(handle_tx_event) + * 5. Destroy the QP by calling ibv_destroy_qp()(handle_tx_event) + * + * @param qp The qp needed to dead + */ + ceph::unordered_map > qp_conns; + + /// if a queue pair is closed when transmit buffers are active + /// on it, the transmit buffers never get returned via tx_cq. To + /// work around this problem, don't delete queue pairs immediately. Instead, + /// save them in this vector and delete them at a safe time, when there are + /// no outstanding transmit buffers to be lost. + std::vector dead_queue_pairs; + + class C_handle_cq_async : public EventCallback { + RDMADispatcher *dispatcher; + public: + C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {} + void do_request(int fd) { + // worker->handle_tx_event(); + dispatcher->handle_async_event(); + } + }; + + public: + std::atomic_ulong inflight; + explicit RDMADispatcher(CephContext* cct, Infiniband* i) + : t(&RDMADispatcher::polling, this), cct(cct), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock") { + rx_cc = ib->create_comp_channel(); + assert(rx_cc); + rx_cq = ib->create_comp_queue(rx_cc); + assert(rx_cq); + } + virtual ~RDMADispatcher(); + + void handle_async_event(); + void polling(); + int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) { + int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); + assert(fd >= 0); + Mutex::Locker l(lock); + assert(!qp_conns.count(qp->get_local_qp_number())); + qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi); + return fd; + } + RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp) { + Mutex::Locker l(lock); + auto it = qp_conns.find(qp); + if (it == qp_conns.end()) + return NULL; + if (it->second.first->is_dead()) + return NULL; + return it->second.second; + } + void erase_qpn(uint32_t qpn) { + Mutex::Locker l(lock); + auto it = qp_conns.find(qpn); + if (it == qp_conns.end()) + return ; + dead_queue_pairs.push_back(it->second.first); + qp_conns.erase(it); + } +}; + class RDMAWorker : public Worker { typedef Infiniband::CompletionQueue CompletionQueue; typedef Infiniband::CompletionChannel CompletionChannel; typedef Infiniband::MemoryManager::Chunk Chunk; typedef Infiniband::MemoryManager MemoryManager; - int client_setup_socket; - Infiniband* infiniband; - CompletionQueue* tx_cq; // common completion queue for all transmits - CompletionChannel* tx_cc; + RDMAStack *stack; + Infiniband *infiniband; + CompletionQueue *tx_cq; // common completion queue for all transmits + CompletionChannel *tx_cc; EventCallbackRef tx_handler; - MemoryManager* memory_manager; - vector to_delete; + MemoryManager *memory_manager; + std::list pending_sent_conns; + class C_handle_cq_tx : public EventCallback { RDMAWorker *worker; public: @@ -48,8 +140,7 @@ class RDMAWorker : public Worker { }; public: - explicit RDMAWorker(CephContext *c, unsigned i) - : Worker(c, i), infiniband(NULL), tx_handler(new C_handle_cq_tx(this)) {} + explicit RDMAWorker(CephContext *c, unsigned i); virtual ~RDMAWorker() { tx_cc->ack_events(); delete tx_cq; @@ -60,21 +151,16 @@ class RDMAWorker : public Worker { virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override; virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; virtual void initialize() override; - void handle_tx_event(); CompletionQueue* get_tx_cq() { return tx_cq; } - void remove_to_delete(RDMAConnectedSocketImpl* csi) { - if (to_delete.empty()) - return ; - vector::iterator iter = to_delete.begin(); - for (; iter != to_delete.end(); ++iter) { - if(csi == *iter) { - to_delete.erase(iter); - } - } + RDMAStack *get_stack() { + return stack; } - void add_to_delete(RDMAConnectedSocketImpl* csi) { - to_delete.push_back(csi); + int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes); + int post_tx_buffer(std::vector &chunks); + void remove_pending_conn(RDMAConnectedSocketImpl *o) { + pending_sent_conns.remove(o); } + void handle_tx_event(); }; class RDMAConnectedSocketImpl : public ConnectedSocketImpl { @@ -89,24 +175,54 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { IBSYNMsg peer_msg; IBSYNMsg my_msg; int connected; + int error; Infiniband* infiniband; + RDMADispatcher* dispatcher; RDMAWorker* worker; std::vector buffers; - CompletionChannel* rx_cc; - CompletionQueue* rx_cq; - bool wait_close; + int notify_fd; + bufferlist pending_bl; + + Mutex lock; + std::vector wc; + + void notify() { + uint64_t i = 1; + assert(write(notify_fd, &i, sizeof(i)) == sizeof(i)); + } + ssize_t read_buffers(char* buf, size_t len); + int post_work_request(std::vector&); public: - RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMAWorker* w, IBSYNMsg im = IBSYNMsg()) : cct(cct), peer_msg(im), infiniband(ib), worker(w), wait_close(false) { - qp = infiniband->create_queue_pair(IBV_QPT_RC); - rx_cq = qp->get_rx_cq(); - rx_cc = rx_cq->get_cc(); + RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, + RDMAWorker *w, IBSYNMsg im = IBSYNMsg()) + : cct(cct), peer_msg(im), connected(0), error(0), infiniband(ib), + dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock") { + qp = infiniband->create_queue_pair(w->get_tx_cq(), IBV_QPT_RC); my_msg.qpn = qp->get_local_qp_number(); my_msg.psn = qp->get_initial_psn(); my_msg.lid = infiniband->get_lid(); my_msg.gid = infiniband->get_gid(); + notify_fd = dispatcher->register_qp(qp, this); + } + virtual ~RDMAConnectedSocketImpl() { + worker->remove_pending_conn(this); } + void pass_wc(std::vector &&v) { + Mutex::Locker l(lock); + if (wc.empty()) + wc = std::move(v); + else + wc.insert(wc.end(), v.begin(), v.end()); + notify(); + } + void get_wc(std::vector &w) { + Mutex::Locker l(lock); + if (wc.empty()) + return ; + w.swap(wc); + } virtual int is_connected() override { return connected; } @@ -114,34 +230,32 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { virtual ssize_t zero_copy_read(bufferptr &data) override; virtual ssize_t send(bufferlist &bl, bool more) override; virtual void shutdown() override { + if (qp) { + qp->to_dead(); + qp = NULL; + } } virtual void close() override { - if (!wait_close) { - fin(); - worker->add_to_delete(this); - } else { - clear_all(); + if (qp) { + qp->to_dead(); + qp = NULL; } } virtual int fd() const override { - return rx_cc->get_fd(); + return notify_fd; } - void clear_all() { - delete qp; - rx_cc->ack_events(); - delete rx_cq; - rx_cq = NULL; - if (!wait_close) - worker->remove_to_delete(this); + void fault() { + if (qp) { + qp->to_dead(); + qp = NULL; + } + error = ECONNRESET; + notify(); } + ssize_t submit(bool more); int activate(); - ssize_t read_buffers(char* buf, size_t len); - int poll_cq(int num_entries, ibv_wc *ret_wc_array); IBSYNMsg get_my_msg() { return my_msg; } - IBSYNMsg get_peer_msg() { return peer_msg; } void set_peer_msg(IBSYNMsg m) { peer_msg = m ;} - int post_work_request(std::vector&); - void fin(); }; @@ -150,13 +264,19 @@ class RDMAServerSocketImpl : public ServerSocketImpl { NetHandler net; int server_setup_socket; Infiniband* infiniband; + RDMADispatcher *dispatcher; + RDMAWorker *worker; entity_addr_t sa; + public: - RDMAServerSocketImpl(CephContext *cct, Infiniband* i, entity_addr_t& a) - : cct(cct), net(cct), infiniband(i), sa(a) {} + RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a) + : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) {} int listen(entity_addr_t &sa, const SocketOptions &opt); virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out) override; - virtual void abort_accept() override {} + virtual void abort_accept() override { + if (server_setup_socket >= 0) + ::close(server_setup_socket); + } virtual int fd() const override { return server_setup_socket; } @@ -165,9 +285,11 @@ class RDMAServerSocketImpl : public ServerSocketImpl { class RDMAStack : public NetworkStack { vector threads; + RDMADispatcher *dispatcher; public: explicit RDMAStack(CephContext *cct, const string &t); + virtual ~RDMAStack() { delete dispatcher; } virtual bool support_zero_copy_read() const override { return true; } //virtual bool support_local_listen_table() const { return true; } @@ -179,6 +301,7 @@ class RDMAStack : public NetworkStack { assert(threads.size() > i && threads[i].joinable()); threads[i].join(); } + RDMADispatcher *get_dispatcher() { return dispatcher; } }; #endif