Infiniband::Infiniband(CephContext *cct)
- : cct(cct), lock("IB lock")
+ : device_list(new DeviceList(cct, this))
{
}
delete device_list;
}
-void Infiniband::init()
-{
- Mutex::Locker l(lock);
-
- if (initialized)
- return;
-
- device_list = new DeviceList(cct, this);
- initialized = true;
-
- dispatcher->polling_start();
-}
-
void Infiniband::set_dispatcher(RDMADispatcher *d)
{
assert(!d ^ !dispatcher);
perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
+
+ cct->register_fork_watcher(this);
}
void RDMADispatcher::polling_start()
erase_qpn_lockless(qpn);
}
+void RDMADispatcher::handle_pre_fork()
+{
+ polling_stop();
+ done = false;
+
+ global_infiniband->handle_pre_fork();
+
+ global_infiniband.destroy();
+}
+
+void RDMADispatcher::handle_post_fork()
+{
+ if (!global_infiniband) {
+ global_infiniband.construct(cct);
+ global_infiniband->set_dispatcher(this);
+ }
+
+ polling_start();
+}
+
void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
{
std::vector<Chunk*> tx_chunks;
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
- global_infiniband->init();
-
auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
- global_infiniband->init();
-
RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
int r = p->try_connect(addr, opts);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
global_infiniband->set_dispatcher(dispatcher);
+ dispatcher->polling_start();
unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
};
-class RDMADispatcher {
+class RDMADispatcher : public CephContext::ForkWatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;
void erase_qpn_lockless(uint32_t qpn);
void erase_qpn(uint32_t qpn);
void notify_pending_workers();
+ virtual void handle_pre_fork() override;
+ virtual void handle_post_fork() override;
void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n);
void post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks);