Infiniband::Infiniband(CephContext *cct)
- : device_list(new DeviceList(cct, this))
+ : cct(cct), lock("IB lock")
{
}
delete device_list;
}
+void Infiniband::init()
+{
+ Mutex::Locker l(lock);
+
+ if (initialized)
+ return;
+
+ device_list = new DeviceList(cct, this);
+ initialized = true;
+
+ dispatcher->polling_start();
+}
+
void Infiniband::set_dispatcher(RDMADispatcher *d)
{
assert(!d ^ !dispatcher);
perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
-
- cct->register_fork_watcher(this);
}
void RDMADispatcher::polling_start()
erase_qpn_lockless(qpn);
}
-void RDMADispatcher::handle_pre_fork()
-{
- polling_stop();
- done = false;
-
- global_infiniband->handle_pre_fork();
-
- global_infiniband.destroy();
-}
-
-void RDMADispatcher::handle_post_fork()
-{
- if (!global_infiniband) {
- global_infiniband.construct(cct);
- global_infiniband->set_dispatcher(this);
- }
-
- polling_start();
-}
-
void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
{
std::vector<Chunk*> tx_chunks;
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
+ global_infiniband->init();
+
auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
+ global_infiniband->init();
+
RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
int r = p->try_connect(addr, opts);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
global_infiniband->set_dispatcher(dispatcher);
- dispatcher->polling_start();
unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
};
-class RDMADispatcher : public CephContext::ForkWatcher {
+class RDMADispatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;
void erase_qpn_lockless(uint32_t qpn);
void erase_qpn(uint32_t qpn);
void notify_pending_workers();
- virtual void handle_pre_fork() override;
- virtual void handle_post_fork() override;
void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n);
void post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks);