#include "Infiniband.h"\r
#include "common/errno.h"\r
#include "common/debug.h"\r
-#include "RDMAStack.h"\r
\r
#define dout_subsys ceph_subsys_ms\r
#undef dout_prefix\r
}\r
}\r
\r
-Infiniband::Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c), stack(s)\r
+Infiniband::Infiniband(CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c)\r
{\r
device = device_list.get_device(device_name.c_str());\r
assert(device);\r
* QueuePair on success or NULL if init fails\r
* See QueuePair::QueuePair for parameter documentation.\r
*/\r
-Infiniband::QueuePair* Infiniband::create_queue_pair(ibv_qp_type type)\r
+Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *c, ibv_qp_type type)\r
{\r
Infiniband::CompletionChannel* cc = create_comp_channel();\r
if (!cc)\r
return NULL;\r
}\r
\r
- RDMAWorker* w = static_cast<RDMAWorker*>(stack->get_worker());\r
- Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, w->get_tx_cq(), cq, max_send_wr, max_recv_wr);\r
+ Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, c, cq, max_send_wr, max_recv_wr);\r
if (qp->init()) {
delete cc;
delete cq;
-  return 0;
+  delete qp;
+  return NULL;
}
\r
+/**
+ * Switch an RC QueuePair into the ERROR state. It is necessary to move
+ * the Queue Pair into the Error state and to poll all of the relevant
+ * Work Completions prior to destroying it, since destroying a Queue
+ * Pair does not guarantee that its Work Completions will be removed
+ * from the CQ. Even if the Work Completions are already in the CQ, it
+ * might not be possible to retrieve them. If the Queue Pair is
+ * associated with an SRQ, it is recommended to wait for the affiliated
+ * event IBV_EVENT_QP_LAST_WQE_REACHED before destroying it.
+ *
+ * \return
+ *   -errno if the QueuePair can't switch to ERROR
+ *   0 for success.
+ */
+int Infiniband::QueuePair::to_dead()\r
+{\r
+ if (dead)\r
+ return 0;\r
+ ibv_qp_attr qpa;\r
+ memset(&qpa, 0, sizeof(qpa));\r
+ qpa.qp_state = IBV_QPS_ERR;\r
+\r
+ int mask = IBV_QP_STATE;\r
+ int ret = ibv_modify_qp(qp, &qpa, mask);\r
+ if (ret) {\r
+ lderr(infiniband.cct) << __func__ << " failed to transition to ERROR state: "\r
+        << cpp_strerror(ret) << dendl;
+    return -ret;
+  }
+  dead = true;
+  return 0;
+}\r
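+
+// A minimal sketch of the teardown sequence to_dead() enables (the
+// dispatcher pieces referenced here are introduced in RDMAStack.{h,cc}
+// further below):
+//
+//   qp->to_dead();                    // 1. move the QP into ERROR
+//                                     // 2. IBV_EVENT_QP_LAST_WQE_REACHED
+//                                     //    arrives on the device async fd
+//   dispatcher->erase_qpn(qpn);       // 3. park the QP in dead_queue_pairs
+//                                     // 4. polling() deletes it once no tx
+//                                     //    chunks are inflight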
+\r
int Infiniband::post_chunk(Chunk* chunk)\r
{\r
ibv_sge isge;\r
initial_psn(0),\r
max_send_wr(max_send_wr),\r
max_recv_wr(max_recv_wr),\r
- q_key(q_key)\r
+ q_key(q_key),\r
+ dead(false)\r
{\r
initial_psn = lrand48() & 0xffffff;\r
if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {\r
return 0;\r
}\r
\r
- int take_back(Chunk* ck) {\r
+ void take_back(Chunk* ck) {\r
Mutex::Locker l(lock);\r
free_chunks.push_back(ck);\r
- return 0;\r
}\r
\r
int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
uint32_t chunk_size;\r
Mutex lock;\r
std::vector<Chunk*> free_chunks;\r
- set<Chunk*> all_chunks;\r
+ std::set<Chunk*> all_chunks;\r
char* base;\r
};\r
\r
send = new Cluster(*this, size);\r
send->add(tx_num);\r
}\r
- int return_tx(Chunk* c) {\r
- c->clear();\r
- return send->take_back(c);\r
+ void return_tx(std::vector<Chunk*> &chunks) {\r
+ for (auto c : chunks) {\r
+ c->clear();\r
+ send->take_back(c);\r
+ }\r
}\r
\r
int get_send_buffers(std::vector<Chunk*> &c, size_t bytes) {\r
\r
public:\r
NetHandler net;\r
- RDMAStack* stack;\r
- explicit Infiniband(RDMAStack* s, CephContext *c, const std::string &device_name);\r
+ explicit Infiniband(CephContext *c, const std::string &device_name);\r
\r
/**\r
* Destroy an Infiniband object.\r
class QueuePair {\r
public:\r
QueuePair(Infiniband& infiniband, ibv_qp_type type,int ib_physical_port, ibv_srq *srq, Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);\r
- // exists solely as superclass constructor for MockQueuePair derivative\r
- explicit QueuePair(Infiniband& infiniband):\r
- infiniband(infiniband), type(IBV_QPT_RC), ctxt(NULL), ib_physical_port(-1),\r
- pd(NULL), srq(NULL), qp(NULL), txcq(NULL), rxcq(NULL),\r
- initial_psn(-1) {}\r
~QueuePair();\r
\r
int init();\r
ibv_qp* get_qp() const { return qp; }\r
Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }\r
Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }\r
- int to_reset();\r
int to_dead();\r
- int get_fd() { return fd; }\r
+ bool is_dead() const { return dead; }\r
+\r
private:\r
Infiniband& infiniband; // Infiniband to which this QP belongs\r
ibv_qp_type type; // QP type (IBV_QPT_RC, etc.)\r
uint32_t max_send_wr;\r
uint32_t max_recv_wr;\r
uint32_t q_key;\r
- int fd;\r
+ bool dead;\r
};\r
\r
public:\r
typedef MemoryManager::Cluster Cluster;\r
typedef MemoryManager::Chunk Chunk;\r
- QueuePair* create_queue_pair(ibv_qp_type type);\r
+  QueuePair* create_queue_pair(CompletionQueue *c, ibv_qp_type type);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);\r
int post_chunk(Chunk* chunk);\r
int post_channel_cluster();\r
ibv_gid get_gid() { return device->get_gid(); }\r
MemoryManager* get_memory_manager() { return memory_manager; }\r
Device* get_device() { return device; }\r
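+  // fd of the device's asynchronous event queue, so callers can wait for
+  // IBV_EVENT_* notifications (e.g. IBV_EVENT_QP_LAST_WQE_REACHED) without
+  // blocking in ibv_get_async_event()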
+ int get_async_fd() { return device->ctxt->async_fd; }\r
static const char* wc_status_to_string(int status);\r
};\r
\r
{\r
ldout(cct, 20) << __func__ << " need to read bytes: " << len << " buffers size: " << buffers.size() << dendl;\r
\r
+ if (error)\r
+ return -error;\r
ssize_t read = 0;\r
if (!buffers.empty())\r
read = read_buffers(buf,len);\r
\r
- static const int MAX_COMPLETIONS = 16;\r
- ibv_wc wc[MAX_COMPLETIONS];\r
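+  // rx completions are no longer polled here: the dispatcher thread polls
+  // the shared rx CQ and queues work completions on this socket via
+  // pass_wc(); read() only drains what has already been queued for it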
+ std::vector<ibv_wc> cqe;\r
+ get_wc(cqe);\r
+ if (cqe.empty())\r
+ return read == 0 ? -EAGAIN : read;\r
\r
- bool rearmed = false;\r
- int n;\r
- again:\r
- n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);\r
- ldout(cct, 20) << __func__ << " poll completion queue got " << n << " responses."<< dendl;\r
- for (int i = 0; i < n; ++i) {\r
- ibv_wc* response = &wc[i];\r
- ldout(cct, 20) << __func__ << " cqe " << response->byte_len << " bytes." << dendl;\r
+ ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses."<< dendl;\r
+ for (size_t i = 0; i < cqe.size(); ++i) {\r
+ ibv_wc* response = &cqe[i];\r
+ assert(response->status == IBV_WC_SUCCESS);\r
+ ldout(cct, 20) << __func__ << " cqe " << response->byte_len << " bytes." << dendl;\r
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);\r
chunk->prepare_read(response->byte_len);\r
- if (!response->byte_len) {\r
- wait_close = true;\r
- return 0;\r
- }\r
- if (response->status != IBV_WC_SUCCESS) {\r
- lderr(cct) << __func__ << " poll cqe failed! " << " number: " << n << ", status: "<< response->status << cpp_strerror(errno) << dendl;\r
- assert(0);\r
+    assert(response->byte_len);
+ if (read == (ssize_t)len) {\r
+ buffers.push_back(chunk);\r
+ ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
+ } else if (read + response->byte_len > (ssize_t)len) {\r
+ read += chunk->read(buf+read, (ssize_t)len-read);\r
+ buffers.push_back(chunk);\r
+ ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
} else {\r
- if (read == (ssize_t)len) {\r
- buffers.push_back(chunk);\r
- ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
- } else if (read + response->byte_len > (ssize_t)len) {\r
- read += chunk->read(buf+read, (ssize_t)len-read);\r
- buffers.push_back(chunk);\r
- ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
- } else {\r
- read += chunk->read(buf+read, response->byte_len);\r
- assert(infiniband->post_chunk(chunk) == 0);\r
- }\r
+ read += chunk->read(buf+read, response->byte_len);\r
+ assert(infiniband->post_chunk(chunk) == 0);\r
}\r
}\r
\r
- if (n)\r
- goto again;\r
- if (!rearmed) {\r
- rx_cq->rearm_notify();\r
- rearmed = true;\r
- // Clean up cq events after rearm notify ensure no new incoming event\r
- // arrived between polling and rearm\r
- goto again;\r
- }\r
- if (read == 0)\r
- return -EAGAIN;\r
- return read;\r
+ return read == 0 ? -EAGAIN : read;\r
}\r
\r
ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)\r
\r
ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)\r
{\r
+  if (error)
+    return -error;
-  ssize_t size = 0;
-  static const int MAX_COMPLETIONS = 16;
-  ibv_wc wc[MAX_COMPLETIONS];
-
-  bool rearmed = false;
-  int n;
-  again:
-  n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
-  ldout(cct, 20) << __func__ << " pool completion queue got " << n << " responses."<< dendl;
+  ssize_t size = 0;
\r
ibv_wc* response;\r
Chunk* chunk;\r
bool loaded = false;\r
auto iter = buffers.begin();\r
- if(iter != buffers.end()) {\r
+ if (iter != buffers.end()) {\r
chunk = *iter;\r
- if (chunk->bound == 0) {\r
- wait_close = true;\r
- return 0;\r
- }\r
auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
buffers.erase(iter);\r
loaded = true;\r
size = chunk->bound;\r
}\r
\r
- for (int i = 0; i < n; ++i) {\r
+ std::vector<ibv_wc> cqe;\r
+ get_wc(cqe);\r
+ if (cqe.empty())\r
+ return size == 0 ? -EAGAIN : size;\r
+\r
+ ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;\r
+\r
+ for (size_t i = 0; i < cqe.size(); ++i) {\r
-    response = &wc[i];
+    response = &cqe[i];
chunk = reinterpret_cast<Chunk*>(response->wr_id);\r
chunk->prepare_read(response->byte_len);\r
if(!loaded && i == 0) {\r
- if (chunk->bound == 0) {\r
- wait_close = true;\r
- return 0;\r
- }\r
auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
size = chunk->bound;\r
continue;\r
}\r
buffers.push_back(chunk);\r
}\r
\r
- if (n)\r
- goto again;\r
- if (!rearmed) {\r
- rx_cq->rearm_notify();\r
- rearmed = true;\r
- // Clean up cq events after rearm notify ensure no new incoming event\r
- // arrived between polling and rearm\r
- goto again;\r
- }\r
if (size == 0)\r
return -EAGAIN;\r
return size;\r
\r
ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)\r
{\r
+ if (error)\r
+ return -error;\r
size_t bytes = bl.length();\r
if (!bytes)\r
return 0;\r
- vector<Chunk*> tx_buffers;\r
- if (infiniband->get_tx_buffers(tx_buffers, bytes) < 0) {\r
+ pending_bl.claim_append(bl);\r
+ ssize_t r = submit(more);\r
+ if (r < 0 && r != -EAGAIN)\r
+ return r;\r
+ return bytes;\r
+}\r
+\r
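+// Flush pending_bl onto reserved tx chunks. Returns the byte count taken
+// from pending_bl, or -EAGAIN if the worker is out of tx buffers; in that
+// case the data stays in pending_bl and post_tx_buffer() retries later.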
+ssize_t RDMAConnectedSocketImpl::submit(bool more)\r
+{\r
+ if (error)\r
+ return -error;\r
+ std::vector<Chunk*> tx_buffers;\r
+ size_t bytes = pending_bl.length();\r
+ if (worker->reserve_message_buffer(this, tx_buffers, bytes) < 0) {\r
ldout(cct, 10) << __func__ << " no enough buffers" << dendl;\r
- return 0;\r
+ pending_bl.claim_append(pending_bl);\r
+ return -EAGAIN;\r
}\r
ldout(cct, 20) << __func__ << " prepare " << bytes << " bytes, tx buffer count: " << tx_buffers.size() << dendl;\r
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
- list<bufferptr>::const_iterator it = bl.buffers().begin();\r
- while (it != bl.buffers().end()) {\r
+ list<bufferptr>::const_iterator it = pending_bl.buffers().begin();\r
+ while (it != pending_bl.buffers().end()) {\r
const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());\r
uint32_t copied = 0;\r
// ldout(cct, 20) << __func__ << " app_buffer: " << addr << " length: " << it->length() << dendl;\r
return r;\r
\r
ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;\r
- bl.clear();\r
+ pending_bl.clear();\r
return bytes;\r
}\r
\r
}\r
return 0;\r
}\r
-\r
-void RDMAConnectedSocketImpl::fin() {\r
- //ibv_sge list;\r
- //memset(&list, 0, sizeof(list));\r
- ibv_send_wr wr;\r
- memset(&wr, 0, sizeof(wr));\r
- wr.wr_id = reinterpret_cast<uint64_t>(this);\r
- wr.num_sge = 0;\r
- //wr.sg_list = &list;\r
- wr.opcode = IBV_WR_SEND;\r
- wr.send_flags = IBV_SEND_SIGNALED;\r
- ibv_send_wr* bad_tx_work_request;\r
- if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {\r
- lderr(cct) << __func__ << " failed to send FIN"\r
- << "(most probably should be peer not ready): "\r
- << cpp_strerror(errno) << dendl;\r
- return ;\r
- }\r
-}\r
ldout(cct, 1) << __func__ << " recv msg not whole." << dendl;\r
continue;\r
} else {\r
- //RDMAWorker* w = static_cast<RDMAWorker*>(infiniband->stack->get_worker());\r
- server = new RDMAConnectedSocketImpl(cct, infiniband, NULL, msg);\r
+ server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, worker, msg);\r
msg = server->get_my_msg();\r
r = infiniband->send_udp_msg(server_setup_socket, msg, addr);\r
server->activate();\r
*
*/
-#include "RDMAStack.h"
#include "include/str_list.h"
+#include "RDMAStack.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
static Infiniband* global_infiniband;
+RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
+ : Worker(c, i), stack(nullptr), infiniband(NULL),
+ tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL)
+{}
+
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
- auto p = new RDMAServerSocketImpl(cct, infiniband, sa);
+ auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
delete p;
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
- RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, this);
+ RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this);
entity_addr_t sa;
memcpy(&sa, &addr, sizeof(addr));
ldout(cct, 20) << __func__ << " connecting to " << sa.get_sockaddr() << " : " << sa.get_port() << dendl;
ldout(cct, 20) << __func__ << " my syn msg : < " << msg.qpn << ", " << msg.psn << ", " << msg.lid << ">"<< dendl;
- client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0);
+ int client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0);
if (client_setup_socket == -1) {
lderr(cct) << __func__ << " failed to create client socket: " << strerror(errno) << dendl;
return -errno;
assert(!r);
std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
*socket = ConnectedSocket(std::move(csi));
+ ::close(client_setup_socket);
return 0;
}
{
if (!global_infiniband)
global_infiniband = new Infiniband(
- this, cct, cct->_conf->ms_async_rdma_device_name);
+ cct, cct->_conf->ms_async_rdma_device_name);
+ dispatcher = new RDMADispatcher(cct, global_infiniband);
}
void RDMAWorker::initialize()
memory_manager = infiniband->get_memory_manager();
}
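+/**
+ * Reserve registered tx chunks totalling at least `bytes` for socket `o`.
+ * On success the dispatcher's inflight counter grows by the number of
+ * chunks handed out; on shortage `o` is queued on pending_sent_conns so
+ * post_tx_buffer() can retry it when completions return chunks.
+ */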
+int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
+{
+ int r = infiniband->get_tx_buffers(c, bytes);
+ if (r == 0) {
+ stack->get_dispatcher()->inflight += c.size();
+ return r;
+ }
+
+  if (pending_sent_conns.empty() || pending_sent_conns.back() != o)
+    pending_sent_conns.push_back(o);
+  return r;
+}
+
+/**
+ * Return the given tx Chunks to the memory manager's free pool, decrement
+ * the dispatcher's inflight count, and retry connections whose sends were
+ * blocked waiting for tx buffers.
+ *
+ * \param[in] chunks
+ *      The Chunks to return.
+ * \return
+ *      0, always
+ */
+int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
+{
+ stack->get_dispatcher()->inflight -= chunks.size();
+ infiniband->get_memory_manager()->return_tx(chunks);
+ while (!pending_sent_conns.empty()) {
+ RDMAConnectedSocketImpl *o = pending_sent_conns.front();
+ ssize_t r = o->submit(false);
+ ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
+ if (r < 0) {
+ if (r == -EAGAIN)
+ break;
+ o->fault();
+ }
+ pending_sent_conns.pop_front();
+ }
+ return 0;
+}
+
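+// Reap tx completions: collect the finished chunks and hand them back via
+// post_tx_buffer(), which also retries sockets blocked on buffer shortage.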
void RDMAWorker::handle_tx_event()
{
ldout(cct, 20) << __func__ << dendl;
static const int MAX_COMPLETIONS = 16;
ibv_wc wc[MAX_COMPLETIONS];
+ std::vector<Chunk*> tx_chunks;
+ tx_chunks.reserve(MAX_COMPLETIONS);
bool rearmed = false;
int n;
<< response->wr_id << ") status(" << response->status << "): "
<< infiniband->wc_status_to_string(response->status) << dendl;
}
- assert(0);
+ RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
+ if (conn) {
+ conn->fault();
+ } else {
+ ldout(cct, 0) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+ }
}
- if (memory_manager->is_tx_chunk(chunk))
- infiniband->get_memory_manager()->return_tx(chunk);
- else
- ldout(cct, 20) << __func__ << " chunk belongs to none " << dendl;
+ assert(memory_manager->is_tx_chunk(chunk));
+ tx_chunks.push_back(chunk);
}
- if (n)
+ if (n) {
+ post_tx_buffer(tx_chunks);
+ tx_chunks.clear();
goto again;
+ }
if (!rearmed) {
tx_cq->rearm_notify();
// arrived between polling and rearm
goto again;
}
+
ldout(cct, 20) << __func__ << " leaving handle_tx_event. " << dendl;
}
+
+RDMADispatcher::~RDMADispatcher()
+{
+ done = true;
+ t.join();
+ assert(qp_conns.empty());
+ while (!dead_queue_pairs.empty()) {
+ delete dead_queue_pairs.back();
+ dead_queue_pairs.pop_back();
+ }
+
+ rx_cc->ack_events();
+ delete rx_cq;
+ delete rx_cc;
+ delete async_handler;
+}
+
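+// Drain the device-wide async event queue. Only IBV_EVENT_QP_LAST_WQE_REACHED
+// is acted upon: it signals that an errored QP has no receive work requests
+// left on the SRQ and may be parked for deferred destruction. Everything
+// else is logged and acked.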
+void RDMADispatcher::handle_async_event()
+{
+ ldout(cct, 20) << __func__ << dendl;
+ while (1) {
+ ibv_async_event async_event;
+ if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
+ if (errno != EAGAIN)
+ lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
+ << " " << cpp_strerror(errno) << ")" << dendl;
+ return;
+ }
+ // FIXME: Currently we must ensure no other factor make QP in ERROR state,
+ // otherwise this qp can't be deleted in current cleanup flow.
+ if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+ uint64_t qpn = async_event.element.qp->qp_num;
+ ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
+ << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+      RDMAConnectedSocketImpl *conn = get_conn_by_qp(qpn);
+      if (!conn) {
+        // conn is NULL when we already marked the QP dead via to_dead();
+        // reap it anyway so the qp_conns entry isn't leaked
+        ldout(cct, 1) << __func__ << " qp_num=" << qpn << " already down, reaping" << dendl;
+        erase_qpn(qpn);
+      } else {
+        ldout(cct, 0) << __func__ << " qp=" << qpn << " was not brought down by us, fault conn=" << conn << dendl;
+        conn->fault();
+        erase_qpn(qpn);
+      }
+ } else {
+ ldout(cct, 0) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
+ << " evt: " << ibv_event_type_str(async_event.event_type)
+ << dendl;
+ }
+ ibv_ack_async_event(&async_event);
+ }
+}
+
+
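+// Dedicated rx polling loop: poll the shared rx CQ, group the work
+// completions by owning connection and hand each batch over via pass_wc().
+// While the CQ is idle, reap dead QPs and service async events.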
+void RDMADispatcher::polling()
+{
+  static const int MAX_COMPLETIONS = 32;
+ ibv_wc wc[MAX_COMPLETIONS];
+
+ std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
+ int i, n;
+ while (!done) {
+ n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
+ if (!n) {
+      // NOTE: it is only safe to delete queue pairs while the rx CQ is idle
+      // (see the comment at the declaration of dead_queue_pairs).
+      // Additionally, don't delete a qp while tx chunks are still inflight,
+      // because we need to check the qp's state before sending.
+ if (!inflight.load()) {
+ while (!dead_queue_pairs.empty()) {
+ ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
+ delete dead_queue_pairs.back();
+ dead_queue_pairs.pop_back();
+ }
+ }
+ handle_async_event();
+ continue;
+ }
+
+ ldout(cct, 20) << __func__ << " pool completion queue got " << n
+ << " responses."<< dendl;
+ for (i = 0; i < n; ++i) {
+ ibv_wc* response = &wc[i];
+ Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ ldout(cct, 20) << __func__ << " got chunk=" << response->wr_id << " qp:" << wc[i].qp_num << dendl;
+
+ if (response->status != IBV_WC_SUCCESS) {
+ lderr(cct) << __func__ << " work request returned error for buffer(" << response->wr_id
+ << ") status(" << response->status << ":"
+ << ib->wc_status_to_string(response->status) << dendl;
+ ib->post_chunk(chunk);
+ continue;
+ }
+
+ RDMAConnectedSocketImpl *conn = get_conn_by_qp(response->qp_num);
+ if (!conn) {
+ // discard buffer
+ ldout(cct, 0) << __func__ << " missing qp_num " << response->qp_num << ", discard bd "
+ << chunk << dendl;
+ continue;
+ }
+ polled[conn].push_back(*response);
+ }
+ for (auto &&i : polled)
+ i.first->pass_wc(std::move(i.second));
+ polled.clear();
+ }
+}
#ifndef CEPH_MSG_RDMASTACK_H
#define CEPH_MSG_RDMASTACK_H
+#include <sys/eventfd.h>
+
+#include <list>
+#include <vector>
+#include <thread>
+
#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "msg/async/Stack.h"
-#include <thread>
#include "Infiniband.h"
class RDMAConnectedSocketImpl;
+class RDMAStack;
+
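+/**
+ * RDMADispatcher owns the process-wide rx completion queue and the thread
+ * polling it. It demultiplexes receive completions to the
+ * RDMAConnectedSocketImpl owning the QP and drives the deferred destruction
+ * of dead queue pairs.
+ */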
+class RDMADispatcher {
+ typedef Infiniband::MemoryManager::Chunk Chunk;
+ typedef Infiniband::QueuePair QueuePair;
+
+ std::thread t;
+ CephContext *cct;
+ Infiniband* ib;
+  Infiniband::CompletionQueue* rx_cq; // common completion queue for all receives
+ Infiniband::CompletionChannel* rx_cc;
+ EventCallbackRef async_handler;
+ bool done = false;
+  Mutex lock; // protect `qp_conns`
+  // qp_num -> (QueuePair, RDMAConnectedSocketImpl)
+  // The main usage of `qp_conns` is looking up a connection by qp_num, so
+  // the lifecycle of an element in `qp_conns` is the lifecycle of its qp.
+  //
+  // How a queue pair is brought down:
+  //  1. the connection calls mark_down()
+  //  2. the Queue Pair is moved into the Error state (QueuePair::to_dead)
+  //  3. wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED (handle_async_event)
+  //  4. wait until no tx chunks are inflight (polling)
+  //  5. destroy the QP by deleting it from dead_queue_pairs (polling)
+  ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;
+
+ /// if a queue pair is closed when transmit buffers are active
+ /// on it, the transmit buffers never get returned via tx_cq. To
+ /// work around this problem, don't delete queue pairs immediately. Instead,
+ /// save them in this vector and delete them at a safe time, when there are
+ /// no outstanding transmit buffers to be lost.
+ std::vector<QueuePair*> dead_queue_pairs;
+
+ class C_handle_cq_async : public EventCallback {
+ RDMADispatcher *dispatcher;
+ public:
+ C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
+ void do_request(int fd) {
+ dispatcher->handle_async_event();
+ }
+ };
+
+ public:
+  std::atomic_ulong inflight{0}; // tx chunks handed out but not yet completed
+  explicit RDMADispatcher(CephContext* cct, Infiniband* i)
+    : cct(cct), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock") {
+    rx_cc = ib->create_comp_channel();
+    assert(rx_cc);
+    rx_cq = ib->create_comp_queue(rx_cc);
+    assert(rx_cq);
+    // start the polling thread only after rx_cc/rx_cq exist: the thread
+    // dereferences rx_cq immediately
+    t = std::thread(&RDMADispatcher::polling, this);
+  }
+ virtual ~RDMADispatcher();
+
+ void handle_async_event();
+ void polling();
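+  // associate a QP with its socket; returns an eventfd the socket exposes
+  // as its read-readiness fd (pass_wc()/fault() write to it)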
+ int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) {
+ int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+ assert(fd >= 0);
+ Mutex::Locker l(lock);
+ assert(!qp_conns.count(qp->get_local_qp_number()));
+ qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
+ return fd;
+ }
+ RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp) {
+ Mutex::Locker l(lock);
+ auto it = qp_conns.find(qp);
+ if (it == qp_conns.end())
+ return NULL;
+ if (it->second.first->is_dead())
+ return NULL;
+ return it->second.second;
+ }
+ void erase_qpn(uint32_t qpn) {
+ Mutex::Locker l(lock);
+ auto it = qp_conns.find(qpn);
+ if (it == qp_conns.end())
+ return ;
+ dead_queue_pairs.push_back(it->second.first);
+ qp_conns.erase(it);
+ }
+};
+
class RDMAWorker : public Worker {
typedef Infiniband::CompletionQueue CompletionQueue;
typedef Infiniband::CompletionChannel CompletionChannel;
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::MemoryManager MemoryManager;
- int client_setup_socket;
- Infiniband* infiniband;
- CompletionQueue* tx_cq; // common completion queue for all transmits
- CompletionChannel* tx_cc;
+ RDMAStack *stack;
+ Infiniband *infiniband;
+ CompletionQueue *tx_cq; // common completion queue for all transmits
+ CompletionChannel *tx_cc;
EventCallbackRef tx_handler;
- MemoryManager* memory_manager;
- vector<RDMAConnectedSocketImpl*> to_delete;
+ MemoryManager *memory_manager;
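+  // sockets that failed to reserve tx chunks, retried in post_tx_buffer()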
+ std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
+
class C_handle_cq_tx : public EventCallback {
RDMAWorker *worker;
public:
};
public:
- explicit RDMAWorker(CephContext *c, unsigned i)
- : Worker(c, i), infiniband(NULL), tx_handler(new C_handle_cq_tx(this)) {}
+ explicit RDMAWorker(CephContext *c, unsigned i);
virtual ~RDMAWorker() {
tx_cc->ack_events();
delete tx_cq;
virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
virtual void initialize() override;
- void handle_tx_event();
CompletionQueue* get_tx_cq() { return tx_cq; }
- void remove_to_delete(RDMAConnectedSocketImpl* csi) {
- if (to_delete.empty())
- return ;
- vector<RDMAConnectedSocketImpl*>::iterator iter = to_delete.begin();
- for (; iter != to_delete.end(); ++iter) {
- if(csi == *iter) {
- to_delete.erase(iter);
- }
- }
+ RDMAStack *get_stack() {
+ return stack;
}
- void add_to_delete(RDMAConnectedSocketImpl* csi) {
- to_delete.push_back(csi);
+ int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
+ int post_tx_buffer(std::vector<Chunk*> &chunks);
+ void remove_pending_conn(RDMAConnectedSocketImpl *o) {
+ pending_sent_conns.remove(o);
}
+ void handle_tx_event();
};
class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
IBSYNMsg peer_msg;
IBSYNMsg my_msg;
int connected;
+ int error;
Infiniband* infiniband;
+ RDMADispatcher* dispatcher;
RDMAWorker* worker;
std::vector<Chunk*> buffers;
- CompletionChannel* rx_cc;
- CompletionQueue* rx_cq;
- bool wait_close;
+ int notify_fd;
+ bufferlist pending_bl;
+
+ Mutex lock;
+ std::vector<ibv_wc> wc;
+
+  void notify() {
+    uint64_t i = 1;
+    ssize_t ret = write(notify_fd, &i, sizeof(i));
+    assert(ret == sizeof(i));
+  }
+ ssize_t read_buffers(char* buf, size_t len);
+ int post_work_request(std::vector<Chunk*>&);
public:
- RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMAWorker* w, IBSYNMsg im = IBSYNMsg()) : cct(cct), peer_msg(im), infiniband(ib), worker(w), wait_close(false) {
- qp = infiniband->create_queue_pair(IBV_QPT_RC);
- rx_cq = qp->get_rx_cq();
- rx_cc = rx_cq->get_cc();
+ RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+ RDMAWorker *w, IBSYNMsg im = IBSYNMsg())
+ : cct(cct), peer_msg(im), connected(0), error(0), infiniband(ib),
+ dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock") {
+ qp = infiniband->create_queue_pair(w->get_tx_cq(), IBV_QPT_RC);
my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
my_msg.lid = infiniband->get_lid();
my_msg.gid = infiniband->get_gid();
+ notify_fd = dispatcher->register_qp(qp, this);
+ }
+ virtual ~RDMAConnectedSocketImpl() {
+ worker->remove_pending_conn(this);
}
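+  // called from the dispatcher's polling thread: queue the completions for
+  // this socket and wake its owner through notify_fd; get_wc() below is the
+  // consuming side, called on the worker thread by read()/zero_copy_read()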
+ void pass_wc(std::vector<ibv_wc> &&v) {
+ Mutex::Locker l(lock);
+ if (wc.empty())
+ wc = std::move(v);
+ else
+ wc.insert(wc.end(), v.begin(), v.end());
+ notify();
+ }
+ void get_wc(std::vector<ibv_wc> &w) {
+ Mutex::Locker l(lock);
+ if (wc.empty())
+ return ;
+ w.swap(wc);
+ }
virtual int is_connected() override {
return connected;
}
virtual ssize_t zero_copy_read(bufferptr &data) override;
virtual ssize_t send(bufferlist &bl, bool more) override;
virtual void shutdown() override {
+ if (qp) {
+ qp->to_dead();
+ qp = NULL;
+ }
}
virtual void close() override {
- if (!wait_close) {
- fin();
- worker->add_to_delete(this);
- } else {
- clear_all();
+ if (qp) {
+ qp->to_dead();
+ qp = NULL;
}
}
virtual int fd() const override {
- return rx_cc->get_fd();
+ return notify_fd;
}
- void clear_all() {
- delete qp;
- rx_cc->ack_events();
- delete rx_cq;
- rx_cq = NULL;
- if (!wait_close)
- worker->remove_to_delete(this);
+ void fault() {
+ if (qp) {
+ qp->to_dead();
+ qp = NULL;
+ }
+ error = ECONNRESET;
+ notify();
}
+ ssize_t submit(bool more);
int activate();
- ssize_t read_buffers(char* buf, size_t len);
- int poll_cq(int num_entries, ibv_wc *ret_wc_array);
IBSYNMsg get_my_msg() { return my_msg; }
- IBSYNMsg get_peer_msg() { return peer_msg; }
void set_peer_msg(IBSYNMsg m) { peer_msg = m ;}
- int post_work_request(std::vector<Chunk*>&);
- void fin();
};
NetHandler net;
int server_setup_socket;
Infiniband* infiniband;
+ RDMADispatcher *dispatcher;
+ RDMAWorker *worker;
entity_addr_t sa;
+
public:
- RDMAServerSocketImpl(CephContext *cct, Infiniband* i, entity_addr_t& a)
- : cct(cct), net(cct), infiniband(i), sa(a) {}
+ RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
+ : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) {}
int listen(entity_addr_t &sa, const SocketOptions &opt);
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out) override;
- virtual void abort_accept() override {}
+ virtual void abort_accept() override {
+ if (server_setup_socket >= 0)
+ ::close(server_setup_socket);
+ }
virtual int fd() const override {
return server_setup_socket;
}
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
+ RDMADispatcher *dispatcher;
public:
explicit RDMAStack(CephContext *cct, const string &t);
+ virtual ~RDMAStack() { delete dispatcher; }
virtual bool support_zero_copy_read() const override { return true; }
//virtual bool support_local_listen_table() const { return true; }
assert(threads.size() > i && threads[i].joinable());
threads[i].join();
}
+ RDMADispatcher *get_dispatcher() { return dispatcher; }
};
#endif