#include "Infiniband.h"
#include "common/errno.h"
#include "common/debug.h"
+#include "RDMAStack.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
}
-Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num): device_list(cct)
+Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
+ : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
{
- device = device_list.get_device(device_name.c_str());
+}
+
+void Infiniband::init()
+{
+ Mutex::Locker l(lock);
+
+ if (initialized)
+ return;
+
+ device_list = new DeviceList(cct);
+ initialized = true;
+
+ device = device_list->get_device(device_name.c_str());
device->binding_port(cct, port_num);
assert(device);
ib_physical_port = device->active_port->get_port_num();
srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
post_channel_cluster();
+
+ dispatcher->polling_start();
}
Infiniband::~Infiniband()
{
+ if (!initialized)
+ return;
+
+ if (dispatcher)
+ dispatcher->polling_stop();
+
assert(ibv_destroy_srq(srq) == 0);
delete memory_manager;
delete pd;
}
+void Infiniband::set_dispatcher(RDMADispatcher *d)
+{
+ assert(!d ^ !dispatcher);
+
+ dispatcher = d;
+}
+
/**
* Create a shared receive queue. This basically wraps the verbs call.
*
};
+class RDMADispatcher;
+
class Infiniband {
public:
class ProtectionDomain {
ibv_srq* srq; // shared receive work queue
Device *device;
ProtectionDomain *pd;
- DeviceList device_list;
+ DeviceList *device_list = nullptr;
+ RDMADispatcher *dispatcher = nullptr;
void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
+ CephContext *cct;
+ Mutex lock;
+ bool initialized = false;
+ const std::string &device_name;
+ uint8_t port_num;
public:
explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
~Infiniband();
+ void init();
+
+ void set_dispatcher(RDMADispatcher *d);
class CompletionChannel {
static const uint32_t MAX_ACK_EVENT = 5000;
RDMADispatcher::~RDMADispatcher()
{
done = true;
- t.join();
+ polling_stop();
ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
assert(qp_conns.empty());
delete tx_cc;
delete rx_cc;
delete async_handler;
+
+ global_infiniband->set_dispatcher(nullptr);
}
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
: cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
- tx_cc = global_infiniband->create_comp_channel(c);
- assert(tx_cc);
- rx_cc = global_infiniband->create_comp_channel(c);
- assert(rx_cc);
- tx_cq = global_infiniband->create_comp_queue(c, tx_cc);
- assert(tx_cq);
- rx_cq = global_infiniband->create_comp_queue(c, rx_cc);
- assert(rx_cq);
-
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
+}
+
+void RDMADispatcher::polling_start()
+{
+ tx_cc = global_infiniband->create_comp_channel(cct);
+ assert(tx_cc);
+ rx_cc = global_infiniband->create_comp_channel(cct);
+ assert(rx_cc);
+ tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+ assert(tx_cq);
+ rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+ assert(rx_cq);
t = std::thread(&RDMADispatcher::polling, this);
- cct->register_fork_watcher(this);
+}
+
+void RDMADispatcher::polling_stop()
+{
+ if (t.joinable())
+ t.join();
}
void RDMADispatcher::handle_async_event()
erase_qpn_lockless(qpn);
}
-void RDMADispatcher::handle_pre_fork()
-{
- done = true;
- t.join();
- done = false;
-
- tx_cc->ack_events();
- rx_cc->ack_events();
- delete tx_cq;
- delete rx_cq;
- delete tx_cc;
- delete rx_cc;
-
- global_infiniband.destroy();
-}
-
-void RDMADispatcher::handle_post_fork()
-{
- if (!global_infiniband)
- global_infiniband.construct(
- cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
-
- tx_cc = global_infiniband->create_comp_channel(cct);
- assert(tx_cc);
- rx_cc = global_infiniband->create_comp_channel(cct);
- assert(rx_cc);
- tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
- assert(tx_cq);
- rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
- assert(rx_cq);
-
- t = std::thread(&RDMADispatcher::polling, this);
-}
-
void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
{
std::vector<Chunk*> tx_chunks;
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
+ global_infiniband->init();
+
auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
+ global_infiniband->init();
+
RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
int r = p->try_connect(addr, opts);
cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
+ global_infiniband->set_dispatcher(dispatcher);
+
unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
};
-class RDMADispatcher : public CephContext::ForkWatcher {
+class RDMADispatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;
explicit RDMADispatcher(CephContext* c, RDMAStack* s);
virtual ~RDMADispatcher();
void handle_async_event();
+
+ void polling_start();
+ void polling_stop();
void polling();
int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
void make_pending_worker(RDMAWorker* w) {
Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
void notify_pending_workers();
- virtual void handle_pre_fork() override;
- virtual void handle_post_fork() override;
void handle_tx_event(ibv_wc *cqe, int n);
void post_tx_buffer(std::vector<Chunk*> &chunks);