class RDMAWorker;
class RDMADispatcher;
+struct RDMAConnTCPInfo {
+ int sd;
+};
+
class RDMAConnTCP : public RDMAConnMgr {
class C_handle_connection : public EventCallback {
RDMAConnTCP *cst;
public:
RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
- Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
+ Infiniband* ib, RDMADispatcher* s, RDMAWorker *w,
+ void *info);
virtual ~RDMAConnTCP();
virtual ostream &print(ostream &out) const override;
}
RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
- RDMAWorker *w)
+ RDMAWorker *w, void *info)
: cct(cct), infiniband(ib), dispatcher(s), worker(w),
error(0), lock("RDMAConnectedSocketImpl::lock")
{
- cmgr = new RDMAConnTCP(cct, this, ib, s, w);
+ cmgr = new RDMAConnTCP(cct, this, ib, s, w, info);
}
QueuePair *RDMAConnectedSocketImpl::create_queue_pair(Device *d, int p)
}
RDMAConnTCP::RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
- Infiniband* ib, RDMADispatcher* s, RDMAWorker *w)
+ Infiniband* ib, RDMADispatcher* s, RDMAWorker *w,
+ void *_info)
: RDMAConnMgr(cct, sock, ib, s, w), con_handler(new C_handle_connection(this))
{
Device *ibdev = ib->get_device(cct->_conf->ms_async_rdma_device_name.c_str());
my_msg.peer_qpn = 0;
my_msg.gid = ibdev->get_gid(ibport);
socket->register_qp(qp);
+
+ if (_info) {
+ RDMAConnTCPInfo *info = (struct RDMAConnTCPInfo *)_info;
+
+ tcp_fd = info->sd;
+ is_server = true;
+ worker->center.submit_to(worker->center.get_id(), [this]() {
+ worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+ }, true);
+ }
}
void RDMAConnectedSocketImpl::register_qp(QueuePair *qp)
cmgr->connected = 1;
notify();
}
-
-void RDMAConnTCP::set_accept_fd(int sd)
-{
- tcp_fd = sd;
- is_server = true;
- worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
- }, true);
-}
virtual void cleanup() = 0;
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) = 0;
- virtual void set_accept_fd(int sd) = 0;
void post_read();
uint32_t remote_qpn = 0;
RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
- RDMAWorker *w);
+ RDMAWorker *w, void *info = nullptr);
virtual ~RDMAConnectedSocketImpl();
ostream &print(ostream &out) const {
void notify();
QueuePair *create_queue_pair(Device *d, int p);
- void set_accept_fd(int sd) {cmgr->set_accept_fd(sd); };
int try_connect(const entity_addr_t &sa, const SocketOptions &opt) { return cmgr->try_connect(sa, opt); };
};
inline ostream& operator<<(ostream& out, const RDMAConnectedSocketImpl &s)
RDMAConnectedSocketImpl *server;
//Worker* w = dispatcher->get_stack()->get_worker();
- server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w));
- server->set_accept_fd(sd);
+ RDMAConnTCPInfo conn_info = { sd };
+ server = new RDMAConnectedSocketImpl(cct, infiniband, dispatcher, dynamic_cast<RDMAWorker*>(w), &conn_info);
ldout(cct, 20) << __func__ << " accepted a new QP, tcp_fd: " << sd << dendl;
std::unique_ptr<RDMAConnectedSocketImpl> csi(server);
*sock = ConnectedSocket(std::move(csi));