*/
#include "RDMAStack.h"
+class C_handle_connection_established : public EventCallback {
+ RDMAConnectedSocketImpl *csi;
+ bool active = true;
+ public:
+ C_handle_connection_established(RDMAConnectedSocketImpl *w) : csi(w) {}
+ void do_request(uint64_t fd) final {
+ if (active)
+ csi->handle_connection_established();
+ }
+ void close() {
+ active = false;
+ }
+};
+
+
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
: cct(cct), connected(0), error(0), ib(ib),
dispatcher(rdma_dispatcher), worker(w),
is_server(false), con_handler(new C_handle_connection(this)),
+ established_handler(new C_handle_connection_established(this)),
active(false), pending(false)
{
if (!cct->_conf->ms_async_rdma_cm) {
ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
<< opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
NetHandler net(cct);
- tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
+
+ // we construct a socket to transport ib sync message
+ // but we shouldn't block in tcp connecting
+ if (opts.nonblock) {
+ tcp_fd = net.nonblock_connect(peer_addr, opts.connect_bind_addr);
+ } else {
+ tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
+ }
if (tcp_fd < 0) {
return -errno;
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
+ r = 0;
+ if (opts.nonblock) {
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE , established_handler);
+ } else {
+ r = handle_connection_established(false);
+ }
+ return r;
+}
+
+int RDMAConnectedSocketImpl::handle_connection_established(bool need_set_fault) {
+ ldout(cct, 20) << __func__ << " start " << dendl;
+ // delete read event
+ worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
+ if (1 == connected) {
+ ldout(cct, 1) << __func__ << " warnning: logic failed " << dendl;
+ if (need_set_fault) {
+ fault();
+ }
+ return -1;
+ }
+ // send handshake msg to server
qp->get_local_cm_meta().peer_qpn = 0;
- r = qp->send_cm_meta(cct, tcp_fd);
- if (r < 0)
+ int r = qp->send_cm_meta(cct, tcp_fd);
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " send handshake msg failed." << r << dendl;
+ if (need_set_fault) {
+ fault();
+ }
return r;
-
+ }
worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+ ldout(cct, 20) << __func__ << " finish " << dendl;
return 0;
}
if (con_handler && tcp_fd >= 0) {
(static_cast<C_handle_connection*>(con_handler))->close();
worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
+ worker->center.delete_file_event(tcp_fd, EVENT_READABLE | EVENT_WRITABLE);
}, false);
delete con_handler;
con_handler = nullptr;
}
+ if (established_handler) {
+ (static_cast<C_handle_connection_established*>(established_handler))->close();
+ delete established_handler;
+ established_handler = nullptr;
+ }
}
void RDMAConnectedSocketImpl::notify()