void RDMAWorker::initialize()
{
- dispatcher = stack->get_dispatcher();
- notify_fd = dispatcher->register_worker(this);
- center.create_file_event(notify_fd, EVENT_READABLE, tx_handler);
- memory_manager = infiniband->get_memory_manager();
+ if (!dispatcher) {
+ dispatcher = stack->get_dispatcher();
+ notify_fd = dispatcher->register_worker(this);
+ center.create_file_event(notify_fd, EVENT_READABLE, tx_handler);
+ memory_manager = infiniband->get_memory_manager();
+ }
}
int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
class RDMAStack;
class RDMAWorker;
-class RDMADispatcher {
+class RDMADispatcher : public CephContext::ForkWatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;
rx_cq = ib->create_comp_queue(rx_cc);
assert(rx_cq);
t = std::thread(&RDMADispatcher::polling, this);
+ cct->register_fork_watcher(this);
}
virtual ~RDMADispatcher();
void handle_async_event();
}
Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
void notify_pending_workers();
+ virtual void handle_pre_fork() override {
+ done = true;
+ t.join();
+ done = false;
+ }
+ virtual void handle_post_fork() override {
+ t = std::thread(&RDMADispatcher::polling, this);
+ }
};
EventCallbackRef tx_handler;
MemoryManager *memory_manager;
std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
- RDMADispatcher* dispatcher;
+ RDMADispatcher* dispatcher = nullptr;
int notify_fd = -1;
Mutex lock;
std::vector<ibv_wc> wc;