From 3662403e997b6400845b7e22e57c74772dfb546c Mon Sep 17 00:00:00 2001 From: Amir Vadai Date: Mon, 2 Jan 2017 10:36:32 +0200 Subject: [PATCH] RDMA: Move defenitions from RDMAStack.h into .cc Signed-off-by: Amir Vadai --- src/msg/async/rdma/RDMAConnectedSocketImpl.cc | 94 ++++ src/msg/async/rdma/RDMAServerSocketImpl.cc | 11 + src/msg/async/rdma/RDMAStack.cc | 472 ++++++++++++------ src/msg/async/rdma/RDMAStack.h | 269 ++-------- 4 files changed, 460 insertions(+), 386 deletions(-) diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc index 1f97c3e21dd8..f8f6e79e46c4 100644 --- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -20,6 +20,58 @@ #undef dout_prefix #define dout_prefix *_dout << " RDMAConnectedSocketImpl " +RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, + RDMAWorker *w) + : cct(cct), connected(0), error(0), infiniband(ib), + dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), + is_server(false), con_handler(new C_handle_connection(this)), + active(false), detached(false) +{ + qp = infiniband->create_queue_pair( + cct, s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC); + my_msg.qpn = qp->get_local_qp_number(); + my_msg.psn = qp->get_initial_psn(); + my_msg.lid = infiniband->get_lid(); + my_msg.peer_qpn = 0; + my_msg.gid = infiniband->get_gid(); + notify_fd = dispatcher->register_qp(qp, this); +} + +RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() +{ + worker->remove_pending_conn(this); + dispatcher->erase_qpn(my_msg.qpn); + cleanup(); + if (notify_fd >= 0) + ::close(notify_fd); + if (tcp_fd >= 0) + ::close(tcp_fd); + error = ECONNRESET; + Mutex::Locker l(lock); + for (unsigned i=0; i < wc.size(); ++i) + infiniband->recall_chunk(reinterpret_cast(wc[i].wr_id)); + for (unsigned i=0; i < buffers.size(); ++i) + infiniband->recall_chunk(buffers[i]); +} + +void RDMAConnectedSocketImpl::pass_wc(std::vector &&v) +{ + Mutex::Locker l(lock); + if (wc.empty()) + wc = std::move(v); + else + wc.insert(wc.end(), v.begin(), v.end()); + notify(); +} + +void RDMAConnectedSocketImpl::get_wc(std::vector &w) +{ + Mutex::Locker l(lock); + if (wc.empty()) + return ; + w.swap(wc); +} + int RDMAConnectedSocketImpl::activate() { ibv_qp_attr qpa; @@ -467,3 +519,45 @@ void RDMAConnectedSocketImpl::cleanup() { con_handler = nullptr; } } + +void RDMAConnectedSocketImpl::notify() +{ + uint64_t i = 1; + assert(write(notify_fd, &i, sizeof(i)) == sizeof(i)); +} + +void RDMAConnectedSocketImpl::shutdown() +{ + if (!error) + fin(); + error = ECONNRESET; + active = false; +} + +void RDMAConnectedSocketImpl::close() +{ + if (!error) + fin(); + error = ECONNRESET; + active = false; +} + +void RDMAConnectedSocketImpl::fault() +{ + /*if (qp) { + qp->to_dead(); + qp = NULL; + }*/ + error = ECONNRESET; + connected = 1; + notify(); +} + +void RDMAConnectedSocketImpl::set_accept_fd(int sd) +{ + tcp_fd = sd; + is_server = true; + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); + }, true); +} diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index 03be56da7598..231865dd9fac 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -21,6 +21,11 @@ #undef dout_prefix #define dout_prefix *_dout << " RDMAServerSocketImpl " +RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a) + : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) +{ +} + int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt) { int rc = 0; @@ -106,3 +111,9 @@ int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt return 0; } + +void RDMAServerSocketImpl::abort_accept() +{ + if (server_setup_socket >= 0) + ::close(server_setup_socket); +} diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 4d5c5077d75f..d33ab1a0c2e1 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -25,167 +25,6 @@ static Infiniband* global_infiniband; -RDMAWorker::RDMAWorker(CephContext *c, unsigned i) - : Worker(c, i), stack(nullptr), infiniband(NULL), - tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false) -{ -} - -int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) -{ - auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa); - int r = p->listen(sa, opt); - if (r < 0) { - delete p; - return r; - } - - *sock = ServerSocket(std::unique_ptr(p)); - return 0; -} - -int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) -{ - RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this); - int r = p->try_connect(addr, opts); - - if (r < 0) { - ldout(cct, 1) << __func__ << " try connecting failed." << dendl; - return r; - } - std::unique_ptr csi(p); - *socket = ConnectedSocket(std::move(csi)); - return 0; -} - - -RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) -{ - if (!global_infiniband) - global_infiniband = new Infiniband( - cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); - ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; - dispatcher = new RDMADispatcher(cct, global_infiniband, this); - unsigned num = get_num_worker(); - for (unsigned i = 0; i < num; ++i) { - RDMAWorker* w = dynamic_cast(get_worker(i)); - w->set_ib(global_infiniband); - w->set_stack(this); - } - ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl; -} - -void RDMAWorker::initialize() -{ - if (!dispatcher) { - dispatcher = stack->get_dispatcher(); - notify_fd = dispatcher->register_worker(this); - center.create_file_event(notify_fd, EVENT_READABLE, tx_handler); - memory_manager = infiniband->get_memory_manager(); - } -} - -int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) -{ - int r = infiniband->get_tx_buffers(c, bytes); - if (r > 0) { - stack->get_dispatcher()->inflight += c.size(); - ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl; - return r; - } - assert(r == 0); - - if (pending_sent_conns.back() != o) - pending_sent_conns.push_back(o); - dispatcher->pending_buffers(this); - return r; -} - -/** - * Add the given Chunks to the given free queue. - * - * \param[in] chunks - * The Chunks to enqueue. - * \return - * 0 if success or -1 for failure - */ -int RDMAWorker::post_tx_buffer(std::vector &chunks) -{ - if (chunks.empty()) - return 0; - - stack->get_dispatcher()->inflight -= chunks.size(); - memory_manager->return_tx(chunks); - ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl; - - pended = false; - std::set done; - while (!pending_sent_conns.empty()) { - RDMAConnectedSocketImpl *o = pending_sent_conns.front(); - if (done.count(o) == 0) { - done.insert(o); - } else { - pending_sent_conns.pop_front(); - continue; - } - ssize_t r = o->submit(false); - ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl; - if (r < 0) { - if (r == -EAGAIN) - break; - o->fault(); - } - pending_sent_conns.pop_front(); - } - return 0; -} - -void RDMAWorker::handle_tx_event() -{ - std::vector tx_chunks; - std::vector cqe; - get_wc(cqe); - - for (size_t i = 0; i < cqe.size(); ++i) { - ibv_wc* response = &cqe[i]; - Chunk* chunk = reinterpret_cast(response->wr_id); - ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl; - - if (response->status != IBV_WC_SUCCESS) { - if (response->status == IBV_WC_RETRY_EXC_ERR) { - ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl; - } else if (response->status == IBV_WC_WR_FLUSH_ERR) { - ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp=" - << response->qp_num << " should be down while this WR=" << response->wr_id - << " still in flight." << dendl; - } else { - ldout(cct, 1) << __func__ << " send work request returned error for buffer(" - << response->wr_id << ") status(" << response->status << "): " - << infiniband->wc_status_to_string(response->status) << dendl; - } - RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num); - if (conn) { - ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi - conn->fault(); - } else { - ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl; - } - } - - //assert(memory_manager->is_tx_chunk(chunk)); - if (memory_manager->is_tx_chunk(chunk)) { - tx_chunks.push_back(chunk); - } else { - ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin - } - } - - post_tx_buffer(tx_chunks); - - ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl; - dispatcher->notify_pending_workers(); -} - RDMADispatcher::~RDMADispatcher() { done = true; @@ -208,6 +47,18 @@ RDMADispatcher::~RDMADispatcher() delete async_handler; } +RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s) + : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), + w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s) +{ + rx_cc = ib->create_comp_channel(c); + assert(rx_cc); + rx_cq = ib->create_comp_queue(c, rx_cc); + assert(rx_cq); + t = std::thread(&RDMADispatcher::polling, this); + cct->register_fork_watcher(this); +} + void RDMADispatcher::handle_async_event() { ldout(cct, 20) << __func__ << dendl; @@ -358,3 +209,302 @@ void RDMADispatcher::notify_pending_workers() { pending_workers.front()->pass_wc(std::move(vector())); pending_workers.pop_front(); } + +int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) +{ + int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); + assert(fd >= 0); + Mutex::Locker l(lock); + assert(!qp_conns.count(qp->get_local_qp_number())); + qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi); + return fd; +} + +int RDMADispatcher::register_worker(RDMAWorker* w) +{ + int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); + assert(fd >= 0); + Mutex::Locker l(w_lock); + workers[w] = fd; + return fd; +} + +void RDMADispatcher::pending_buffers(RDMAWorker* w) +{ + Mutex::Locker l(w_lock); + pending_workers.push_back(w); +} + +RDMAWorker* RDMADispatcher::get_worker_from_list() +{ + Mutex::Locker l(w_lock); + if (pending_workers.empty()) + return nullptr; + else { + RDMAWorker* w = pending_workers.front(); + pending_workers.pop_front(); + return w; + } +} + +RDMAConnectedSocketImpl* RDMADispatcher::get_conn_by_qp(uint32_t qp) +{ + Mutex::Locker l(lock); + auto it = qp_conns.find(qp); + if (it == qp_conns.end()) + return nullptr; + if (it->second.first->is_dead()) + return nullptr; + return it->second.second; +} + +RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp) +{ + auto it = qp_conns.find(qp); + if (it == qp_conns.end()) + return nullptr; + if (it->second.first->is_dead()) + return nullptr; + return it->second.second; +} + +void RDMADispatcher::erase_qpn(uint32_t qpn) +{ + Mutex::Locker l(lock); + auto it = qp_conns.find(qpn); + if (it == qp_conns.end()) + return ; + dead_queue_pairs.push_back(it->second.first); + qp_conns.erase(it); +} + +void RDMADispatcher::handle_pre_fork() +{ + done = true; + t.join(); + done = false; +} + +void RDMADispatcher::handle_post_fork() +{ + t = std::thread(&RDMADispatcher::polling, this); +} + + +RDMAWorker::RDMAWorker(CephContext *c, unsigned i) + : Worker(c, i), stack(nullptr), infiniband(NULL), + tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false) +{ +} + +RDMAWorker::~RDMAWorker() +{ + delete tx_handler; + if (notify_fd >= 0) + ::close(notify_fd); +} + +void RDMAWorker::initialize() +{ + if (!dispatcher) { + dispatcher = stack->get_dispatcher(); + notify_fd = dispatcher->register_worker(this); + center.create_file_event(notify_fd, EVENT_READABLE, tx_handler); + memory_manager = infiniband->get_memory_manager(); + } +} + +void RDMAWorker::notify() +{ + uint64_t i = 1; + assert(write(notify_fd, &i, sizeof(i)) == sizeof(i)); +} + +void RDMAWorker::pass_wc(std::vector &&v) +{ + Mutex::Locker l(lock); + if (wc.empty()) + wc = std::move(v); + else + wc.insert(wc.end(), v.begin(), v.end()); + notify(); +} + +void RDMAWorker::add_pending_conn(RDMAConnectedSocketImpl* o) +{ + pending_sent_conns.push_back(o); + if (!pended) { + dispatcher->pending_buffers(this); + pended = true; + } +} + +void RDMAWorker::get_wc(std::vector &w) +{ + Mutex::Locker l(lock); + if (wc.empty()) + return ; + w.swap(wc); +} + +int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) +{ + auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa); + int r = p->listen(sa, opt); + if (r < 0) { + delete p; + return r; + } + + *sock = ServerSocket(std::unique_ptr(p)); + return 0; +} + +int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) +{ + RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this); + int r = p->try_connect(addr, opts); + + if (r < 0) { + ldout(cct, 1) << __func__ << " try connecting failed." << dendl; + return r; + } + std::unique_ptr csi(p); + *socket = ConnectedSocket(std::move(csi)); + return 0; +} + +int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes) +{ + int r = infiniband->get_tx_buffers(c, bytes); + if (r > 0) { + stack->get_dispatcher()->inflight += c.size(); + ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl; + return r; + } + assert(r == 0); + + if (pending_sent_conns.back() != o) + pending_sent_conns.push_back(o); + dispatcher->pending_buffers(this); + return r; +} + +/** + * Add the given Chunks to the given free queue. + * + * \param[in] chunks + * The Chunks to enqueue. + * \return + * 0 if success or -1 for failure + */ +int RDMAWorker::post_tx_buffer(std::vector &chunks) +{ + if (chunks.empty()) + return 0; + + stack->get_dispatcher()->inflight -= chunks.size(); + memory_manager->return_tx(chunks); + ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl; + + pended = false; + std::set done; + while (!pending_sent_conns.empty()) { + RDMAConnectedSocketImpl *o = pending_sent_conns.front(); + if (done.count(o) == 0) { + done.insert(o); + } else { + pending_sent_conns.pop_front(); + continue; + } + ssize_t r = o->submit(false); + ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl; + if (r < 0) { + if (r == -EAGAIN) + break; + o->fault(); + } + pending_sent_conns.pop_front(); + } + return 0; +} + +void RDMAWorker::handle_tx_event() +{ + std::vector tx_chunks; + std::vector cqe; + get_wc(cqe); + + for (size_t i = 0; i < cqe.size(); ++i) { + ibv_wc* response = &cqe[i]; + Chunk* chunk = reinterpret_cast(response->wr_id); + ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl; + + if (response->status != IBV_WC_SUCCESS) { + if (response->status == IBV_WC_RETRY_EXC_ERR) { + ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl; + } else if (response->status == IBV_WC_WR_FLUSH_ERR) { + ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp=" + << response->qp_num << " should be down while this WR=" << response->wr_id + << " still in flight." << dendl; + } else { + ldout(cct, 1) << __func__ << " send work request returned error for buffer(" + << response->wr_id << ") status(" << response->status << "): " + << infiniband->wc_status_to_string(response->status) << dendl; + } + RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num); + if (conn) { + ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi + conn->fault(); + } else { + ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl; + } + } + + //assert(memory_manager->is_tx_chunk(chunk)); + if (memory_manager->is_tx_chunk(chunk)) { + tx_chunks.push_back(chunk); + } else { + ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin + } + } + + post_tx_buffer(tx_chunks); + + ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl; + dispatcher->notify_pending_workers(); +} + + +RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) +{ + if (!global_infiniband) + global_infiniband = new Infiniband( + cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num); + ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl; + dispatcher = new RDMADispatcher(cct, global_infiniband, this); + unsigned num = get_num_worker(); + for (unsigned i = 0; i < num; ++i) { + RDMAWorker* w = dynamic_cast(get_worker(i)); + w->set_ib(global_infiniband); + w->set_stack(this); + } + ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl; +} + +RDMAStack::~RDMAStack() +{ + delete dispatcher; +} + +void RDMAStack::spawn_worker(unsigned i, std::function &&func) +{ + threads.resize(i+1); + threads[i] = std::move(std::thread(func)); +} + +void RDMAStack::join_worker(unsigned i) +{ + assert(threads.size() > i && threads[i].joinable()); + threads[i].join(); +} diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index f8574068d1ed..b3c726d90d68 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -72,6 +72,7 @@ class RDMADispatcher : public CephContext::ForkWatcher { ceph::unordered_map workers;; std::list pending_workers; RDMAStack* stack; + class C_handle_cq_async : public EventCallback { RDMADispatcher *dispatcher; public: @@ -83,87 +84,24 @@ class RDMADispatcher : public CephContext::ForkWatcher { }; public: - std::atomic inflight = {0}; - explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s) - : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"), - w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s) { - rx_cc = ib->create_comp_channel(c); - assert(rx_cc); - rx_cq = ib->create_comp_queue(c, rx_cc); - assert(rx_cq); - t = std::thread(&RDMADispatcher::polling, this); - cct->register_fork_watcher(this); - } + explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s); virtual ~RDMADispatcher(); void handle_async_event(); void polling(); - int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) { - int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); - assert(fd >= 0); - Mutex::Locker l(lock); - assert(!qp_conns.count(qp->get_local_qp_number())); - qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi); - return fd; - } - int register_worker(RDMAWorker* w) { - int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK); - assert(fd >= 0); - Mutex::Locker l(w_lock); - workers[w] = fd; - return fd; - } - void pending_buffers(RDMAWorker* w) { - Mutex::Locker l(w_lock); - pending_workers.push_back(w); - } - RDMAStack* get_stack() { - return stack; - } - RDMAWorker* get_worker_from_list() { - Mutex::Locker l(w_lock); - if (pending_workers.empty()) - return nullptr; - else { - RDMAWorker* w = pending_workers.front(); - pending_workers.pop_front(); - return w; - } - } - RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp) { - Mutex::Locker l(lock); - auto it = qp_conns.find(qp); - if (it == qp_conns.end()) - return nullptr; - if (it->second.first->is_dead()) - return nullptr; - return it->second.second; - } - RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp) { - auto it = qp_conns.find(qp); - if (it == qp_conns.end()) - return nullptr; - if (it->second.first->is_dead()) - return nullptr; - return it->second.second; - } - void erase_qpn(uint32_t qpn) { - Mutex::Locker l(lock); - auto it = qp_conns.find(qpn); - if (it == qp_conns.end()) - return ; - dead_queue_pairs.push_back(it->second.first); - qp_conns.erase(it); - } + int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi); + int register_worker(RDMAWorker* w); + void pending_buffers(RDMAWorker* w); + RDMAStack* get_stack() { return stack; } + RDMAWorker* get_worker_from_list(); + RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp); + RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp); + void erase_qpn(uint32_t qpn); Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; } void notify_pending_workers(); - virtual void handle_pre_fork() override { - done = true; - t.join(); - done = false; - } - virtual void handle_post_fork() override { - t = std::thread(&RDMADispatcher::polling, this); - } + virtual void handle_pre_fork() override; + virtual void handle_post_fork() override; + + std::atomic inflight = {0}; }; @@ -183,6 +121,7 @@ class RDMAWorker : public Worker { Mutex lock; std::vector wc; bool pended; + class C_handle_cq_tx : public EventCallback { RDMAWorker *worker; public: @@ -194,54 +133,21 @@ class RDMAWorker : public Worker { public: explicit RDMAWorker(CephContext *c, unsigned i); - virtual ~RDMAWorker() { - delete tx_handler; - if (notify_fd >= 0) - ::close(notify_fd); - } - void notify() { - uint64_t i = 1; - assert(write(notify_fd, &i, sizeof(i)) == sizeof(i)); - } - void pass_wc(std::vector &&v) { - Mutex::Locker l(lock); - if (wc.empty()) - wc = std::move(v); - else - wc.insert(wc.end(), v.begin(), v.end()); - notify(); - } - void get_wc(std::vector &w) { - Mutex::Locker l(lock); - if (wc.empty()) - return ; - w.swap(wc); - } + virtual ~RDMAWorker(); + void notify(); + void pass_wc(std::vector &&v); + void get_wc(std::vector &w); virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override; virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; virtual void initialize() override; - RDMAStack *get_stack() { - return stack; - } + RDMAStack *get_stack() { return stack; } int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes); int post_tx_buffer(std::vector &chunks); - void add_pending_conn(RDMAConnectedSocketImpl* o) { - pending_sent_conns.push_back(o); - if (!pended) { - dispatcher->pending_buffers(this); - pended = true; - } - } - void remove_pending_conn(RDMAConnectedSocketImpl *o) { - pending_sent_conns.remove(o); - } + void add_pending_conn(RDMAConnectedSocketImpl* o); + void remove_pending_conn(RDMAConnectedSocketImpl *o) { pending_sent_conns.remove(o); } void handle_tx_event(); - void set_ib(Infiniband* ib) { - infiniband = ib; - } - void set_stack(RDMAStack *s) { - stack = s; - } + void set_ib(Infiniband* ib) { infiniband = ib; } + void set_stack(RDMAStack *s) { stack = s; } }; class RDMAConnectedSocketImpl : public ConnectedSocketImpl { @@ -273,109 +179,35 @@ class RDMAConnectedSocketImpl : public ConnectedSocketImpl { bool active;// qp is active ? bool detached; - void notify() { - uint64_t i = 1; - assert(write(notify_fd, &i, sizeof(i)) == sizeof(i)); - } + void notify(); ssize_t read_buffers(char* buf, size_t len); int post_work_request(std::vector&); public: RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, - RDMAWorker *w) - : cct(cct), connected(0), error(0), infiniband(ib), - dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), - is_server(false), con_handler(new C_handle_connection(this)), - active(false), detached(false) { - qp = infiniband->create_queue_pair( - cct, s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC); - my_msg.qpn = qp->get_local_qp_number(); - my_msg.psn = qp->get_initial_psn(); - my_msg.lid = infiniband->get_lid(); - my_msg.peer_qpn = 0; - my_msg.gid = infiniband->get_gid(); - notify_fd = dispatcher->register_qp(qp, this); - } - - virtual ~RDMAConnectedSocketImpl() { - worker->remove_pending_conn(this); - dispatcher->erase_qpn(my_msg.qpn); - cleanup(); - if (notify_fd >= 0) - ::close(notify_fd); - if (tcp_fd >= 0) - ::close(tcp_fd); - error = ECONNRESET; - Mutex::Locker l(lock); - for (unsigned i=0; i < wc.size(); ++i) - infiniband->recall_chunk(reinterpret_cast(wc[i].wr_id)); - for (unsigned i=0; i < buffers.size(); ++i) - infiniband->recall_chunk(buffers[i]); - } - - void pass_wc(std::vector &&v) { - Mutex::Locker l(lock); - if (wc.empty()) - wc = std::move(v); - else - wc.insert(wc.end(), v.begin(), v.end()); - notify(); - } + RDMAWorker *w); + virtual ~RDMAConnectedSocketImpl(); - void get_wc(std::vector &w) { - Mutex::Locker l(lock); - if (wc.empty()) - return ; - w.swap(wc); - } - - virtual int is_connected() override { - return connected; - } + void pass_wc(std::vector &&v); + void get_wc(std::vector &w); + virtual int is_connected() override { return connected; } virtual ssize_t read(char* buf, size_t len) override; virtual ssize_t zero_copy_read(bufferptr &data) override; virtual ssize_t send(bufferlist &bl, bool more) override; - virtual void shutdown() override { - if (!error) - fin(); - error = ECONNRESET; - active = false; - } - virtual void close() override { - if (!error) - fin(); - error = ECONNRESET; - active = false; - } - virtual int fd() const override { - return notify_fd; - } - void fault() { - /*if (qp) { - qp->to_dead(); - qp = NULL; - }*/ - error = ECONNRESET; - connected = 1; - notify(); - } - const char* get_qp_state() { - return Infiniband::qp_state_string(qp->get_state()); - } + virtual void shutdown() override; + virtual void close() override; + virtual int fd() const override { return notify_fd; } + void fault(); + const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); } ssize_t submit(bool more); int activate(); void fin(); void handle_connection(); void cleanup(); - void set_accept_fd(int sd) { - tcp_fd = sd; - is_server = true; - worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); - }, true); - } + void set_accept_fd(int sd); int try_connect(const entity_addr_t&, const SocketOptions &opt); + class C_handle_connection : public EventCallback { RDMAConnectedSocketImpl *csi; bool active; @@ -401,17 +233,12 @@ class RDMAServerSocketImpl : public ServerSocketImpl { entity_addr_t sa; public: - RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a) - : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) {} + RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a); + int listen(entity_addr_t &sa, const SocketOptions &opt); virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; - virtual void abort_accept() override { - if (server_setup_socket >= 0) - ::close(server_setup_socket); - } - virtual int fd() const override { - return server_setup_socket; - } + virtual void abort_accept() override; + virtual int fd() const override { return server_setup_socket; } int get_fd() { return server_setup_socket; } }; @@ -421,20 +248,12 @@ class RDMAStack : public NetworkStack { public: explicit RDMAStack(CephContext *cct, const string &t); - virtual ~RDMAStack() { - delete dispatcher; - } + virtual ~RDMAStack(); virtual bool support_zero_copy_read() const override { return false; } virtual bool nonblock_connect_need_writable_event() const { return false; } - virtual void spawn_worker(unsigned i, std::function &&func) override { - threads.resize(i+1); - threads[i] = std::move(std::thread(func)); - } - virtual void join_worker(unsigned i) override { - assert(threads.size() > i && threads[i].joinable()); - threads[i].join(); - } + virtual void spawn_worker(unsigned i, std::function &&func) override; + virtual void join_worker(unsigned i) override; RDMADispatcher *get_dispatcher() { return dispatcher; } }; -- 2.47.3