OPTION(ms_async_rdma_device_name, OPT_STR, "")
OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL, false)
OPTION(ms_async_rdma_buffer_size, OPT_INT, 8192)
-OPTION(ms_async_rdma_send_buffers, OPT_U32, 32)
-OPTION(ms_async_rdma_receive_buffers, OPT_U32, 64)
+OPTION(ms_async_rdma_send_buffers, OPT_U32, 10240)
+OPTION(ms_async_rdma_receive_buffers, OPT_U32, 10240)
+OPTION(ms_async_rdma_port_num, OPT_U32, 1)
OPTION(inject_early_sigterm, OPT_BOOL, false)
goto fail;
} else if (r == 0) {
ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
- center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
+ if (async_msgr->get_stack()->nonblock_connect_need_writable_event())
+ center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
break;
}
while (true) {
entity_addr_t addr;
ConnectedSocket cli_socket;
- int r = listen_socket.accept(&cli_socket, opts, &addr);
+ Worker *w = worker;
+ if (!msgr->get_stack()->support_local_listen_table())
+ w = msgr->get_stack()->get_worker();
+ int r = listen_socket.accept(&cli_socket, opts, &addr, w);
if (r == 0) {
ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;
- msgr->add_accept(worker, std::move(cli_socket), addr);
+ msgr->add_accept(w, std::move(cli_socket), addr);
continue;
} else {
if (r == -EINTR) {
started = false;
}
-AsyncConnectionRef AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
+void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
{
lock.Lock();
- if (!stack->support_local_listen_table())
- w = stack->get_worker();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
conn->accept(std::move(cli_socket), addr);
accepting_conns.insert(conn);
lock.Unlock();
- return conn;
}
AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
}
void learned_addr(const entity_addr_t &peer_addr_for_me);
- AsyncConnectionRef add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
+ void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
+ NetworkStack *get_stack() {
+ return stack;
+ }
/**
* This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.
public:
explicit PosixServerSocketImpl(NetHandler &h, const entity_addr_t &sa, int f): handler(h), sa(sa), _fd(f) {}
- virtual int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out) override;
+ virtual int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override {
::close(_fd);
}
}
};
-int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) {
+int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
assert(sock);
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
#include "msg/msg_types.h"
#include "msg/async/Event.h"
+class Worker;
class ConnectedSocketImpl {
public:
virtual ~ConnectedSocketImpl() {}
class ServerSocketImpl {
public:
virtual ~ServerSocketImpl() {}
- virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) = 0;
+ virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
virtual void abort_accept() = 0;
/// Get file descriptor
virtual int fd() const = 0;
///
/// \Accepts a \ref ConnectedSocket representing the connection, and
/// a \ref entity_addr_t describing the remote endpoint.
- int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) {
- return _ssi->accept(sock, opt, out);
+ int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
+ return _ssi->accept(sock, opt, out, w);
}
/// Stops any \ref accept() in progress.
// But for dpdk backend, we maintain listen table in each thread. So we
// need to let each thread do binding port.
virtual bool support_local_listen_table() const { return false; }
+ virtual bool nonblock_connect_need_writable_event() const { return true; }
void start();
void stop();
\r
static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;\r
static const uint32_t MAX_INLINE_DATA = 128;\r
-static const uint32_t UDP_MSG_LEN = sizeof("0000:00000000:00000000:00000000000000000000000000000000") - 1;\r
+static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");\r
+static const uint32_t CQ_DEPTH = 30000;\r
\r
-Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(new ibv_device_attr)\r
+Device::Device(CephContext *c, ibv_device* d): cct(c), device(d), device_attr(new ibv_device_attr), active_port(nullptr)\r
{\r
if (device == NULL) {\r
lderr(cct) << __func__ << "device == NULL" << cpp_strerror(errno) << dendl;\r
lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;\r
assert(0);\r
}\r
+}\r
+\r
+void Device::binding_port(uint8_t port_num) {\r
port_cnt = device_attr->phys_port_cnt;\r
ports = new Port*[port_cnt];\r
for (uint8_t i = 0; i < port_cnt; ++i) {\r
ports[i] = new Port(cct, ctxt, i+1);\r
- if (ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {\r
+ if (i+1 == port_num && ports[i]->get_port_attr()->state == IBV_PORT_ACTIVE) {\r
active_port = ports[i];\r
ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;\r
+ return ;\r
} else {\r
- ldout(cct, 10) << __func__ << " port " << i+1 << " is unactive(" << ports[i]->get_port_attr()->state << ")"<< dendl;\r
+ ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << ports[i]->get_port_attr()->state << ")"<< dendl;\r
}\r
}\r
- if (!active_port) {\r
- lderr(cct) << __func__ << " no active port found" << dendl;\r
+ if (nullptr == active_port) {\r
+ lderr(cct) << __func__ << " port not found" << dendl;\r
assert(active_port);\r
}\r
}\r
\r
-Infiniband::Infiniband(CephContext *c, const std::string &device_name): cct(c), device_list(c), net(c)\r
+Infiniband::Infiniband(CephContext *c, const std::string &device_name, uint8_t port_num): cct(c), device_list(c), net(c)\r
{\r
device = device_list.get_device(device_name.c_str());\r
+ device->binding_port(port_num);\r
assert(device);\r
ib_physical_port = device->active_port->get_port_num();\r
pd = new ProtectionDomain(cct, device);\r
* QueuePair on success or NULL if init fails\r
* See QueuePair::QueuePair for parameter documentation.\r
*/\r
-Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *c, ibv_qp_type type)\r
+Infiniband::QueuePair* Infiniband::create_queue_pair(CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)\r
{\r
- Infiniband::CompletionChannel* cc = create_comp_channel();\r
- if (!cc)\r
- return NULL;\r
-\r
- Infiniband::CompletionQueue* cq = create_comp_queue(cc);\r
- if (!cq) {\r
- delete cc;\r
- lderr(cct) << __func__ << " failed to create cq." << dendl;\r
- return NULL;\r
- }\r
-\r
- Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, c, cq, max_send_wr, max_recv_wr);\r
+ Infiniband::QueuePair *qp = new QueuePair(*this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);\r
if (qp->init()) {\r
- delete cc;\r
- delete cq;\r
delete qp;\r
return NULL;\r
}\r
int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);\r
if (ret) {\r
lderr(cct) << __func__ << " ib_post_srq_recv failed on post "\r
- << cpp_strerror(errno) << dendl;\r
+ << cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
return 0;\r
{\r
vector<Chunk*> free_chunks;\r
int r = memory_manager->get_channel_buffers(free_chunks, 0);\r
- assert(r == 0);\r
+ assert(r > 0);\r
for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {\r
r = post_chunk(*iter);\r
assert(r == 0);\r
\r
Infiniband::CompletionQueue* Infiniband::create_comp_queue(CompletionChannel *cc)\r
{\r
- ldout(cct, 20) << __func__ << " completion channel=" << cc << dendl;\r
- Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(*this, max_recv_wr, cc);\r
+ Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(*this, CQ_DEPTH, cc);\r
if (cq->init()) {\r
delete cq;\r
return NULL;\r
\r
// 1 means no valid buffer read, 0 means got enough buffer\r
// else return < 0 means error\r
-int Infiniband::recv_udp_msg(int sd, IBSYNMsg& im, entity_addr_t *addr)\r
+int Infiniband::recv_msg(int sd, IBSYNMsg& im)\r
{\r
- assert(sd >= 0);\r
- ssize_t r;\r
- entity_addr_t socket_addr;\r
- struct sockaddr from;\r
- socklen_t slen = sizeof(from);\r
- char msg[UDP_MSG_LEN];\r
- char gid[32];\r
- r = ::recvfrom(sd, &msg, sizeof(msg), 0, &from, &slen);\r
+ char msg[TCP_MSG_LEN];\r
+ char gid[33];\r
+ ssize_t r = ::read(sd, &msg, sizeof(msg));\r
// Drop incoming qpt\r
if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
- r = -1;\r
+ return -EINVAL;\r
}\r
}\r
- if (r == -1) {\r
- lderr(cct) << __func__ << " recv got error " << errno << ": "\r
- << cpp_strerror(errno) << dendl;\r
- return -1;\r
+ if (r < 0) {\r
+ r = -errno;\r
+ lderr(cct) << __func__ << " got error " << errno << ": "\r
+ << cpp_strerror(errno) << dendl;\r
} else if ((size_t)r != sizeof(msg)) { // valid message length\r
- lderr(cct) << __func__ << " recv got bad length (" << r << ")." << cpp_strerror(errno) << dendl;\r
- return 1;\r
+ r = -EINVAL;\r
+ lderr(cct) << __func__ << " got bad length (" << r << "): " << cpp_strerror(errno) << dendl;\r
} else { // valid message\r
- socket_addr.set_sockaddr(&from);\r
- if (addr) {\r
- *addr = socket_addr;\r
- }\r
- sscanf(msg, "%04x:%08x:%08x:%s", &(im.lid), &(im.qpn), &(im.psn), gid);\r
+ sscanf(msg, "%x:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);\r
wire_gid_to_gid(gid, &(im.gid));\r
- ldout(cct, 10) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid << dendl;\r
- return 0;\r
+ ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;\r
}\r
+ return r;\r
}\r
\r
-int Infiniband::send_udp_msg(int sd, IBSYNMsg& im, entity_addr_t &peeraddr)\r
+int Infiniband::send_msg(int sd, IBSYNMsg& im)\r
{\r
- assert(sd >= 0);\r
int retry = 0;\r
ssize_t r;\r
\r
- char msg[UDP_MSG_LEN];\r
- char gid[32];\r
+ char msg[TCP_MSG_LEN];\r
+ char gid[33];\r
retry:\r
gid_to_wire_gid(&(im.gid), gid);\r
- r = snprintf(msg, UDP_MSG_LEN, "%04x:%08x:%08x:%s", im.lid, im.qpn, im.psn, gid);\r
- ldout(cct, 20) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << gid << " r=" << r << dendl;\r
- r = ::sendto(sd, msg, sizeof(msg), 0, peeraddr.get_sockaddr(),\r
- peeraddr.get_sockaddr_len());\r
+ sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);\r
+ ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn\r
+ << ", " << im.peer_qpn << ", " << gid << dendl;\r
+ r = ::write(sd, msg, sizeof(msg));\r
// Drop incoming qpt\r
if (cct->_conf->ms_inject_socket_failures && sd >= 0) {\r
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {\r
ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;\r
- r = -1;\r
+ return -EINVAL;\r
}\r
}\r
\r
if ((size_t)r != sizeof(msg)) {\r
+ // FIXME need to handle EAGAIN instead of retry\r
if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {\r
retry++;\r
goto retry;\r
}\r
if (r < 0)\r
lderr(cct) << __func__ << " send returned error " << errno << ": "\r
- << cpp_strerror(errno) << dendl;\r
+ << cpp_strerror(errno) << dendl;\r
else\r
lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;\r
- return -1;\r
+ return -errno;\r
}\r
return 0;\r
}\r
{\r
if (qp)\r
assert(!ibv_destroy_qp(qp));\r
- ldout(infiniband.cct, 20) << __func__ << " successfully destroyed QueuePair." << dendl;\r
}\r
\r
Infiniband::CompletionChannel::~CompletionChannel()\r
{\r
if (channel) {\r
int r = ibv_destroy_comp_channel(channel);\r
- ldout(infiniband.cct, 20) << __func__ << " r: " << r << dendl;\r
+ if (r < 0)\r
+ lderr(infiniband.cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;\r
assert(r == 0);\r
}\r
- ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionChannel." << dendl;\r
}\r
\r
Infiniband::CompletionQueue::~CompletionQueue()\r
{\r
if (cq) {\r
int r = ibv_destroy_cq(cq);\r
- ldout(infiniband.cct, 20) << __func__ << " r: " << cpp_strerror(errno) << dendl;\r
+ if (r < 0)\r
+ lderr(infiniband.cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;\r
assert(r == 0);\r
}\r
- ldout(infiniband.cct, 20) << __func__ << " successfully destroyed CompletionQueue." << dendl;\r
}\r
\r
int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)\r
{\r
ldout(infiniband.cct, 20) << __func__ << " started." << dendl;\r
int r = ibv_req_notify_cq(cq, 0);\r
- if (r) {\r
+ if (r < 0)\r
lderr(infiniband.cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;\r
- }\r
return r;\r
}\r
\r
if (ibv_req_notify_cq(cq, 0)) {\r
lderr(infiniband.cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;\r
ibv_destroy_cq(cq);\r
+ cq = nullptr;\r
return -1;\r
}\r
\r
channel = ibv_create_comp_channel(infiniband.device->ctxt);\r
if (!channel) {\r
lderr(infiniband.cct) << __func__ << " failed to create receive completion channel: "\r
- << cpp_strerror(errno) << dendl;\r
+ << cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
int rc = infiniband.net.set_nonblock(channel->fd);\r
return "<status out of range!>";\r
return lookup[status];\r
}\r
+\r
+/**
+ * Map a queue pair state value to a printable name for log output.
+ *
+ * @param status value compared against the IBV_QPS_* constants
+ * @return a string literal naming the state, or " out of range." when the
+ *         value matches none of the listed constants
+ */
+const char* Infiniband::qp_state_string(int status)
+{
+  // Start from the fallback text; a matching case below overrides it.
+  const char* name = " out of range.";
+  switch (status) {
+    case IBV_QPS_RESET: name = "IBV_QPS_RESET"; break;
+    case IBV_QPS_INIT:  name = "IBV_QPS_INIT";  break;
+    case IBV_QPS_RTR:   name = "IBV_QPS_RTR";   break;
+    case IBV_QPS_RTS:   name = "IBV_QPS_RTS";   break;
+    case IBV_QPS_SQD:   name = "IBV_QPS_SQD";   break;
+    case IBV_QPS_SQE:   name = "IBV_QPS_SQE";   break;
+    case IBV_QPS_ERR:   name = "IBV_QPS_ERR";   break;
+    default: break;
+  }
+  return name;
+}
uint16_t lid;\r
uint32_t qpn;\r
uint32_t psn;\r
+ uint32_t peer_qpn;\r
union ibv_gid gid;\r
} __attribute__((packed));\r
\r
const char* get_name() { return name;}\r
uint16_t get_lid() { return active_port->get_lid(); }\r
ibv_gid get_gid() { return active_port->get_gid(); }\r
+ void binding_port(uint8_t port_num);\r
struct ibv_context *ctxt;\r
ibv_device_attr *device_attr;\r
Port* active_port;\r
}\r
}\r
~DeviceList() {\r
- for (int i=0; devices[i] != NULL; ++i) {\r
+ for (int i=0; i < num; ++i) {\r
delete devices[i];\r
}\r
delete []devices;\r
offset = o;\r
}\r
\r
- size_t get_offset() {\r
+ uint32_t get_offset() {\r
return offset;\r
}\r
\r
return bound;\r
}\r
\r
- size_t read(char* buf, size_t len) {\r
- size_t left = bound - offset;\r
- if(left >= len) {\r
+ uint32_t read(char* buf, uint32_t len) {\r
+ uint32_t left = bound - offset;\r
+ if (left >= len) {\r
memcpy(buf, buffer+offset, len);\r
offset += len;\r
return len;\r
}\r
}\r
\r
- size_t write(char* buf, size_t len) {\r
- size_t left = bytes - offset;\r
+ uint32_t write(char* buf, uint32_t len) {\r
+ uint32_t left = bytes - offset;\r
if (left >= len) {\r
memcpy(buffer+offset, buf, len);\r
offset += len;\r
char* buffer;\r
uint32_t bytes;\r
uint32_t bound;\r
- size_t offset;\r
+ uint32_t offset;\r
ibv_mr* mr;\r
uint64_t owner;\r
};\r
delete *c;\r
++c;\r
}\r
+ if (manager.enabled_huge_page)\r
+ delete base;\r
+ else\r
+ manager.free_huge_pages(base);\r
}\r
int add(uint32_t num) {\r
uint32_t bytes = chunk_size * num;\r
}\r
\r
int get_buffers(std::vector<Chunk*> &chunks, size_t bytes) {\r
+ uint32_t num = bytes / chunk_size + 1;\r
+ if (bytes % chunk_size == 0)\r
+ --num;\r
+ int r = num;\r
Mutex::Locker l(lock);\r
+ if (free_chunks.empty())\r
+ return 0;\r
if (!bytes) {\r
free_chunks.swap(chunks);\r
- return 0;\r
+ r = chunks.size();\r
+ return r;\r
+ }\r
+ if (free_chunks.size() < num) {\r
+ num = free_chunks.size();\r
+ r = num;\r
}\r
- uint32_t num = bytes / chunk_size + 1;\r
- if (bytes % chunk_size == 0)\r
- --num;\r
- if (free_chunks.size() < num)\r
- return -EAGAIN;\r
for (uint32_t i = 0; i < num; ++i) {\r
chunks.push_back(free_chunks.back());\r
free_chunks.pop_back();\r
}\r
- return 0;\r
+ return r;\r
}\r
-\r
MemoryManager& manager;\r
uint32_t chunk_size;\r
Mutex lock;\r
enabled_huge_page = cct->_conf->ms_async_rdma_enable_hugepage;\r
}\r
~MemoryManager() {\r
- if(channel)\r
+ if (channel)\r
delete channel;\r
- if(send)\r
+ if (send)\r
delete send;\r
}\r
void* malloc_huge_pages(size_t size) {\r
}\r
\r
int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}\r
+ int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);}\r
bool enabled_huge_page;\r
private:\r
Cluster* channel;//RECV\r
\r
public:\r
NetHandler net;\r
- explicit Infiniband(CephContext *c, const std::string &device_name);\r
+ explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);\r
\r
/**\r
* Destroy an Infiniband object.\r
public:\r
typedef MemoryManager::Cluster Cluster;\r
typedef MemoryManager::Chunk Chunk;\r
- QueuePair* create_queue_pair(CompletionQueue *w, ibv_qp_type type);\r
+ QueuePair* create_queue_pair(CompletionQueue*, CompletionQueue*, ibv_qp_type type);\r
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);\r
int post_chunk(Chunk* chunk);\r
int post_channel_cluster();\r
uint8_t get_ib_physical_port() {\r
return ib_physical_port;\r
}\r
- int send_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t &peeraddr);\r
- int recv_udp_msg(int sd, IBSYNMsg& msg, entity_addr_t *addr);\r
+ int send_msg(int sd, IBSYNMsg& msg);\r
+ int recv_msg(int sd, IBSYNMsg& msg);\r
uint16_t get_lid() { return device->get_lid(); }\r
ibv_gid get_gid() { return device->get_gid(); }\r
MemoryManager* get_memory_manager() { return memory_manager; }\r
Device* get_device() { return device; }\r
int get_async_fd() { return device->ctxt->async_fd; }\r
+  /**
+   * Hand a chunk back to the pool it belongs to.
+   *
+   * A receive chunk is re-posted through post_chunk(); a transmit chunk is
+   * returned through MemoryManager::return_tx().
+   *
+   * @return 1 for a receive chunk, 2 for a transmit chunk, -1 when the chunk
+   *         belongs to neither set. The result of post_chunk() is not checked.
+   */
+  int recall_chunk(Chunk* c) {
+    if (memory_manager->is_rx_chunk(c)) {
+      post_chunk(c);
+      return 1;
+    }
+    if (!memory_manager->is_tx_chunk(c))
+      return -1;
+
+    // return_tx() takes a vector, so wrap the single chunk.
+    vector<Chunk*> single(1, c);
+    memory_manager->return_tx(single);
+    return 2;
+  }
+ int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); }\r
+ int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); }\r
static const char* wc_status_to_string(int status);\r
+ static const char* qp_state_string(int status);\r
};\r
\r
#endif\r
#undef dout_prefix\r
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "\r
\r
-int RDMAConnectedSocketImpl::activate() {\r
+int RDMAConnectedSocketImpl::activate()\r
+{\r
ibv_qp_attr qpa;\r
int r;\r
\r
IBV_QP_MAX_DEST_RD_ATOMIC);\r
if (r) {\r
lderr(cct) << __func__ << " failed to transition to RTR state: "\r
- << cpp_strerror(errno) << dendl;\r
+ << cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
\r
IBV_QP_MAX_QP_RD_ATOMIC);\r
if (r) {\r
lderr(cct) << __func__ << " failed to transition to RTS state: "\r
- << cpp_strerror(errno) << dendl;\r
+ << cpp_strerror(errno) << dendl;\r
return -1;\r
}\r
\r
// the queue pair should be ready to use once the client has finished\r
// setting up their end.\r
ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;\r
+ ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;\r
+\r
+ if (!is_server) {\r
+ connected = 1; //indicate successfully\r
+ ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;\r
+ submit(false);\r
+ }\r
+ active = true;\r
\r
- connected = 1;//indicate successfully\r
return 0;\r
}\r
\r
+/**
+ * Begin the TCP side-channel handshake with a peer.
+ *
+ * Connects a TCP socket through infiniband->net, applies the requested socket
+ * options, sends our handshake message with peer_qpn set to 0, and registers
+ * con_handler for readable events on the socket.
+ *
+ * @param peer_addr address to connect to
+ * @param opts      socket options; nodelay, rcbuf_size and priority are used here
+ * @return 0 on success; -errno when connecting or configuring the socket
+ *         fails; otherwise the negative value returned by send_msg()
+ */
+int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+  ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
+                 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
+  tcp_fd = infiniband->net.connect(peer_addr);
+
+  if (tcp_fd < 0) {
+    return -errno;
+  }
+  infiniband->net.set_close_on_exec(tcp_fd);
+
+  int r = infiniband->net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
+  if (r < 0) {
+    // Save errno first: ::close() may overwrite it. Then forget the descriptor
+    // so a closed fd number is never used or closed a second time.
+    const int err = errno;
+    ::close(tcp_fd);
+    tcp_fd = -1;
+    return -err;
+  }
+
+  ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
+  infiniband->net.set_priority(tcp_fd, opts.priority);
+  // peer_qpn == 0 marks this message as the initial request from the client.
+  my_msg.peer_qpn = 0;
+  r = infiniband->send_msg(tcp_fd, my_msg);
+  if (r < 0) {
+    // NOTE(review): tcp_fd stays open on this path; presumably it is released
+    // when the socket object is torn down - confirm.
+    return r;
+  }
+
+  worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+  return 0;
+}
+\r
+/**
+ * React to a handshake message arriving on the TCP side channel.
+ *
+ * Reads one message into peer_msg, then acts according to role:
+ *  - server, peer_qpn != 0: the client's ack; mark connected, drop the
+ *    handshake handler, flush pending data and notify.
+ *  - server, peer_qpn == 0: the client's initial request; reply with our
+ *    message and activate, unless already active.
+ *  - client: the server's reply; record its qpn, activate if not yet
+ *    connected, notify, and send the ack.
+ *
+ * A receive error other than -EAGAIN, or a failed send, calls fault().
+ */
+void RDMAConnectedSocketImpl::handle_connection() {
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
+  int rc = infiniband->recv_msg(tcp_fd, peer_msg);
+  if (rc < 0) {
+    if (rc != -EAGAIN)
+      fault();
+    return;
+  }
+
+  if (is_server) {
+    if (peer_msg.peer_qpn != 0) {
+      // ack from client
+      connected = 1;
+      cleanup();
+      submit(false);
+      notify();
+      return;
+    }
+
+    // syn from client
+    if (active) {
+      ldout(cct, 10) << __func__ << " server is already active." << dendl;
+      return ;
+    }
+    rc = infiniband->send_msg(tcp_fd, my_msg);
+    if (rc < 0) {
+      ldout(cct, 1) << __func__ << " server ack failed." << dendl;
+      fault();
+      return ;
+    }
+    rc = activate();
+    assert(!rc);
+    return;
+  }
+
+  // syn + ack from server
+  my_msg.peer_qpn = peer_msg.qpn;
+  ldout(cct, 20) << __func__ << " peer msg :  < " << peer_msg.qpn << ", " << peer_msg.psn
+                 <<  ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
+  if (!connected) {
+    rc = activate();
+    assert(!rc);
+  }
+  notify();
+  rc = infiniband->send_msg(tcp_fd, my_msg);
+  if (rc < 0) {
+    ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
+    fault();
+  }
+}
+\r
ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)\r
{\r
- ldout(cct, 20) << __func__ << " need to read bytes: " << len << " buffers size: " << buffers.size() << dendl;\r
-\r
+ uint64_t i = 0;\r
+ int r = ::read(notify_fd, &i, sizeof(i));\r
+ ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;\r
if (error)\r
return -error;\r
ssize_t read = 0;\r
if (cqe.empty())\r
return read == 0 ? -EAGAIN : read;\r
\r
- ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses."<< dendl;\r
+ ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;\r
for (size_t i = 0; i < cqe.size(); ++i) {\r
ibv_wc* response = &cqe[i];\r
assert(response->status == IBV_WC_SUCCESS);\r
- ldout(cct, 20) << __func__ << " cqe " << response->byte_len << " bytes." << dendl;\r
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);\r
+ ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;\r
chunk->prepare_read(response->byte_len);\r
- assert(!response->byte_len);\r
+ if (response->byte_len == 0) {\r
+ if (connected) {\r
+ error = ECONNRESET;\r
+ assert(infiniband->post_chunk(chunk) == 0);\r
+ ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;\r
+ }\r
+ break;\r
+ }\r
+ //assert(response->byte_len);\r
if (read == (ssize_t)len) {\r
buffers.push_back(chunk);\r
- ldout(cct, 20) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
+ ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;\r
} else if (read + response->byte_len > (ssize_t)len) {\r
read += chunk->read(buf+read, (ssize_t)len-read);\r
buffers.push_back(chunk);\r
- ldout(cct, 20) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
+ ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;\r
} else {\r
read += chunk->read(buf+read, response->byte_len);\r
assert(infiniband->post_chunk(chunk) == 0);\r
}\r
}\r
\r
+ if (is_server && connected == 0) {\r
+ ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;\r
+ connected = 1; //if so, we don't need the last handshake\r
+ cleanup();\r
+ submit(false);\r
+ }\r
+\r
+ if (read == 0 && error)\r
+ return -error;\r
return read == 0 ? -EAGAIN : read;\r
}\r
\r
for (; c != buffers.end() ; ++c) {\r
tmp = (*c)->read(buf+read, len-read);\r
read += tmp;\r
- ldout(cct, 20) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;\r
+ ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;\r
if ((*c)->over()) {\r
assert(infiniband->post_chunk(*c) == 0);\r
- ldout(cct, 20) << __func__ << " one chunk over." << dendl;\r
+ ldout(cct, 25) << __func__ << " one chunk over." << dendl;\r
}\r
if (read == len) {\r
break;\r
if (c != buffers.end() && (*c)->over())\r
c++;\r
buffers.erase(buffers.begin(), c);\r
- ldout(cct, 20) << __func__ << " got " << read << " bytes here. buffers size: " << buffers.size() << dendl;\r
+ ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl;\r
return read;\r
}\r
\r
return -error;\r
static const int MAX_COMPLETIONS = 16;\r
ibv_wc wc[MAX_COMPLETIONS];\r
- ssize_t size;\r
+ ssize_t size = 0;\r
\r
ibv_wc* response;\r
Chunk* chunk;\r
auto iter = buffers.begin();\r
if (iter != buffers.end()) {\r
chunk = *iter;\r
- auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
+ // FIXME need to handle release\r
+ // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
buffers.erase(iter);\r
loaded = true;\r
size = chunk->bound;\r
response = &wc[i];\r
chunk = reinterpret_cast<Chunk*>(response->wr_id);\r
chunk->prepare_read(response->byte_len);\r
- if(!loaded && i == 0) {\r
- auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
+ if (!loaded && i == 0) {\r
+ // FIXME need to handle release\r
+ // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);\r
size = chunk->bound;\r
continue;\r
}\r
\r
ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)\r
{\r
- if (error)\r
+ if (error) {\r
+ if (!active)\r
+ return -EPIPE;\r
return -error;\r
+ }\r
size_t bytes = bl.length();\r
if (!bytes)\r
return 0;\r
- pending_bl.claim_append(bl);\r
+ {\r
+ Mutex::Locker l(lock);\r
+ pending_bl.claim_append(bl);\r
+ if (!connected) {\r
+ ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;\r
+ return bytes;\r
+ }\r
+ }\r
+ ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;\r
ssize_t r = submit(more);\r
if (r < 0 && r != -EAGAIN)\r
return r;\r
{\r
if (error)\r
return -error;\r
+ Mutex::Locker l(lock);\r
std::vector<Chunk*> tx_buffers;\r
size_t bytes = pending_bl.length();\r
- if (worker->reserve_message_buffer(this, tx_buffers, bytes) < 0) {\r
- ldout(cct, 10) << __func__ << " no enough buffers" << dendl;\r
- pending_bl.claim_append(pending_bl);\r
- return -EAGAIN;\r
+ ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "\r
+ << pending_bl.buffers().size() << dendl;\r
+ if (!bytes)\r
+ return 0;\r
+\r
+ int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);\r
+ if (ret == 0) {\r
+ ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl;\r
+ return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough\r
}\r
- ldout(cct, 20) << __func__ << " prepare " << bytes << " bytes, tx buffer count: " << tx_buffers.size() << dendl;\r
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
list<bufferptr>::const_iterator it = pending_bl.buffers().begin();\r
+ unsigned total = 0;\r
while (it != pending_bl.buffers().end()) {\r
const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());\r
- uint32_t copied = 0;\r
- // ldout(cct, 20) << __func__ << " app_buffer: " << addr << " length: " << it->length() << dendl;\r
- while(copied < it->length()) {\r
- // ldout(cct, 20) << __func__ << " current_buffer: " << *current_buffer << " copied: " << copied << dendl;\r
- size_t ret = (*current_buffer)->write((char*)addr+copied, it->length() - copied);\r
- copied += ret;\r
- // ldout(cct, 20) << __func__ << " ret: " << ret << " copied: " << copied << dendl;\r
- if((*current_buffer)->full()){\r
+ unsigned copied = 0;\r
+ while (copied < it->length()) {\r
+ uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied);\r
+ copied += r;\r
+ total += r;\r
+ if ((*current_buffer)->full()){\r
++current_buffer;\r
+ if (current_buffer == tx_buffers.end())\r
+ goto sending;\r
}\r
}\r
++it;\r
}\r
\r
- ssize_t r = post_work_request(tx_buffers);\r
+ sending:\r
+ assert(total <= pending_bl.length());\r
+ bufferlist swapped;\r
+ if (total < pending_bl.length()) {\r
+ pending_bl.splice(total, pending_bl.length()-total, &swapped);\r
+ pending_bl.swap(swapped);\r
+ } else {\r
+ pending_bl.clear();\r
+ }\r
+\r
+ ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "\r
+ << pending_bl.buffers().size() << dendl;\r
+\r
+ int r = post_work_request(tx_buffers);\r
if (r < 0)\r
return r;\r
\r
ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;\r
- pending_bl.clear();\r
return bytes;\r
}\r
\r
int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)\r
{\r
+ ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;\r
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();\r
ibv_sge isge[tx_buffers.size()];\r
uint32_t current_sge = 0;\r
uint32_t current_swr = 0;\r
ibv_send_wr* pre_wr = NULL;\r
\r
+ memset(iswr, 0, sizeof(iswr));\r
+ memset(isge, 0, sizeof(isge));\r
current_buffer = tx_buffers.begin();\r
while (current_buffer != tx_buffers.end()) {\r
isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);\r
isge[current_sge].length = (*current_buffer)->get_offset();\r
isge[current_sge].lkey = (*current_buffer)->mr->lkey;\r
- ldout(cct, 20) << __func__ << " current_buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;\r
+ ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;\r
\r
iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);\r
iswr[current_swr].next = NULL;\r
iswr[current_swr].num_sge = 1;\r
iswr[current_swr].opcode = IBV_WR_SEND;\r
iswr[current_swr].send_flags = IBV_SEND_SIGNALED;\r
- /*if(isge[current_sge].length < infiniband->max_inline_data) {\r
+ /*if (isge[current_sge].length < infiniband->max_inline_data) {\r
iswr[current_swr].send_flags = IBV_SEND_INLINE;\r
ldout(cct, 20) << __func__ << " send_inline." << dendl;\r
}*/\r
<< cpp_strerror(errno) << dendl;\r
return -errno;\r
}\r
+ ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;\r
return 0;\r
}\r
+\r
+/**
+ * Post a zero-length, signaled SEND on this connection's queue pair.
+ *
+ * The work request carries no scatter/gather entries and uses the QueuePair
+ * pointer as its wr_id. The receive path in read() treats a zero-length
+ * completion on a connected socket as a remote close. A failed post is only
+ * logged.
+ */
+void RDMAConnectedSocketImpl::fin() {
+  ibv_send_wr fin_wr;
+  memset(&fin_wr, 0, sizeof(fin_wr));
+  fin_wr.wr_id = reinterpret_cast<uint64_t>(qp);
+  fin_wr.num_sge = 0;
+  fin_wr.opcode = IBV_WR_SEND;
+  fin_wr.send_flags = IBV_SEND_SIGNALED;
+
+  ibv_send_wr* rejected_wr;
+  const int rc = ibv_post_send(qp->get_qp(), &fin_wr, &rejected_wr);
+  if (rc == 0)
+    return;
+
+  lderr(cct) << __func__ << " failed to send message="
+             << " ibv_post_send failed(most probably should be peer not ready): "
+             << cpp_strerror(errno) << dendl;
+}
+\r
+/**
+ * Tear down the handshake handler once it is no longer needed.
+ *
+ * Closes the handler, asks the worker's event center to drop the readable
+ * event on tcp_fd, then deletes the handler and clears the pointer. Does
+ * nothing when con_handler is already null.
+ */
+void RDMAConnectedSocketImpl::cleanup() {
+  if (!con_handler)
+    return;
+
+  static_cast<C_handle_connection*>(con_handler)->close();
+  // NOTE(review): the lambda captures `this`; presumably submit_to() with
+  // `false` finishes before returning - confirm against EventCenter.
+  worker->center.submit_to(worker->center.get_id(), [this]() {
+    worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
+  }, false);
+  delete con_handler;
+  con_handler = nullptr;
+}
\r
+// Create, configure, bind and listen on the TCP socket used to set up RDMA\r
+// connections. Returns 0 on success. On failure the socket is closed,\r
+// server_setup_socket is reset to -1 and the negative error code recorded at\r
+// the failing step is returned.\r
int RDMAServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)\r
{\r
- server_setup_socket = ::socket(sa.get_family(), SOCK_DGRAM, 0);\r
- if (server_setup_socket == -1) {\r
+ int rc = 0;\r
+ server_setup_socket = infiniband->net.create_socket(sa.get_family(), true);\r
+ if (server_setup_socket < 0) {\r
+ rc = -errno;\r
lderr(cct) << __func__ << " failed to create server socket: "\r
<< cpp_strerror(errno) << dendl;\r
- return -errno;\r
+ return rc;\r
}\r
\r
- int on = 1;\r
- int rc = ::setsockopt(server_setup_socket, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));\r
+ rc = net.set_nonblock(server_setup_socket);\r
if (rc < 0) {\r
- lderr(cct) << __func__ << " unable to setsockopt: " << cpp_strerror(errno) << dendl;\r
goto err;\r
}\r
\r
- rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());\r
+ rc = net.set_socket_options(server_setup_socket, opt.nodelay, opt.rcbuf_size);\r
if (rc < 0) {\r
- lderr(cct) << __func__ << " unable to bind to " << sa.get_sockaddr()\r
- << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;\r
goto err;\r
}\r
+ net.set_close_on_exec(server_setup_socket);\r
\r
- rc = net.set_nonblock(server_setup_socket);\r
+ rc = ::bind(server_setup_socket, sa.get_sockaddr(), sa.get_sockaddr_len());\r
if (rc < 0) {\r
+ rc = -errno;\r
+ ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()\r
+ << " on port " << sa.get_port() << ": " << cpp_strerror(errno) << dendl;\r
goto err;\r
}\r
\r
- net.set_close_on_exec(server_setup_socket);\r
+ rc = ::listen(server_setup_socket, 128);\r
+ if (rc < 0) {\r
+ rc = -errno;\r
+ lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(errno) << dendl;\r
+ goto err;\r
+ }\r
\r
ldout(cct, 20) << __func__ << " bind to " << sa.get_sockaddr() << " on port " << sa.get_port() << dendl;\r
return 0;\r
err:\r
::close(server_setup_socket);\r
server_setup_socket = -1;\r
- return -1;\r
+ // rc already holds the failure code; errno may have been overwritten by\r
+ // the logging above or by ::close(), so do not re-read it here.\r
+ return rc;\r
}\r
\r
+// Accept one pending TCP connection on the setup socket and wrap it in a new\r
+// RDMAConnectedSocketImpl bound to worker `w`.\r
+// Returns 0 on success with *sock (and *out, when non-null) filled in;\r
+// otherwise a negative error code (the negated errno of ::accept() when\r
+// nothing could be accepted).\r
-int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out)\r
+int RDMAServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w)\r
{\r
ldout(cct, 15) << __func__ << dendl;\r
- int r;\r
- RDMAConnectedSocketImpl* server;\r
- while (1) {\r
- IBSYNMsg msg;//TODO\r
- entity_addr_t addr;\r
- r = infiniband->recv_udp_msg(server_setup_socket, msg, &addr);\r
- if (r < 0) {\r
- r = -errno;\r
- if (r != -EAGAIN)\r
- ldout(cct, 10) << __func__ << " recv msg failed:" << cpp_strerror(errno)<< dendl;\r
- break;\r
- } else if (r > 0) {\r
- ldout(cct, 1) << __func__ << " recv msg not whole." << dendl;\r
- continue;\r
- } else {\r
- server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, worker, msg);\r
- msg = server->get_my_msg();\r
- r = infiniband->send_udp_msg(server_setup_socket, msg, addr);\r
- server->activate();\r
- std::unique_ptr<RDMAConnectedSocketImpl> csi(server);\r
- *sock = ConnectedSocket(std::move(csi));\r
- if(out)\r
- *out = sa;\r
- return r;\r
- }\r
+\r
+ assert(sock);\r
+ sockaddr_storage ss;\r
+ socklen_t slen = sizeof(ss);\r
+ int sd = ::accept(server_setup_socket, (sockaddr*)&ss, &slen);\r
+ if (sd < 0) {\r
+ return -errno;\r
+ }\r
+ ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;\r
+\r
+ infiniband->net.set_close_on_exec(sd);\r
+ int r = infiniband->net.set_nonblock(sd);\r
+ if (r < 0) {\r
+ // Capture errno before ::close() can overwrite it; fall back to the\r
+ // helper's own return value so a failure is never reported as 0.\r
+ const int err = errno;\r
+ ::close(sd);\r
+ return err ? -err : r;\r
}\r
\r
- return r;\r
+ r = infiniband->net.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);\r
+ if (r < 0) {\r
+ // Same errno-preserving pattern as above.\r
+ const int err = errno;\r
+ ::close(sd);\r
+ return err ? -err : r;\r
+ }\r
+ infiniband->net.set_priority(sd, opt.priority);\r
+\r
+ RDMAConnectedSocketImpl* server;\r
+ //Worker* w = dispatcher->get_stack()->get_worker();\r
+ // NOTE(review): dynamic_cast yields nullptr if w is not an RDMAWorker; the\r
+ // result is passed on unchecked.\r
+ server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));\r
+ server->set_accept_fd(sd);\r
+ ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;\r
+ std::unique_ptr<RDMAConnectedSocketImpl> csi(server);\r
+ *sock = ConnectedSocket(std::move(csi));\r
+ if (out)\r
+ out->set_sockaddr((sockaddr*)&ss);\r
+\r
+ return 0;\r
}\r
RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
: Worker(c, i), stack(nullptr), infiniband(NULL),
- tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL)
-{}
+ tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
+{
+}
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
+// Create a client-side RDMAConnectedSocketImpl, call try_connect() on it and
+// hand ownership to *socket on success.
+// Returns 0 on success or the negative value returned by try_connect().
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this);
- entity_addr_t sa;
- memcpy(&sa, &addr, sizeof(addr));
-
- IBSYNMsg msg = p->get_my_msg();
- ldout(cct, 20) << __func__ << " connecting to " << sa.get_sockaddr() << " : " << sa.get_port() << dendl;
- ldout(cct, 20) << __func__ << " my syn msg : < " << msg.qpn << ", " << msg.psn << ", " << msg.lid << ">"<< dendl;
-
- int client_setup_socket = ::socket(PF_INET, SOCK_DGRAM, 0);
- if (client_setup_socket == -1) {
- lderr(cct) << __func__ << " failed to create client socket: " << strerror(errno) << dendl;
- return -errno;
- }
-
- int r = ::connect(client_setup_socket, addr.get_sockaddr(), addr.get_sockaddr_len());
- if (r < 0) {
- lderr(cct) << __func__ << " failed to connect " << addr << ": "
- << strerror(errno) << dendl;
- return -errno;
- }
-
- r = infiniband->send_udp_msg(client_setup_socket, msg, sa);
- if (r < 0) {
- ldout(cct, 0) << __func__ << " send msg failed." << dendl;
- return r;
- }
+ int r = p->try_connect(addr, opts);
- // FIXME: need to make this async
- r = infiniband->recv_udp_msg(client_setup_socket, msg, &sa);
if (r < 0) {
- ldout(cct, 0) << __func__ << " recv msg failed." << dendl;
+ ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
+ // p is not yet owned by a smart pointer; release it here or it leaks.
+ delete p;
return r;
}
- p->set_peer_msg(msg);
- ldout(cct, 20) << __func__ << " peer msg : < " << msg.qpn << ", " << msg.psn << ", " << msg.lid << "> " << dendl;
- r = p->activate();
- assert(!r);
std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
*socket = ConnectedSocket(std::move(csi));
- ::close(client_setup_socket);
-
return 0;
}
+
RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t) // Build the dispatcher and wire every worker to the shared Infiniband object.
{
if (!global_infiniband) // the Infiniband object is created on first use and then reused
global_infiniband = new Infiniband(
- cct, cct->_conf->ms_async_rdma_device_name);
- dispatcher = new RDMADispatcher(cct, global_infiniband);
+ cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+ ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
+ dispatcher = new RDMADispatcher(cct, global_infiniband, this); // deleted again in ~RDMAStack()
+ unsigned num = get_num_worker();
+ for (unsigned i = 0; i < num; ++i) {
+ RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i)); // NOTE(review): the cast result is dereferenced without a null check
+ w->set_ib(global_infiniband);
+ w->set_stack(this); // RDMAWorker::initialize() later reads the dispatcher through this pointer
+ }
+ ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
}
void RDMAWorker::initialize() // Register this worker with the dispatcher and cache the memory manager.
{
- infiniband = global_infiniband;
- tx_cc = infiniband->create_comp_channel();
- tx_cq = infiniband->create_comp_queue(tx_cc);
- center.create_file_event(tx_cc->get_fd(), EVENT_READABLE, tx_handler);
+ dispatcher = stack->get_dispatcher(); // stack starts as nullptr in the constructor, so set_stack() must have run first
+ notify_fd = dispatcher->register_worker(this); // eventfd created by the dispatcher for this worker
+ center.create_file_event(notify_fd, EVENT_READABLE, tx_handler); // tx_handler fires whenever notify_fd becomes readable
memory_manager = infiniband->get_memory_manager(); // infiniband is supplied through set_ib()
}
+// Try to obtain transmit chunks for `bytes` bytes on behalf of socket `o`.
+// When get_tx_buffers() returns a positive value the chunks are in `c`, the
+// dispatcher's inflight counter is increased and that value is returned.
+// Otherwise (the result is asserted to be 0) `o` is queued on
+// pending_sent_conns, this worker is registered with the dispatcher as
+// waiting for buffers, and 0 is returned.
int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
int r = infiniband->get_tx_buffers(c, bytes);
- if (r == 0) {
+ if (r > 0) {
stack->get_dispatcher()->inflight += c.size();
+ ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
return r;
}
+ assert(r == 0);
- if (pending_sent_conns.back() != o)
+ // back() on an empty std::list is undefined behaviour, so test empty() first.
+ if (pending_sent_conns.empty() || pending_sent_conns.back() != o)
pending_sent_conns.push_back(o);
- return 0;
+ dispatcher->pending_buffers(this);
+ return r;
}
/**
*/
int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
{
+ if (chunks.empty())
+ return 0;
+
stack->get_dispatcher()->inflight -= chunks.size();
- infiniband->get_memory_manager()->return_tx(chunks);
+ memory_manager->return_tx(chunks);
+ ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
+
+ pended = false;
+ std::set<RDMAConnectedSocketImpl*> done;
while (!pending_sent_conns.empty()) {
RDMAConnectedSocketImpl *o = pending_sent_conns.front();
+ if (done.count(o) == 0) {
+ done.insert(o);
+ } else {
+ pending_sent_conns.pop_front();
+ continue;
+ }
ssize_t r = o->submit(false);
ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
if (r < 0) {
void RDMAWorker::handle_tx_event() // Process the completions handed to this worker and give their TX chunks back.
{
- ldout(cct, 20) << __func__ << dendl;
- if (!tx_cc->get_cq_event())
- return ;
-
- static const int MAX_COMPLETIONS = 16;
- ibv_wc wc[MAX_COMPLETIONS];
std::vector<Chunk*> tx_chunks;
- tx_chunks.reserve(MAX_COMPLETIONS);
-
- bool rearmed = false;
- int n;
- again:
- n = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
- ldout(cct, 20) << __func__ << " pool completion queue got " << n
- << " responses."<< dendl;
- for (int i = 0; i < n; ++i) {
- ibv_wc* response = &wc[i];
+ std::vector<ibv_wc> cqe;
+ get_wc(cqe); // take everything queued through pass_wc(); cqe stays empty when nothing is queued
+
+ for (size_t i = 0; i < cqe.size(); ++i) {
+ ibv_wc* response = &cqe[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id); // wr_id is not always a Chunk; is_tx_chunk() below decides
- ldout(cct, 20) << __func__ << " opcode: " << response->opcode << " len: " << response->byte_len << dendl;
+ ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
if (response->status != IBV_WC_SUCCESS) {
if (response->status == IBV_WC_RETRY_EXC_ERR) {
- lderr(cct) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+ ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
} else if (response->status == IBV_WC_WR_FLUSH_ERR) {
- lderr(cct) << __func__ << " Work Request Flushed Error: this connection's qp="
- << response->qp_num << " should be down while this WR=" << response->wr_id
- << " still in flight." << dendl;
+ ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
+ << response->qp_num << " should be down while this WR=" << response->wr_id
+ << " still in flight." << dendl;
} else {
- lderr(cct) << __func__ << " send work request returned error for buffer("
- << response->wr_id << ") status(" << response->status << "): "
- << infiniband->wc_status_to_string(response->status) << dendl;
+ ldout(cct, 1) << __func__ << " send work request returned error for buffer("
+ << response->wr_id << ") status(" << response->status << "): "
+ << infiniband->wc_status_to_string(response->status) << dendl;
}
RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num); // nullptr when the qp is unknown or already dead
if (conn) {
+ ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
conn->fault(); // any failed send marks the owning connection as faulted
} else {
- ldout(cct, 0) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+ ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
}
}
- assert(memory_manager->is_tx_chunk(chunk));
- tx_chunks.push_back(chunk);
- }
-
- if (n) {
- post_tx_buffer(tx_chunks);
- tx_chunks.clear();
- goto again;
+ //assert(memory_manager->is_tx_chunk(chunk));
+ if (memory_manager->is_tx_chunk(chunk)) {
+ tx_chunks.push_back(chunk); // collected even after an error so the chunk is still returned
+ } else {
+ ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;// e.g. the wr_id of a fin() request, which is a QueuePair pointer
+ }
}
- if (!rearmed) {
- tx_cq->rearm_notify();
- rearmed = true;
- // Clean up cq events after rearm notify ensure no new incoming event
- // arrived between polling and rearm
- goto again;
- }
+ post_tx_buffer(tx_chunks); // returns immediately when tx_chunks is empty
- ldout(cct, 20) << __func__ << " leaving handle_tx_event. " << dendl;
+ ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
+ dispatcher->notify_pending_workers(); // wake one worker that is waiting for buffers, if any
}
RDMADispatcher::~RDMADispatcher()
{
done = true;
t.join();
- assert(qp_conns.empty());
+ ldout(cct, 20) << __func__ << " ing..." << dendl;
+ auto i = qp_conns.begin();
+ while (i != qp_conns.end()) {
+ delete i->second.first;
+ ++i;
+ }
+
while (!dead_queue_pairs.empty()) {
delete dead_queue_pairs.back();
dead_queue_pairs.pop_back();
if (!conn) {
ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
} else {
- ldout(cct, 0) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
+ ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
conn->fault();
erase_qpn(qpn);
}
} else {
- ldout(cct, 0) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
+ ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
<< " evt: " << ibv_event_type_str(async_event.event_type)
<< dendl;
}
}
}
-
void RDMADispatcher::polling()
{
static int MAX_COMPLETIONS = 32;
ibv_wc wc[MAX_COMPLETIONS];
std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
- int i, n;
- while (!done) {
- n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
+ std::vector<ibv_wc> tx_cqe;
+ RDMAWorker* worker;
+ ldout(cct, 20) << __func__ << " going to poll rx cq:" << rx_cq << dendl;
+ RDMAConnectedSocketImpl *conn = nullptr;
+
+ while (true) {
+ int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
if (!n) {
// NOTE: Has TX just transitioned to idle? We should do it when idle!
// It's now safe to delete queue pairs (see comment by declaration
// Additionally, don't delete qp while outstanding_buffers isn't empty,
// because we need to check qp's state before sending
if (!inflight.load()) {
+ Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
while (!dead_queue_pairs.empty()) {
ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
delete dead_queue_pairs.back();
dead_queue_pairs.pop_back();
}
}
- handle_async_event();
+ // handle_async_event();
+ if (done && !inflight)
+ break;
continue;
}
ldout(cct, 20) << __func__ << " pool completion queue got " << n
<< " responses."<< dendl;
- for (i = 0; i < n; ++i) {
+ Mutex::Locker l(lock);//make sure connected socket alive when pass wc
+ for (int i = 0; i < n; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
- ldout(cct, 20) << __func__ << " got chunk=" << response->wr_id << " qp:" << wc[i].qp_num << dendl;
if (response->status != IBV_WC_SUCCESS) {
- lderr(cct) << __func__ << " work request returned error for buffer(" << response->wr_id
- << ") status(" << response->status << ":"
- << ib->wc_status_to_string(response->status) << dendl;
- ib->post_chunk(chunk);
+ ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
+ << ") status(" << response->status << ":"
+ << ib->wc_status_to_string(response->status) << dendl;
+ ib->recall_chunk(chunk);
+ conn = get_conn_lockless(response->qp_num);
+ if (conn && conn->is_connected())
+ conn->fault();
+ notify_pending_workers();
continue;
}
- RDMAConnectedSocketImpl *conn = get_conn_by_qp(response->qp_num);
+ if (wc[i].opcode == IBV_WC_SEND) {
+ tx_cqe.push_back(wc[i]);
+ ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl;
+ continue;
+ }
+ ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
+ conn = get_conn_lockless(response->qp_num);
if (!conn) {
- // discard buffer
- ldout(cct, 0) << __func__ << " missing qp_num " << response->qp_num << ", discard bd "
- << chunk << dendl;
+ int ret = ib->recall_chunk(chunk);
+ ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << ret << dendl;
continue;
}
polled[conn].push_back(*response);
for (auto &&i : polled)
i.first->pass_wc(std::move(i.second));
polled.clear();
+ if (!tx_cqe.empty()) {
+ worker = get_worker_from_list();
+ if (worker == nullptr)
+ worker = dynamic_cast<RDMAWorker*>(stack->get_worker());
+ worker->pass_wc(std::move(tx_cqe));
+ tx_cqe.clear();
+ }
}
}
+
+// Wake the worker at the head of pending_workers, if there is one, and remove
+// it from the list. The worker is handed an empty completion list: pass_wc()
+// signals the worker's notify fd regardless of the list's contents.
+void RDMADispatcher::notify_pending_workers() {
+ Mutex::Locker l(w_lock);
+ if (pending_workers.empty())
+ return ;
+ // A temporary already binds to pass_wc()'s rvalue-reference parameter;
+ // wrapping it in std::move() was redundant.
+ pending_workers.front()->pass_wc(std::vector<ibv_wc>());
+ pending_workers.pop_front();
+}
#include "Infiniband.h"
class RDMAConnectedSocketImpl;
+class RDMAServerSocketImpl;
class RDMAStack;
+class RDMAWorker;
class RDMADispatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
EventCallbackRef async_handler;
bool done = false;
Mutex lock; // protect `qp_conns
+ Mutex w_lock; // protect pending workers
// qp_num -> InfRcConnection
// The main usage of `qp_conns` is looking up connection by qp_num,
// so the lifecycle of element in `qp_conns` is the lifecycle of qp.
/// save them in this vector and delete them at a safe time, when there are
/// no outstanding transmit buffers to be lost.
std::vector<QueuePair*> dead_queue_pairs;
-
+ Mutex qp_lock;//for csi reuse qp
+ ceph::unordered_map<RDMAWorker*, int> workers;;
+ std::list<RDMAWorker*> pending_workers;
+ RDMAStack* stack;
class C_handle_cq_async : public EventCallback {
RDMADispatcher *dispatcher;
public:
};
public:
- std::atomic_ulong inflight;
- explicit RDMADispatcher(CephContext* cct, Infiniband* i)
- : t(&RDMADispatcher::polling, this), cct(cct), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock") {
+ std::atomic<uint64_t> inflight = {0};
+ explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
+ : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
+ w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s) {
rx_cc = ib->create_comp_channel();
assert(rx_cc);
rx_cq = ib->create_comp_queue(rx_cc);
assert(rx_cq);
+ t = std::thread(&RDMADispatcher::polling, this);
}
virtual ~RDMADispatcher();
-
void handle_async_event();
void polling();
int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) {
qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
return fd;
}
+ int register_worker(RDMAWorker* w) { // Create a notification eventfd for worker w, record it in `workers` and return it.
+ int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+ assert(fd >= 0);
+ Mutex::Locker l(w_lock); // w_lock is held while `workers` is modified
+ workers[w] = fd;
+ return fd; // NOTE(review): the worker closes this fd in its destructor; no code visible here removes the map entry
+ }
+ void pending_buffers(RDMAWorker* w) {
+ Mutex::Locker l(w_lock);
+ pending_workers.push_back(w);
+ }
+ RDMAStack* get_stack() {
+ return stack;
+ }
+ // Remove and return the worker at the head of pending_workers, or return
+ // nullptr when no worker is waiting. Takes w_lock for the duration.
+ RDMAWorker* get_worker_from_list() {
+ Mutex::Locker guard(w_lock);
+ if (pending_workers.empty())
+ return nullptr;
+ RDMAWorker* const head = pending_workers.front();
+ pending_workers.pop_front();
+ return head;
+ }
RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp) {
Mutex::Locker l(lock);
auto it = qp_conns.find(qp);
if (it == qp_conns.end())
- return NULL;
+ return nullptr;
+ if (it->second.first->is_dead())
+ return nullptr;
+ return it->second.second;
+ }
+ RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp) {
+ auto it = qp_conns.find(qp);
+ if (it == qp_conns.end())
+ return nullptr;
if (it->second.first->is_dead())
- return NULL;
+ return nullptr;
return it->second.second;
}
void erase_qpn(uint32_t qpn) {
dead_queue_pairs.push_back(it->second.first);
qp_conns.erase(it);
}
+ Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
+ void notify_pending_workers();
};
typedef Infiniband::CompletionChannel CompletionChannel;
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::MemoryManager MemoryManager;
+ typedef std::vector<Chunk*>::iterator ChunkIter;
RDMAStack *stack;
Infiniband *infiniband;
- CompletionQueue *tx_cq; // common completion queue for all transmits
- CompletionChannel *tx_cc;
EventCallbackRef tx_handler;
MemoryManager *memory_manager;
std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
-
+ RDMADispatcher* dispatcher;
+ int notify_fd = -1;
+ Mutex lock;
+ std::vector<ibv_wc> wc;
+ bool pended;
class C_handle_cq_tx : public EventCallback {
RDMAWorker *worker;
public:
public:
explicit RDMAWorker(CephContext *c, unsigned i);
virtual ~RDMAWorker() {
- tx_cc->ack_events();
- delete tx_cq;
- delete tx_cc;
delete tx_handler;
+ if (notify_fd >= 0)
+ ::close(notify_fd);
+ }
+ // Write a counter increment of 1 to notify_fd so that the file event
+ // registered on it in initialize() fires.
+ void notify() {
+ uint64_t i = 1;
+ // Keep write() outside assert(): an assert that is compiled out would
+ // otherwise drop the notification itself.
+ ssize_t r = write(notify_fd, &i, sizeof(i));
+ assert(r == static_cast<ssize_t>(sizeof(i)));
+ (void)r;
+ }
+ // Queue the completions in v for this worker and signal it through notify().
+ // The lock is held across both steps.
+ void pass_wc(std::vector<ibv_wc> &&v) {
+ Mutex::Locker guard(lock);
+ if (!wc.empty()) {
+ // entries are already queued: append behind them
+ wc.insert(wc.end(), v.begin(), v.end());
+ } else {
+ // nothing queued: take over v's storage
+ wc = std::move(v);
+ }
+ notify();
+ }
+ // Exchange the queued completions with w under the lock. When nothing is
+ // queued, w is left untouched.
+ void get_wc(std::vector<ibv_wc> &w) {
+ Mutex::Locker guard(lock);
+ if (!wc.empty())
+ w.swap(wc);
}
-
virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
virtual void initialize() override;
- CompletionQueue* get_tx_cq() { return tx_cq; }
RDMAStack *get_stack() {
return stack;
}
int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
int post_tx_buffer(std::vector<Chunk*> &chunks);
+ // Queue socket o for a later send retry. The worker registers itself with
+ // the dispatcher only once per round; `pended` remembers that it has.
+ void add_pending_conn(RDMAConnectedSocketImpl* o) {
+ pending_sent_conns.push_back(o);
+ if (pended)
+ return;
+ dispatcher->pending_buffers(this);
+ pended = true;
+ }
void remove_pending_conn(RDMAConnectedSocketImpl *o) {
pending_sent_conns.remove(o);
}
void handle_tx_event();
+ void set_ib(Infiniband* ib) {
+ infiniband = ib;
+ }
+ void set_stack(RDMAStack *s) {
+ stack = s;
+ }
};
class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
RDMADispatcher* dispatcher;
RDMAWorker* worker;
std::vector<Chunk*> buffers;
- int notify_fd;
+ int notify_fd = -1;
bufferlist pending_bl;
Mutex lock;
std::vector<ibv_wc> wc;
+ bool is_server;
+ RDMAServerSocketImpl* ssi;
+ EventCallbackRef con_handler;
+ int tcp_fd = -1;
+ bool active;// qp is active ?
+ bool detached;
void notify() {
uint64_t i = 1;
public:
RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
- RDMAWorker *w, IBSYNMsg im = IBSYNMsg())
- : cct(cct), peer_msg(im), connected(0), error(0), infiniband(ib),
- dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock") {
- qp = infiniband->create_queue_pair(w->get_tx_cq(), IBV_QPT_RC);
+ RDMAWorker *w)
+ : cct(cct), connected(0), error(0), infiniband(ib),
+ dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
+ is_server(false), con_handler(new C_handle_connection(this)),
+ active(false), detached(false) {
+ qp = infiniband->create_queue_pair(s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
my_msg.qpn = qp->get_local_qp_number();
my_msg.psn = qp->get_initial_psn();
my_msg.lid = infiniband->get_lid();
+ my_msg.peer_qpn = 0;
my_msg.gid = infiniband->get_gid();
notify_fd = dispatcher->register_qp(qp, this);
}
+
virtual ~RDMAConnectedSocketImpl() { // Unregister from worker and dispatcher, close both fds, return unread chunks.
worker->remove_pending_conn(this); // the worker must not retry sends on a destroyed socket
+ dispatcher->erase_qpn(my_msg.qpn); // drop this socket's qp_num entry from the dispatcher
+ cleanup(); // removes the TCP file event and deletes con_handler
+ if (notify_fd >= 0)
+ ::close(notify_fd);
+ if (tcp_fd >= 0)
+ ::close(tcp_fd);
+ error = ECONNRESET;
+ Mutex::Locker l(lock); // same lock get_wc() takes around wc
+ for (unsigned i=0; i < wc.size(); ++i)
+ infiniband->recall_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id)); // completions that were queued but never fetched
+ for (unsigned i=0; i < buffers.size(); ++i)
+ infiniband->recall_chunk(buffers[i]); // chunks still held in buffers
}
void pass_wc(std::vector<ibv_wc> &&v) {
wc.insert(wc.end(), v.begin(), v.end());
notify();
}
+
void get_wc(std::vector<ibv_wc> &w) {
Mutex::Locker l(lock);
if (wc.empty())
return ;
w.swap(wc);
}
+
virtual int is_connected() override {
return connected;
}
+
virtual ssize_t read(char* buf, size_t len) override;
virtual ssize_t zero_copy_read(bufferptr &data) override;
virtual ssize_t send(bufferlist &bl, bool more) override;
virtual void shutdown() override { // Post fin() unless an error is already recorded, then mark the socket reset and inactive.
- if (qp) {
- qp->to_dead();
- qp = NULL;
- }
+ if (!error) // fin() is posted only while no error has been recorded
+ fin();
+ error = ECONNRESET; // also keeps a second shutdown()/close() from posting fin() again
+ active = false;
}
virtual void close() override { // Same steps as shutdown(): post fin() unless an error is recorded, then mark reset and inactive.
- if (qp) {
- qp->to_dead();
- qp = NULL;
- }
+ if (!error) // fin() is posted only while no error has been recorded
+ fin();
+ error = ECONNRESET; // also keeps a second close()/shutdown() from posting fin() again
+ active = false;
}
virtual int fd() const override {
return notify_fd;
}
void fault() { // Record ECONNRESET on this socket and call notify().
- if (qp) {
+ /*if (qp) {
qp->to_dead();
qp = NULL;
- }
+ }*/
error = ECONNRESET;
+ connected = 1; // NOTE(review): sets the connected flag on a failed socket; presumably so a caller polling is_connected() proceeds and then sees `error` -- confirm
notify();
}
+ const char* get_qp_state() {
+ return Infiniband::qp_state_string(qp->get_state());
+ }
ssize_t submit(bool more);
int activate();
- IBSYNMsg get_my_msg() { return my_msg; }
- void set_peer_msg(IBSYNMsg m) { peer_msg = m ;}
+ void fin();
+ void handle_connection();
+ void cleanup();
+ void set_accept_fd(int sd) { // Adopt an accepted TCP socket, mark this object as the server side and watch the socket.
+ tcp_fd = sd; // closed in the destructor when >= 0
+ is_server = true;
+ worker->center.submit_to(worker->center.get_id(), [this]() {
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); // con_handler calls handle_connection() when tcp_fd is readable
+ }, true); // NOTE(review): the lambda captures `this`; if the final `true` makes the call asynchronous, this object must outlive it -- confirm
+ }
+ int try_connect(const entity_addr_t&, const SocketOptions &opt);
+ // Event callback registered on the TCP setup socket (see set_accept_fd()).
+ // It forwards each event to RDMAConnectedSocketImpl::handle_connection()
+ // until close() switches it off.
+ class C_handle_connection : public EventCallback {
+ RDMAConnectedSocketImpl *csi;
+ bool active;
+ public:
+ // explicit: a socket pointer should never convert to a callback implicitly.
+ explicit C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
+ // fd is unused; the event is forwarded only while the callback is active.
+ void do_request(int fd) {
+ if (active)
+ csi->handle_connection();
+ }
+ // After close(), do_request() does nothing.
+ void close() {
+ active = false;
+ }
+ };
};
-
class RDMAServerSocketImpl : public ServerSocketImpl {
CephContext *cct;
NetHandler net;
RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
: cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) {}
int listen(entity_addr_t &sa, const SocketOptions &opt);
- virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out) override;
+ virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override {
if (server_setup_socket >= 0)
::close(server_setup_socket);
virtual int fd() const override {
return server_setup_socket;
}
+ int get_fd() { return server_setup_socket; }
};
-
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
RDMADispatcher *dispatcher;
public:
explicit RDMAStack(CephContext *cct, const string &t);
- virtual ~RDMAStack() { delete dispatcher; }
- virtual bool support_zero_copy_read() const override { return true; }
- //virtual bool support_local_listen_table() const { return true; }
+ virtual ~RDMAStack() {
+ delete dispatcher;
+ }
+ virtual bool support_zero_copy_read() const override { return false; }
+ virtual bool nonblock_connect_need_writable_event() const { return false; }
virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override {
threads.resize(i+1);
}
if (is_my_accept) {
- r = bind_socket.accept(&srv_socket, options, &cli_addr);
+ r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
ASSERT_EQ(0, r);
ASSERT_TRUE(srv_socket.fd() > 0);
}
center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb);
r = cli_socket2.is_connected();
if (r == 0) {
- ASSERT_FALSE(cb.poll(500));
+ cb.poll(500);
r = cli_socket2.is_connected();
}
ASSERT_TRUE(r != 1);
ConnectedSocket srv_socket, cli_socket;
if (bind_socket) {
- r = bind_socket.accept(&srv_socket, options, &cli_addr);
+ r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
ASSERT_EQ(-EAGAIN, r);
}
cb.poll(500);
ConnectedSocket srv_socket2;
do {
- r = bind_socket.accept(&srv_socket2, options, &cli_addr);
+ r = bind_socket.accept(&srv_socket2, options, &cli_addr, worker);
usleep(100);
} while (r == -EAGAIN && !*accepted_p);
if (r == 0)
if (bind_socket) {
do {
- r = bind_socket.accept(&srv_socket, options, &cli_addr);
+ r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
usleep(100);
} while (r == -EAGAIN && !*accepted_p);
if (r == 0)
TEST_P(NetworkWorkerTest, ComplexTest) {
entity_addr_t bind_addr;
+ std::atomic_bool listen_done(false);
+ std::atomic_bool *listen_p = &listen_done;
std::atomic_bool accepted(false);
std::atomic_bool *accepted_p = &accepted;
std::atomic_bool done(false);
std::atomic_bool *done_p = &done;
ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
- exec_events([this, bind_addr, accepted_p, done_p](Worker *worker) mutable {
+ exec_events([this, bind_addr, listen_p, accepted_p, done_p](Worker *worker) mutable {
entity_addr_t cli_addr;
EventCenter *center = &worker->center;
SocketOptions options;
if (stack->support_local_listen_table() || worker->id == 0) {
r = worker->listen(bind_addr, options, &bind_socket);
ASSERT_EQ(0, r);
+ *listen_p = true;
}
ConnectedSocket cli_socket, srv_socket;
if (worker->id == 1) {
- r = worker->connect(bind_addr, options, &cli_socket);
- ASSERT_EQ(0, r);
+ while (!*listen_p) {
+ usleep(50);
+ r = worker->connect(bind_addr, options, &cli_socket);
+ ASSERT_EQ(0, r);
+ }
}
if (bind_socket) {
C_poll cb(center);
center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
- if (cb.poll(500)) {
- r = bind_socket.accept(&srv_socket, options, &cli_addr);
- ASSERT_EQ(0, r);
- *accepted_p = true;
+ int count = 3;
+ while (count--) {
+ if (cb.poll(500)) {
+ r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+ ASSERT_EQ(0, r);
+ *accepted_p = true;
+ break;
+ }
}
ASSERT_TRUE(*accepted_p);
center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
StressFactory *factory;
ServerSocket bind_socket;
ThreadData *t_data;
+ Worker *worker;
public:
- C_accept(StressFactory *f, ServerSocket s, ThreadData *data)
- : factory(f), bind_socket(std::move(s)), t_data(data) {}
+ C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w)
+ : factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {}
void do_request(int id) {
while (true) {
entity_addr_t cli_addr;
ConnectedSocket srv_socket;
SocketOptions options;
- int r = bind_socket.accept(&srv_socket, options, &cli_addr);
+ int r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
if (r == -EAGAIN) {
break;
}
const size_t client_num, queue_depth, max_message_length;
atomic_int message_count, message_left;
entity_addr_t bind_addr;
+ std::atomic_bool already_bind = {false};
bool zero_copy_read;
SocketOptions options;
if (stack->support_local_listen_table() || worker->id == 0) {
r = worker->listen(bind_addr, options, &bind_socket);
ASSERT_EQ(0, r);
+ already_bind = true;
}
+ while (!already_bind)
+ usleep(50);
C_accept *accept_handler = nullptr;
int bind_fd = 0;
if (bind_socket) {
bind_fd = bind_socket.fd();
- accept_handler = new C_accept(this, std::move(bind_socket), &t_data);
+ accept_handler = new C_accept(this, std::move(bind_socket), &t_data, worker);
ASSERT_EQ(0, worker->center.create_file_event(
bind_fd, EVENT_READABLE, accept_handler));
}