msg/async/rdma/Infiniband.cc
msg/async/rdma/Device.cc
msg/async/rdma/RDMAConnectedSocketImpl.cc
- msg/async/rdma/RDMAConnTCP.cc
msg/async/rdma/RDMAServerSocketImpl.cc
msg/async/rdma/RDMAStack.cc)
endif(HAVE_RDMA)
#define dout_prefix *_dout << "Infiniband "
static const uint32_t MAX_INLINE_DATA = 0;
+static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
Infiniband::QueuePair::QueuePair(
CephContext *c, Device &device, ibv_qp_type type,
return device_list->get_device(device_name);
}
+// 1 means no valid buffer read, 0 means got enough buffer
+// else return < 0 means error
+int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+ char msg[TCP_MSG_LEN];
+ char gid[33];
+ ssize_t r = ::read(sd, &msg, sizeof(msg));
+ // Drop incoming qpt
+ if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+ return -EINVAL;
+ }
+ }
+ if (r < 0) {
+ r = -errno;
+ lderr(cct) << __func__ << " got error " << r << ": "
+ << cpp_strerror(r) << dendl;
+ } else if (r == 0) { // valid disconnect message of length 0
+ ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
+ } else if ((size_t)r != sizeof(msg)) { // invalid message
+ ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
+ r = -EINVAL;
+ } else { // valid message
+ sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
+ wire_gid_to_gid(gid, &(im.gid));
+ ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
+ }
+ return r;
+}
+
+int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
+{
+ int retry = 0;
+ ssize_t r;
+
+ char msg[TCP_MSG_LEN];
+ char gid[33];
+retry:
+ gid_to_wire_gid(&(im.gid), gid);
+ sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
+ ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
+ << ", " << im.peer_qpn << ", " << gid << dendl;
+ r = ::write(sd, msg, sizeof(msg));
+ // Drop incoming qpt
+ if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
+ return -EINVAL;
+ }
+ }
+
+ if ((size_t)r != sizeof(msg)) {
+ // FIXME need to handle EAGAIN instead of retry
+ if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
+ retry++;
+ goto retry;
+ }
+ if (r < 0)
+ lderr(cct) << __func__ << " send returned error " << errno << ": "
+ << cpp_strerror(errno) << dendl;
+ else
+ lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
+ return -errno;
+ }
+ return 0;
+}
+
+void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
+{
+ char tmp[9];
+ uint32_t v32;
+ int i;
+
+ for (tmp[8] = 0, i = 0; i < 4; ++i) {
+ memcpy(tmp, wgid + i * 8, 8);
+ sscanf(tmp, "%x", &v32);
+ *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
+ }
+}
+
+void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
+{
+ for (int i = 0; i < 4; ++i)
+ sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
+}
+
Infiniband::QueuePair::~QueuePair()
{
if (qp) {
DeviceList *device_list;
RDMADispatcher *dispatcher = nullptr;
+ void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
+ void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
+
public:
explicit Infiniband(CephContext *c);
~Infiniband();
};
public:
+ int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
+ int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
RDMADispatcher *get_dispatcher() { return dispatcher; }
};
-inline ostream& operator<<(ostream& out, const Infiniband::QueuePair &qp)
-{
- return out << qp.get_local_qp_number();
-}
-
#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 XSKY <haomai@xsky.com>
- *
- * Author: Haomai Wang <haomaiwang@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "RDMAStack.h"
-#include "Device.h"
-#include "RDMAConnTCP.h"
-
-#define dout_subsys ceph_subsys_ms
-#undef dout_prefix
-#define dout_prefix *_dout << " RDMAConnTCP "
-
-static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
-
-// 1 means no valid buffer read, 0 means got enough buffer
-// else return < 0 means error
-int RDMAConnTCP::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
-{
- char msg[TCP_MSG_LEN];
- char gid[33];
- ssize_t r = ::read(sd, &msg, sizeof(msg));
- // Drop incoming qpt
- if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
- ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
- return -EINVAL;
- }
- }
- if (r < 0) {
- r = -errno;
- lderr(cct) << __func__ << " got error " << r << ": "
- << cpp_strerror(r) << dendl;
- } else if (r == 0) { // valid disconnect message of length 0
- ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
- } else if ((size_t)r != sizeof(msg)) { // invalid message
- ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
- r = -EINVAL;
- } else { // valid message
- sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
- wire_gid_to_gid(gid, &(im.gid));
- ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
- }
- return r;
-}
-
-int RDMAConnTCP::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
-{
- int retry = 0;
- ssize_t r;
-
- char msg[TCP_MSG_LEN];
- char gid[33];
-retry:
- gid_to_wire_gid(&(im.gid), gid);
- sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
- ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
- << ", " << im.peer_qpn << ", " << gid << dendl;
- r = ::write(sd, msg, sizeof(msg));
- // Drop incoming qpt
- if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
- ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
- return -EINVAL;
- }
- }
-
- if ((size_t)r != sizeof(msg)) {
- // FIXME need to handle EAGAIN instead of retry
- if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
- retry++;
- goto retry;
- }
- if (r < 0)
- lderr(cct) << __func__ << " send returned error " << errno << ": "
- << cpp_strerror(errno) << dendl;
- else
- lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
- return -errno;
- }
- return 0;
-}
-
-void RDMAConnTCP::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
-{
- char tmp[9];
- uint32_t v32;
- int i;
-
- for (tmp[8] = 0, i = 0; i < 4; ++i) {
- memcpy(tmp, wgid + i * 8, 8);
- sscanf(tmp, "%x", &v32);
- *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
- }
-}
-
-void RDMAConnTCP::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
-{
- for (int i = 0; i < 4; ++i)
- sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
-}
-
-ostream &RDMAConnTCP::print(ostream &out) const
-{
- return out << "TCP {tcp_fd: " << tcp_fd << "}";
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 XSKY <haomai@xsky.com>
- *
- * Author: Haomai Wang <haomaiwang@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H
-#define CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H
-
-#include "common/ceph_context.h"
-#include "common/debug.h"
-#include "common/errno.h"
-#include "msg/async/Stack.h"
-#include "Infiniband.h"
-#include "RDMAConnectedSocketImpl.h"
-
-class RDMAWorker;
-class RDMADispatcher;
-
-class RDMAConnTCP : public RDMAConnMgr {
- class C_handle_connection : public EventCallback {
- RDMAConnTCP *cst;
- bool active;
- public:
- C_handle_connection(RDMAConnTCP *w): cst(w), active(true) {};
- void do_request(int fd) {
- if (active)
- cst->handle_connection();
- };
- void close() {
- active = false;
- };
- };
-
- IBSYNMsg peer_msg;
- IBSYNMsg my_msg;
- EventCallbackRef con_handler;
- int tcp_fd = -1;
-
- private:
- void handle_connection();
- int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
- int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
- int activate();
- void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
- void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
-
- public:
- RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
- Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
- virtual ~RDMAConnTCP();
-
- virtual ostream &print(ostream &out) const override;
-
- void set_accept_fd(int sd);
-
- virtual void cleanup() override;
- virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
-};
-
-class RDMAServerConnTCP : public RDMAServerSocketImpl {
- NetHandler net;
- int server_setup_socket;
-
- public:
- RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
-
- int listen(entity_addr_t &sa, const SocketOptions &opt);
- virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
- virtual void abort_accept() override;
- virtual int fd() const override { return server_setup_socket; }
-};
-
-#endif
#include "RDMAStack.h"
#include "Device.h"
-#include "RDMAConnectedSocketImpl.h"
-#include "RDMAConnTCP.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
-RDMAConnMgr::RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock,
- Infiniband* ib, RDMADispatcher* s, RDMAWorker *w)
- : cct(cct), socket(sock), infiniband(ib), dispatcher(s), worker(w),
- is_server(false), active(false),
- connected(0)
-{
-}
-
RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
RDMAWorker *w)
- : cct(cct), infiniband(ib), dispatcher(s), worker(w),
- error(0), lock("RDMAConnectedSocketImpl::lock")
-{
- cmgr = new RDMAConnTCP(cct, this, ib, s, w);
-}
-
-QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p)
+ : cct(cct), connected(0), error(0), infiniband(ib),
+ dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
+ is_server(false), con_handler(new C_handle_connection(this)),
+ active(false)
{
- ibdev = d;
- ibport = p;
-
- qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
-
- local_qpn = qp->get_local_qp_number();
-
- return qp;
-}
-
-RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
- Infiniband* ib, RDMADispatcher* s, RDMAWorker *w)
- : RDMAConnMgr(cct, sock, ib, s, w), con_handler(new C_handle_connection(this))
-{
- Device *ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
- int ibport = cct->_conf->ms_async_rdma_port_num;
+ ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
+ ibport = cct->_conf->ms_async_rdma_port_num;
assert(ibdev);
assert(ibport > 0);
ibdev->init(ibport);
- QueuePair *qp = socket->create_queue_pair(ibdev, ibport);
-
- my_msg.qpn = socket->local_qpn;
+ qp = ibdev->create_queue_pair(cct, IBV_QPT_RC);
+ my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
my_msg.lid = ibdev->get_lid();
my_msg.peer_qpn = 0;
my_msg.gid = ibdev->get_gid();
- socket->register_qp(qp);
-}
-
-void RDMAConnectedSocketImpl::register_qp(QueuePair *qp)
-{
notify_fd = dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
}
-RDMAConnTCP::~RDMAConnTCP()
-{
- cleanup();
- if (tcp_fd >= 0)
- ::close(tcp_fd);
-}
-
RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
{
ldout(cct, 20) << __func__ << " destruct." << dendl;
+ cleanup();
worker->remove_pending_conn(this);
- dispatcher->erase_qpn(local_qpn);
+ dispatcher->erase_qpn(my_msg.qpn);
Mutex::Locker l(lock);
if (notify_fd >= 0)
::close(notify_fd);
+ if (tcp_fd >= 0)
+ ::close(tcp_fd);
error = ECONNRESET;
int ret = 0;
for (unsigned i=0; i < wc.size(); ++i) {
assert(ret == 0);
dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
}
-
- delete cmgr;
}
void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
w.swap(wc);
}
-int RDMAConnTCP::activate()
+int RDMAConnectedSocketImpl::activate()
{
ibv_qp_attr qpa;
int r;
- socket->remote_qpn = peer_msg.qpn;
-
// now connect up the qps and switch to RTR
memset(&qpa, 0, sizeof(qpa));
qpa.qp_state = IBV_QPS_RTR;
qpa.ah_attr.grh.hop_limit = 6;
qpa.ah_attr.grh.dgid = peer_msg.gid;
- qpa.ah_attr.grh.sgid_index = socket->get_device()->get_gid_idx();
+ qpa.ah_attr.grh.sgid_index = ibdev->get_gid_idx();
qpa.ah_attr.dlid = peer_msg.lid;
qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
qpa.ah_attr.src_path_bits = 0;
- qpa.ah_attr.port_num = (uint8_t)socket->get_ibport();
+ qpa.ah_attr.port_num = (uint8_t)ibport;
ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
- QueuePair *qp = socket->get_qp();
-
r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
IBV_QP_AV |
IBV_QP_PATH_MTU |
if (!is_server) {
connected = 1; //indicate successfully
- ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << *qp << dendl;
- socket->submit(false);
+ ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
+ submit(false);
}
active = true;
return 0;
}
-int RDMAConnTCP::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
<< opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
NetHandler net(cct);
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
my_msg.peer_qpn = 0;
- r = send_msg(cct, tcp_fd, my_msg);
+ r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0)
return r;
return 0;
}
-void RDMAConnTCP::handle_connection() {
- ldout(cct, 20) << __func__ << " " << *socket << dendl;
- int r = recv_msg(cct, tcp_fd, peer_msg);
+void RDMAConnectedSocketImpl::handle_connection() {
+ ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
+ int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
if (r < 0) {
if (r != -EAGAIN) {
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
- socket->fault();
+ fault();
}
return;
}
r = activate();
assert(!r);
}
- socket->notify();
- r = send_msg(cct, tcp_fd, my_msg);
+ notify();
+ r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
- socket->fault();
+ fault();
}
} else {
if (peer_msg.peer_qpn == 0) {// syn from client
ldout(cct, 10) << __func__ << " server is already active." << dendl;
return ;
}
- r = send_msg(cct, tcp_fd, my_msg);
+ r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
- socket->fault();
+ fault();
return ;
}
r = activate();
} else { // ack from client
connected = 1;
cleanup();
- socket->submit(false);
- socket->notify();
+ submit(false);
+ notify();
}
}
}
-void RDMAConnMgr::post_read()
-{
- if (!is_server || connected)
- return;
-
- ldout(cct, 20) << __func__ << " we do not need last handshake, " << *socket << dendl;
- connected = 1; //if so, we don't need the last handshake
- cleanup();
- socket->submit(false);
-}
-
ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
{
uint64_t i = 0;
int r = ::read(notify_fd, &i, sizeof(i));
- ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << *qp << " r = " << r << dendl;
+ ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
if (error)
return -error;
ssize_t read = 0;
if (cqe.empty())
return read == 0 ? -EAGAIN : read;
- ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << *qp << dendl;
+ ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
for (size_t i = 0; i < cqe.size(); ++i) {
ibv_wc* response = &cqe[i];
assert(response->status == IBV_WC_SUCCESS);
worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
if (response->byte_len == 0) {
dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
- if (cmgr->connected) {
+ if (connected) {
error = ECONNRESET;
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
}
worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
- cmgr->post_read();
+ if (is_server && connected == 0) {
+ ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
+ connected = 1; //if so, we don't need the last handshake
+ cleanup();
+ submit(false);
+ }
if (read == 0 && error)
return -error;
ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
{
if (error) {
- if (!cmgr->active)
+ if (!active)
return -EPIPE;
return -error;
}
{
Mutex::Locker l(lock);
pending_bl.claim_append(bl);
- if (!cmgr->connected) {
- ldout(cct, 20) << __func__ << " fake send to upper, QP: " << *qp << dendl;
+ if (!connected) {
+ ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
return bytes;
}
}
- ldout(cct, 20) << __func__ << " QP: " << *qp << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
ssize_t r = submit(more);
if (r < 0 && r != -EAGAIN)
return r;
int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
{
- ldout(cct, 20) << __func__ << " QP: " << *qp << " " << tx_buffers[0] << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
ibv_sge isge[tx_buffers.size()];
uint32_t current_sge = 0;
}
}
-void RDMAConnTCP::cleanup() {
+void RDMAConnectedSocketImpl::cleanup() {
if (con_handler && tcp_fd >= 0) {
(static_cast<C_handle_connection*>(con_handler))->close();
worker->center.submit_to(worker->center.get_id(), [this]() {
assert(ret = sizeof(i));
}
-void RDMAConnMgr::shutdown()
+void RDMAConnectedSocketImpl::shutdown()
{
- if (!socket->error)
- socket->fin();
- socket->error = ECONNRESET;
+ if (!error)
+ fin();
+ error = ECONNRESET;
active = false;
}
-void RDMAConnMgr::close()
+void RDMAConnectedSocketImpl::close()
{
- shutdown();
+ if (!error)
+ fin();
+ error = ECONNRESET;
+ active = false;
}
void RDMAConnectedSocketImpl::fault()
{
- ldout(cct, 1) << __func__ << dendl;
+ ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
/*if (qp) {
qp->to_dead();
qp = NULL;
}*/
error = ECONNRESET;
- cmgr->connected = 1;
+ connected = 1;
notify();
}
-void RDMAConnTCP::set_accept_fd(int sd)
+void RDMAConnectedSocketImpl::set_accept_fd(int sd)
{
tcp_fd = sd;
is_server = true;
class RDMAWorker;
class RDMADispatcher;
-class RDMAConnectedSocketImpl;
-
-typedef Infiniband::QueuePair QueuePair;
-
-class RDMAConnMgr {
- friend class RDMAConnectedSocketImpl;
-
- protected:
- CephContext *cct;
- RDMAConnectedSocketImpl *socket;
- Infiniband* infiniband;
- RDMADispatcher* dispatcher;
- RDMAWorker* worker;
-
- bool is_server;
- bool active;// qp is active ?
- int connected;
-
- public:
- RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock,
- Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
- virtual ~RDMAConnMgr() { };
-
- virtual ostream &print(ostream &out) const = 0;
-
- virtual void cleanup() = 0;
- virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) = 0;
- virtual void set_accept_fd(int sd) = 0;
-
- void post_read();
-
- void shutdown();
- void close();
-};
-inline ostream& operator<<(ostream& out, const RDMAConnMgr &m)
-{
- return m.print(out);
-}
class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
- friend class RDMAConnMgr;
-
- protected:
- CephContext *cct;
- Infiniband* infiniband;
- RDMADispatcher* dispatcher;
- RDMAWorker* worker;
- Device *ibdev = nullptr;
- int ibport = -1;
- QueuePair *qp = nullptr;
-
public:
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::CompletionChannel CompletionChannel;
typedef Infiniband::CompletionQueue CompletionQueue;
private:
- RDMAConnMgr *cmgr;
+ CephContext *cct;
+ Infiniband::QueuePair *qp;
+ Device *ibdev;
+ int ibport;
+ IBSYNMsg peer_msg;
+ IBSYNMsg my_msg;
+ int connected;
int error;
+ Infiniband* infiniband;
+ RDMADispatcher* dispatcher;
+ RDMAWorker* worker;
std::vector<Chunk*> buffers;
int notify_fd = -1;
bufferlist pending_bl;
Mutex lock;
std::vector<ibv_wc> wc;
+ bool is_server;
+ EventCallbackRef con_handler;
+ int tcp_fd = -1;
+ bool active;// qp is active ?
+ void notify();
ssize_t read_buffers(char* buf, size_t len);
int post_work_request(std::vector<Chunk*>&);
public:
- uint32_t local_qpn = 0;
- uint32_t remote_qpn = 0;
-
RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();
- ostream &print(ostream &out) const {
- return out << "socket {lqpn: " << local_qpn << " rqpn: " << remote_qpn << " " << *cmgr << "}";
- };
-
- Device *get_device() { return ibdev; };
- int get_ibport() { return ibport; };
+ Device *get_device() { return ibdev; }
void pass_wc(std::vector<ibv_wc> &&v);
void get_wc(std::vector<ibv_wc> &w);
- virtual int is_connected() override { return cmgr->connected; }
+ virtual int is_connected() override { return connected; }
virtual ssize_t read(char* buf, size_t len) override;
virtual ssize_t zero_copy_read(bufferptr &data) override;
virtual ssize_t send(bufferlist &bl, bool more) override;
- virtual void shutdown() override { cmgr->shutdown(); };
- virtual void close() override { cmgr->close(); };
+ virtual void shutdown() override;
+ virtual void close() override;
virtual int fd() const override { return notify_fd; }
void fault();
const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
- QueuePair *get_qp() { return qp; };
ssize_t submit(bool more);
+ int activate();
void fin();
- void register_qp(QueuePair *qp);
- void notify();
-
- QueuePair *create_queue_pair(Device *d, int p);
- void set_accept_fd(int sd) {cmgr->set_accept_fd(sd); };
- int try_connect(const entity_addr_t &sa, const SocketOptions &opt) { return cmgr->try_connect(sa, opt); };
+ void handle_connection();
+ void cleanup();
+ void set_accept_fd(int sd);
+ int try_connect(const entity_addr_t&, const SocketOptions &opt);
+
+ class C_handle_connection : public EventCallback {
+ RDMAConnectedSocketImpl *csi;
+ bool active;
+ public:
+ C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
+ void do_request(int fd) {
+ if (active)
+ csi->handle_connection();
+ }
+ void close() {
+ active = false;
+ }
+ };
};
-inline ostream& operator<<(ostream& out, const RDMAConnectedSocketImpl &s)
-{
- return s.print(out);
-}
-
class RDMAServerSocketImpl : public ServerSocketImpl {
- protected:
CephContext *cct;
Device *ibdev;
int ibport;
+ NetHandler net;
+ int server_setup_socket;
Infiniband* infiniband;
RDMADispatcher *dispatcher;
RDMAWorker *worker;
public:
RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
- virtual int listen(entity_addr_t &sa, const SocketOptions &opt) = 0;
- virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) = 0;
- virtual void abort_accept() = 0;
- virtual int fd() const = 0;
+ int listen(entity_addr_t &sa, const SocketOptions &opt);
+ virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+ virtual void abort_accept() override;
+ virtual int fd() const override { return server_setup_socket; }
+ int get_fd() { return server_setup_socket; }
};
#endif
#include "msg/async/net_handler.h"
#include "RDMAStack.h"
#include "Device.h"
-#include "RDMAConnTCP.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAServerSocketImpl "
RDMAServerSocketImpl::RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
- : cct(cct), infiniband(i), dispatcher(s), worker(w), sa(a)
-{
-}
-
-RDMAServerConnTCP::RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
- : RDMAServerSocketImpl(cct, i, s, w, a), net(cct), server_setup_socket(-1)
+ : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a)
{
ibdev = infiniband->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
ibport = cct->_conf->ms_async_rdma_port_num;
ibdev->init(ibport);
}
-int RDMAServerConnTCP::listen(entity_addr_t &sa, const SocketOptions &opt)
+int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
{
int rc = 0;
server_setup_socket = net.create_socket(sa.get_family(), true);
return -errno;
}
-int RDMAServerConnTCP::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
+int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)
{
ldout(cct, 15) << __func__ << dendl;
out->set_sockaddr((sockaddr*)&ss);
net.set_priority(sd, opt.priority, out->get_family());
- RDMAConnectedSocketImpl *server;
+ RDMAConnectedSocketImpl* server;
//Worker* w = dispatcher->get_stack()->get_worker();
server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));
server->set_accept_fd(sd);
return 0;
}
-void RDMAServerConnTCP::abort_accept()
+void RDMAServerSocketImpl::abort_accept()
{
if (server_setup_socket >= 0)
::close(server_setup_socket);
#include "common/deleter.h"
#include "common/Tub.h"
#include "RDMAStack.h"
-#include "RDMAConnTCP.h"
#include "Device.h"
#define dout_subsys ceph_subsys_ms
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
- auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+ auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
delete p;