msg/async/rdma/Infiniband.cc
msg/async/rdma/Device.cc
msg/async/rdma/RDMAConnectedSocketImpl.cc
+ msg/async/rdma/RDMAConnTCP.cc
msg/async/rdma/RDMAServerSocketImpl.cc
msg/async/rdma/RDMAStack.cc)
endif(HAVE_RDMA)
#define dout_prefix *_dout << "Infiniband "
static const uint32_t MAX_INLINE_DATA = 0;
-static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
Infiniband::QueuePair::QueuePair(
CephContext *c, Device &device, ibv_qp_type type,
return device_list->get_device(device_name);
}
-// 1 means no valid buffer read, 0 means got enough buffer
-// else return < 0 means error
-int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
-{
- char msg[TCP_MSG_LEN];
- char gid[33];
- ssize_t r = ::read(sd, &msg, sizeof(msg));
- // Drop incoming qpt
- if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
- ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
- return -EINVAL;
- }
- }
- if (r < 0) {
- r = -errno;
- lderr(cct) << __func__ << " got error " << r << ": "
- << cpp_strerror(r) << dendl;
- } else if (r == 0) { // valid disconnect message of length 0
- ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
- } else if ((size_t)r != sizeof(msg)) { // invalid message
- ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
- r = -EINVAL;
- } else { // valid message
- sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
- wire_gid_to_gid(gid, &(im.gid));
- ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
- }
- return r;
-}
-
-int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
-{
- int retry = 0;
- ssize_t r;
-
- char msg[TCP_MSG_LEN];
- char gid[33];
-retry:
- gid_to_wire_gid(&(im.gid), gid);
- sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
- ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
- << ", " << im.peer_qpn << ", " << gid << dendl;
- r = ::write(sd, msg, sizeof(msg));
- // Drop incoming qpt
- if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
- ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
- return -EINVAL;
- }
- }
-
- if ((size_t)r != sizeof(msg)) {
- // FIXME need to handle EAGAIN instead of retry
- if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
- retry++;
- goto retry;
- }
- if (r < 0)
- lderr(cct) << __func__ << " send returned error " << errno << ": "
- << cpp_strerror(errno) << dendl;
- else
- lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
- return -errno;
- }
- return 0;
-}
-
-void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
-{
- char tmp[9];
- uint32_t v32;
- int i;
-
- for (tmp[8] = 0, i = 0; i < 4; ++i) {
- memcpy(tmp, wgid + i * 8, 8);
- sscanf(tmp, "%x", &v32);
- *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
- }
-}
-
-void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
-{
- for (int i = 0; i < 4; ++i)
- sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
-}
-
Infiniband::QueuePair::~QueuePair()
{
if (qp) {
DeviceList *device_list;
RDMADispatcher *dispatcher = nullptr;
- void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
- void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
-
public:
explicit Infiniband(CephContext *c);
~Infiniband();
};
public:
- int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
- int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
RDMADispatcher *get_dispatcher() { return dispatcher; }
};
+// Stream a queue pair as its local QP number, so that log statements can
+// write `<< *qp` directly.
+inline ostream& operator<<(ostream& out, const Infiniband::QueuePair &qp)
+{
+  out << qp.get_local_qp_number();
+  return out;
+}
+
#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "RDMAStack.h"
+#include "Device.h"
+#include "RDMAConnTCP.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAConnTCP "
+
+// Size in bytes of one handshake message exchanged by send_msg()/recv_msg():
+// the text "lid:qpn:psn:peer_qpn:gid" with fixed-width hex fields (4, 8, 8, 8
+// and 32 digits) plus the terminating NUL that sizeof() counts for a literal.
+static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
+
+// Read one handshake message from the socket `sd` and decode it into `im`.
+//
+// Returns the message length (TCP_MSG_LEN, > 0) when a complete, well-formed
+// message was decoded, 0 when read() returned 0 (zero-length disconnect
+// message), and a negative errno value on failure: -errno when read() fails,
+// or -EINVAL for an injected failure, a short read or an unparsable message.
+int RDMAConnTCP::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+  char msg[TCP_MSG_LEN];
+  char gid[33] = {0};
+  ssize_t r = ::read(sd, msg, sizeof(msg));
+  // Capture errno right away so that nothing executed below can overwrite it.
+  const int read_errno = errno;
+  // Drop incoming qpt
+  if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+      return -EINVAL;
+    }
+  }
+  if (r < 0) {
+    r = -read_errno;
+    lderr(cct) << __func__ << " got error " << r << ": "
+               << cpp_strerror(r) << dendl;
+  } else if (r == 0) { // valid disconnect message of length 0
+    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
+  } else if (static_cast<size_t>(r) != sizeof(msg)) { // invalid message
+    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
+    r = -EINVAL;
+  } else { // full-length message
+    // The bytes come from the network: terminate the buffer ourselves so that
+    // sscanf cannot run past its end. A well-formed message already has a NUL
+    // here, so this is a no-op for valid input.
+    msg[sizeof(msg) - 1] = '\0';
+    // lid is parsed as hex (%hx) because send_msg() formats it with %04x;
+    // the gid field is bounded to the 32 characters that fit into `gid`.
+    const int fields = sscanf(msg, "%hx:%x:%x:%x:%32s", &(im.lid), &(im.qpn),
+                              &(im.psn), &(im.peer_qpn), gid);
+    if (fields != 5 || strlen(gid) != 32) {
+      // wire_gid_to_gid() consumes exactly 32 hex digits; refuse anything else.
+      ldout(cct, 1) << __func__ << " got malformed message" << dendl;
+      return -EINVAL;
+    }
+    wire_gid_to_gid(gid, &(im.gid));
+    ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
+  }
+  return r;
+}
+
+// Encode `im` as one fixed-size text message (TCP_MSG_LEN bytes, including the
+// terminating NUL) and write it to the socket `sd`.
+//
+// write() is attempted again, up to three more times, when it fails with
+// EINTR or EAGAIN. Returns 0 once the whole message has been written,
+// -EINVAL for an injected failure, -errno when write() fails, and -EIO when
+// write() transferred fewer bytes than the message length.
+int RDMAConnTCP::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+  char msg[TCP_MSG_LEN];
+  char gid[33];
+
+  for (int retry = 0; ; ++retry) {
+    gid_to_wire_gid(&(im.gid), gid);
+    snprintf(msg, sizeof(msg), "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
+    ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
+                   << ", " << im.peer_qpn << ", " << gid << dendl;
+    const ssize_t r = ::write(sd, msg, sizeof(msg));
+    // Capture errno before logging or anything else can overwrite it; it is
+    // only meaningful when write() actually failed.
+    const int write_errno = (r < 0) ? errno : 0;
+
+    // Optionally simulate a socket failure (ms_inject_socket_failures).
+    if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+      if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+        ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+        return -EINVAL;
+      }
+    }
+
+    if (r >= 0 && static_cast<size_t>(r) == sizeof(msg))
+      return 0;
+
+    // FIXME need to handle EAGAIN instead of retry
+    if (r < 0 && (write_errno == EINTR || write_errno == EAGAIN) && retry < 3)
+      continue;
+
+    if (r < 0) {
+      lderr(cct) << __func__ << " send returned error " << write_errno << ": "
+                 << cpp_strerror(write_errno) << dendl;
+      return -write_errno;
+    }
+
+    // Short write: errno is not set in this case, so report an explicit
+    // error instead of returning a stale (possibly zero) -errno.
+    lderr(cct) << __func__ << " send got bad length (" << r << ") " << dendl;
+    return -EIO;
+  }
+}
+
+// Decode the 32 hex digits at `wgid` (the text produced by gid_to_wire_gid())
+// into the 16 raw bytes of `gid`, eight digits per 32-bit word. Exactly 32
+// bytes are read from `wgid`.
+void RDMAConnTCP::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
+{
+  char tmp[9];
+  tmp[8] = 0;
+
+  for (int i = 0; i < 4; ++i) {
+    // Start from zero so that a word which fails to parse is not left
+    // uninitialized.
+    uint32_t v32 = 0;
+    memcpy(tmp, wgid + i * 8, 8);
+    sscanf(tmp, "%x", &v32);
+    // Copy the bytes into place instead of storing through a casted
+    // uint32_t pointer into the byte array.
+    const uint32_t word = ntohl(v32);
+    memcpy(&gid->raw[i * 4], &word, sizeof(word));
+  }
+}
+
+// Render the 16 raw bytes of `gid` into `wgid` as four 8-digit hex words.
+// `wgid` needs room for 33 bytes: 32 digits plus the NUL sprintf appends.
+void RDMAConnTCP::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
+{
+  int word = 0;
+  while (word < 4) {
+    char *dst = &wgid[word * 8];
+    sprintf(dst, "%08x", htonl(*(uint32_t *)(gid->raw + word * 4)));
+    ++word;
+  }
+}
+
+// Describe this connection manager by the TCP handshake socket it holds.
+ostream &RDMAConnTCP::print(ostream &out) const
+{
+  out << "TCP {tcp_fd: " << tcp_fd << "}";
+  return out;
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H
+#define CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "msg/async/Stack.h"
+#include "Infiniband.h"
+#include "RDMAConnectedSocketImpl.h"
+
+class RDMAWorker;
+class RDMADispatcher;
+
+// Connection manager that exchanges the queue-pair handshake messages
+// (IBSYNMsg) over a TCP socket.
+class RDMAConnTCP : public RDMAConnMgr {
+  // Event callback that forwards to handle_connection() until close() has
+  // been called on it.
+  class C_handle_connection : public EventCallback {
+    RDMAConnTCP *cst;
+    bool active;
+   public:
+    explicit C_handle_connection(RDMAConnTCP *w): cst(w), active(true) {}
+    void do_request(int fd) {
+      if (active)
+        cst->handle_connection();
+    }
+    void close() {
+      active = false;
+    }
+  };
+
+  IBSYNMsg peer_msg;            // handshake message received from the peer
+  IBSYNMsg my_msg;              // handshake message describing the local QP
+  EventCallbackRef con_handler; // C_handle_connection created by the constructor
+  int tcp_fd = -1;              // handshake socket, -1 while unset
+
+ private:
+  void handle_connection();
+  int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+  int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+  int activate();
+  void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
+  void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
+
+ public:
+  RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
+              Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
+  virtual ~RDMAConnTCP();
+
+  virtual ostream &print(ostream &out) const override;
+
+  // Implements the pure virtual declared in RDMAConnMgr; marked override like
+  // the other overridden members so that a signature drift fails to compile.
+  virtual void set_accept_fd(int sd) override;
+
+  virtual void cleanup() override;
+  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
+};
+
+// Server socket implementation whose accept path runs over a TCP socket.
+class RDMAServerConnTCP : public RDMAServerSocketImpl {
+  NetHandler net;
+  int server_setup_socket;  // listening fd; the constructor sets it to -1
+
+ public:
+  RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+
+  // listen() is a pure virtual of RDMAServerSocketImpl; it is marked override
+  // like the other members below so that a signature drift fails to compile.
+  virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+  virtual void abort_accept() override;
+  virtual int fd() const override { return server_setup_socket; }
+};
+
+#endif
#include "RDMAStack.h"
#include "Device.h"
+#include "RDMAConnectedSocketImpl.h"
+#include "RDMAConnTCP.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
+// Remember the collaborating objects and start out as a non-server manager
+// that is neither active nor connected.
+RDMAConnMgr::RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock,
+                         Infiniband* ib, RDMADispatcher* s, RDMAWorker *w)
+  : cct(cct),
+    socket(sock),
+    infiniband(ib),
+    dispatcher(s),
+    worker(w),
+    is_server(false),
+    active(false),
+    connected(0)
+{
+}
+
RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
RDMAWorker *w)
- : cct(cct), connected(0), error(0), infiniband(ib),
- dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
- is_server(false), con_handler(new C_handle_connection(this)),
- active(false)
+ : cct(cct), infiniband(ib), dispatcher(s), worker(w),
+ error(0), lock("RDMAConnectedSocketImpl::lock")
+{
+ cmgr = new RDMAConnTCP(cct, this, ib, s, w);
+}
+
+QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p)
{
- ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
- ibport = cct->_conf->ms_async_rdma_port_num;
+ ibdev = d;
+ ibport = p;
+
+ qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
+
+ local_qpn = qp->get_local_qp_number();
+
+ return qp;
+}
+
+RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
+ Infiniband* ib, RDMADispatcher* s, RDMAWorker *w)
+ : RDMAConnMgr(cct, sock, ib, s, w), con_handler(new C_handle_connection(this))
+{
+ Device *ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
+ int ibport = cct->_conf->ms_async_rdma_port_num;
assert(ibdev);
assert(ibport > 0);
ibdev->init(ibport);
- qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
- my_msg.qpn = qp->get_local_qp_number();
+ QueuePair *qp = socket->create_queue_pair(ibdev, ibport);
+
+ my_msg.qpn = socket->local_qpn;
my_msg.psn = qp->get_initial_psn();
my_msg.lid = ibdev->get_lid();
my_msg.peer_qpn = 0;
my_msg.gid = ibdev->get_gid();
+ socket->register_qp(qp);
+}
+
+void RDMAConnectedSocketImpl::register_qp(QueuePair *qp)
+{
notify_fd = dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
}
+// Run cleanup() and then close the TCP handshake socket if one is held.
+RDMAConnTCP::~RDMAConnTCP()
+{
+  cleanup();
+  if (tcp_fd < 0)
+    return;
+  ::close(tcp_fd);
+}
+
RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
{
ldout(cct, 20) << __func__ << " destruct." << dendl;
- cleanup();
worker->remove_pending_conn(this);
- dispatcher->erase_qpn(my_msg.qpn);
+ dispatcher->erase_qpn(local_qpn);
Mutex::Locker l(lock);
if (notify_fd >= 0)
::close(notify_fd);
- if (tcp_fd >= 0)
- ::close(tcp_fd);
error = ECONNRESET;
int ret = 0;
for (unsigned i=0; i < wc.size(); ++i) {
ret = ibdev->post_chunk(buffers[i]);
assert(ret == 0);
}
+
+ delete cmgr;
}
void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
w.swap(wc);
}
-int RDMAConnectedSocketImpl::activate()
+int RDMAConnTCP::activate()
{
ibv_qp_attr qpa;
int r;
+ socket->remote_qpn = peer_msg.qpn;
+
// now connect up the qps and switch to RTR
memset(&qpa, 0, sizeof(qpa));
qpa.qp_state = IBV_QPS_RTR;
qpa.ah_attr.grh.hop_limit = 6;
qpa.ah_attr.grh.dgid = peer_msg.gid;
- qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx();
+ qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx();
qpa.ah_attr.dlid = peer_msg.lid;
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
qpa.ah_attr.src_path_bits = 0;
- qpa.ah_attr.port_num = (uint8_t)ibport;
+ qpa.ah_attr.port_num = (uint8_t)socket->get_ibport();
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
+ QueuePair *qp = socket->get_qp();
+
r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
IBV_QP_AV |
IBV_QP_PATH_MTU |
if (!is_server) {
connected = 1; //indicate successfully
- ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
- submit(false);
+ ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << *qp << dendl;
+ socket->submit(false);
}
active = true;
return 0;
}
-int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
<< opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
NetHandler net(cct);
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
my_msg.peer_qpn = 0;
- r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ r = send_msg(cct, tcp_fd, my_msg);
if (r < 0)
return r;
return 0;
}
-void RDMAConnectedSocketImpl::handle_connection() {
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
- int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
+void RDMAConnTCP::handle_connection() {
+ ldout(cct, 20) << __func__ << " " << *socket << dendl;
+ int r = recv_msg(cct, tcp_fd, peer_msg);
if (r < 0) {
if (r != -EAGAIN) {
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
- fault();
+ socket->fault();
}
return;
}
r = activate();
assert(!r);
}
- notify();
- r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ socket->notify();
+ r = send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
- fault();
+ socket->fault();
}
} else {
if (peer_msg.peer_qpn == 0) {// syn from client
ldout(cct, 10) << __func__ << " server is already active." << dendl;
return ;
}
- r = infiniband->send_msg(cct, tcp_fd, my_msg);
+ r = send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
- fault();
+ socket->fault();
return ;
}
r = activate();
} else { // ack from client
connected = 1;
cleanup();
- submit(false);
- notify();
+ socket->submit(false);
+ socket->notify();
}
}
}
+// Hook invoked from RDMAConnectedSocketImpl::read(). On a server-side manager
+// that is not yet marked connected, it marks the connection as connected,
+// runs cleanup() and calls socket->submit(false); otherwise it does nothing.
+void RDMAConnMgr::post_read()
+{
+  if (is_server && !connected) {
+    ldout(cct, 20) << __func__ << " we do not need last handshake, " << *socket << dendl;
+    connected = 1; // the last handshake message is not needed any more
+    cleanup();
+    socket->submit(false);
+  }
+}
+
ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
{
uint64_t i = 0;
int r = ::read(notify_fd, &i, sizeof(i));
- ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
+ ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << *qp << " r = " << r << dendl;
if (error)
return -error;
ssize_t read = 0;
if (cqe.empty())
return read == 0 ? -EAGAIN : read;
- ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
+ ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << *qp << dendl;
for (size_t i = 0; i < cqe.size(); ++i) {
ibv_wc* response = &cqe[i];
assert(response->status == IBV_WC_SUCCESS);
worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
if (response->byte_len == 0) {
dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
- if (connected) {
+ if (cmgr->connected) {
error = ECONNRESET;
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
}
worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
- if (is_server && connected == 0) {
- ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
- connected = 1; //if so, we don't need the last handshake
- cleanup();
- submit(false);
- }
+ cmgr->post_read();
if (read == 0 && error)
return -error;
ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
{
if (error) {
- if (!active)
+ if (!cmgr->active)
return -EPIPE;
return -error;
}
{
Mutex::Locker l(lock);
pending_bl.claim_append(bl);
- if (!connected) {
- ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
+ if (!cmgr->connected) {
+ ldout(cct, 20) << __func__ << " fake send to upper, QP: " << *qp << dendl;
return bytes;
}
}
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << *qp << dendl;
ssize_t r = submit(more);
if (r < 0 && r != -EAGAIN)
return r;
int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
{
- ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << *qp << " " << tx_buffers[0] << dendl;
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
ibv_sge isge[tx_buffers.size()];
uint32_t current_sge = 0;
}
}
-void RDMAConnectedSocketImpl::cleanup() {
+void RDMAConnTCP::cleanup() {
if (con_handler && tcp_fd >= 0) {
(static_cast<C_handle_connection*>(con_handler))->close();
worker->center.submit_to(worker->center.get_id(), [this]() {
assert(ret = sizeof(i));
}
-void RDMAConnectedSocketImpl::shutdown()
+void RDMAConnMgr::shutdown()
{
- if (!error)
- fin();
- error = ECONNRESET;
+ if (!socket->error)
+ socket->fin();
+ socket->error = ECONNRESET;
active = false;
}
-void RDMAConnectedSocketImpl::close()
+void RDMAConnMgr::close()
{
- if (!error)
- fin();
- error = ECONNRESET;
- active = false;
+ shutdown();
}
void RDMAConnectedSocketImpl::fault()
{
- ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
+ ldout(cct, 1) << __func__ << dendl;
/*if (qp) {
qp->to_dead();
qp = NULL;
}*/
error = ECONNRESET;
- connected = 1;
+ cmgr->connected = 1;
notify();
}
-void RDMAConnectedSocketImpl::set_accept_fd(int sd)
+void RDMAConnTCP::set_accept_fd(int sd)
{
tcp_fd = sd;
is_server = true;
class RDMAWorker;
class RDMADispatcher;
+class RDMAConnectedSocketImpl;
+
+typedef Infiniband::QueuePair QueuePair;
+
+// Abstract connection manager owned by an RDMAConnectedSocketImpl. Concrete
+// subclasses implement how the connection is set up (try_connect,
+// set_accept_fd), torn down (cleanup) and printed.
+class RDMAConnMgr {
+  friend class RDMAConnectedSocketImpl;
+
+ protected:
+  CephContext *cct;
+  RDMAConnectedSocketImpl *socket;  // the socket this manager works for
+  Infiniband* infiniband;
+  RDMADispatcher* dispatcher;
+  RDMAWorker* worker;
+
+  bool is_server;  // false after construction
+  bool active;     // qp is active ?
+  int connected;   // 0 after construction, set to 1 once connected
+
+ public:
+  RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock,
+              Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
+  virtual ~RDMAConnMgr() {}
+
+  // Write a description of this manager to `out` and return `out`.
+  virtual ostream &print(ostream &out) const = 0;
+
+  virtual void cleanup() = 0;
+  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) = 0;
+  virtual void set_accept_fd(int sd) = 0;
+
+  // Hook run by the socket's read() path; see the definition for details.
+  void post_read();
+
+  void shutdown();
+  void close();
+};
+// Stream a connection manager through its virtual print() method.
+inline ostream& operator<<(ostream& out, const RDMAConnMgr &m)
+{
+  ostream &result = m.print(out);
+  return result;
+}
class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
+ friend class RDMAConnMgr;
+
+ protected:
+ CephContext *cct;
+ Infiniband* infiniband;
+ RDMADispatcher* dispatcher;
+ RDMAWorker* worker;
+ Device *ibdev = nullptr;
+ int ibport = -1;
+ QueuePair *qp = nullptr;
+
public:
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::CompletionChannel CompletionChannel;
typedef Infiniband::CompletionQueue CompletionQueue;
private:
- CephContext *cct;
- Infiniband::QueuePair *qp;
- Device *ibdev;
- int ibport;
- IBSYNMsg peer_msg;
- IBSYNMsg my_msg;
- int connected;
+ RDMAConnMgr *cmgr;
int error;
- Infiniband* infiniband;
- RDMADispatcher* dispatcher;
- RDMAWorker* worker;
std::vector<Chunk*> buffers;
int notify_fd = -1;
bufferlist pending_bl;
Mutex lock;
std::vector<ibv_wc> wc;
- bool is_server;
- EventCallbackRef con_handler;
- int tcp_fd = -1;
- bool active;// qp is active ?
- void notify();
ssize_t read_buffers(char* buf, size_t len);
int post_work_request(std::vector<Chunk*>&);
public:
+ uint32_t local_qpn = 0;
+ uint32_t remote_qpn = 0;
+
RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();
- Device *get_device() { return ibdev; }
+ ostream &print(ostream &out) const {
+ return out << "socket {lqpn: " << local_qpn << " rqpn: " << remote_qpn << " " << *cmgr << "}";
+ };
+
+ Device *get_device() { return ibdev; };
+ int get_ibport() { return ibport; };
void pass_wc(std::vector<ibv_wc> &&v);
void get_wc(std::vector<ibv_wc> &w);
- virtual int is_connected() override { return connected; }
+ virtual int is_connected() override { return cmgr->connected; }
virtual ssize_t read(char* buf, size_t len) override;
virtual ssize_t zero_copy_read(bufferptr &data) override;
virtual ssize_t send(bufferlist &bl, bool more) override;
- virtual void shutdown() override;
- virtual void close() override;
+ virtual void shutdown() override { cmgr->shutdown(); };
+ virtual void close() override { cmgr->close(); };
virtual int fd() const override { return notify_fd; }
void fault();
const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
+ QueuePair *get_qp() { return qp; };
ssize_t submit(bool more);
- int activate();
void fin();
- void handle_connection();
- void cleanup();
- void set_accept_fd(int sd);
- int try_connect(const entity_addr_t&, const SocketOptions &opt);
-
- class C_handle_connection : public EventCallback {
- RDMAConnectedSocketImpl *csi;
- bool active;
- public:
- C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
- void do_request(int fd) {
- if (active)
- csi->handle_connection();
- }
- void close() {
- active = false;
- }
- };
+ void register_qp(QueuePair *qp);
+ void notify();
+
+ QueuePair *create_queue_pair(Device *d, int p);
+ void set_accept_fd(int sd) {cmgr->set_accept_fd(sd); };
+ int try_connect(const entity_addr_t &sa, const SocketOptions &opt) { return cmgr->try_connect(sa, opt); };
};
+// Stream a connected socket through its print() method.
+inline ostream& operator<<(ostream& out, const RDMAConnectedSocketImpl &s)
+{
+  ostream &result = s.print(out);
+  return result;
+}
+
class RDMAServerSocketImpl : public ServerSocketImpl {
+ protected:
CephContext *cct;
Device *ibdev;
int ibport;
- NetHandler net;
- int server_setup_socket;
Infiniband* infiniband;
RDMADispatcher *dispatcher;
RDMAWorker *worker;
public:
RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
- int listen(entity_addr_t &sa, const SocketOptions &opt);
- virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
- virtual void abort_accept() override;
- virtual int fd() const override { return server_setup_socket; }
- int get_fd() { return server_setup_socket; }
+ virtual int listen(entity_addr_t &sa, const SocketOptions &opt) = 0;
+ virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) = 0;
+ virtual void abort_accept() = 0;
+ virtual int fd() const = 0;
};
#endif
#include "msg/async/net_handler.h"
#include "RDMAStack.h"
#include "Device.h"
+#include "RDMAConnTCP.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAServerSocketImpl "
RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
- : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a)
+ : cct(cct), infiniband(i), dispatcher(s), worker(w), sa(a)
+{
+}
+
+RDMAServerConnTCP::RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
+ : RDMAServerSocketImpl(cct, i, s, w, a), net(cct), server_setup_socket(-1)
{
ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
ibport = cct->_conf->ms_async_rdma_port_num;
ibdev->init(ibport);
}
-int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
+int RDMAServerConnTCP::listen(entity_addr_t &sa, const SocketOptions &opt)
{
int rc = 0;
server_setup_socket = net.create_socket(sa.get_family(), true);
return -errno;
}
-int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
+int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
{
ldout(cct, 15) << __func__ << dendl;
}
net.set_priority(sd, opt.priority, out->get_family());
- RDMAConnectedSocketImpl* server;
+ RDMAConnectedSocketImpl *server;
//Worker* w = dispatcher->get_stack()->get_worker();
server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));
server->set_accept_fd(sd);
return 0;
}
-void RDMAServerSocketImpl::abort_accept()
+void RDMAServerConnTCP::abort_accept()
{
if (server_setup_socket >= 0)
::close(server_setup_socket);
#include "common/deleter.h"
#include "common/Tub.h"
#include "RDMAStack.h"
+#include "RDMAConnTCP.h"
#include "Device.h"
#define dout_subsys ceph_subsys_ms
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
- auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+ auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
delete p;