static Infiniband* global_infiniband;
-RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
- : Worker(c, i), stack(nullptr), infiniband(NULL),
- tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
-{
-}
-
-int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
-{
- auto p = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
- int r = p->listen(sa, opt);
- if (r < 0) {
- delete p;
- return r;
- }
-
- *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
- return 0;
-}
-
-int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
-{
- RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this);
- int r = p->try_connect(addr, opts);
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
- return r;
- }
- std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
- *socket = ConnectedSocket(std::move(csi));
- return 0;
-}
-
-
-RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
-{
- if (!global_infiniband)
- global_infiniband = new Infiniband(
- cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
- ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
- dispatcher = new RDMADispatcher(cct, global_infiniband, this);
- unsigned num = get_num_worker();
- for (unsigned i = 0; i < num; ++i) {
- RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
- w->set_ib(global_infiniband);
- w->set_stack(this);
- }
- ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
-}
-
-void RDMAWorker::initialize()
-{
- if (!dispatcher) {
- dispatcher = stack->get_dispatcher();
- notify_fd = dispatcher->register_worker(this);
- center.create_file_event(notify_fd, EVENT_READABLE, tx_handler);
- memory_manager = infiniband->get_memory_manager();
- }
-}
-
-int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
-{
- int r = infiniband->get_tx_buffers(c, bytes);
- if (r > 0) {
- stack->get_dispatcher()->inflight += c.size();
- ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
- return r;
- }
- assert(r == 0);
-
- if (pending_sent_conns.back() != o)
- pending_sent_conns.push_back(o);
- dispatcher->pending_buffers(this);
- return r;
-}
-
-/**
- * Add the given Chunks to the given free queue.
- *
- * \param[in] chunks
- * The Chunks to enqueue.
- * \return
- * 0 if success or -1 for failure
- */
-int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
-{
- if (chunks.empty())
- return 0;
-
- stack->get_dispatcher()->inflight -= chunks.size();
- memory_manager->return_tx(chunks);
- ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
-
- pended = false;
- std::set<RDMAConnectedSocketImpl*> done;
- while (!pending_sent_conns.empty()) {
- RDMAConnectedSocketImpl *o = pending_sent_conns.front();
- if (done.count(o) == 0) {
- done.insert(o);
- } else {
- pending_sent_conns.pop_front();
- continue;
- }
- ssize_t r = o->submit(false);
- ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
- if (r < 0) {
- if (r == -EAGAIN)
- break;
- o->fault();
- }
- pending_sent_conns.pop_front();
- }
- return 0;
-}
-
-void RDMAWorker::handle_tx_event()
-{
- std::vector<Chunk*> tx_chunks;
- std::vector<ibv_wc> cqe;
- get_wc(cqe);
-
- for (size_t i = 0; i < cqe.size(); ++i) {
- ibv_wc* response = &cqe[i];
- Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
- ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
-
- if (response->status != IBV_WC_SUCCESS) {
- if (response->status == IBV_WC_RETRY_EXC_ERR) {
- ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
- } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
- ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
- << response->qp_num << " should be down while this WR=" << response->wr_id
- << " still in flight." << dendl;
- } else {
- ldout(cct, 1) << __func__ << " send work request returned error for buffer("
- << response->wr_id << ") status(" << response->status << "): "
- << infiniband->wc_status_to_string(response->status) << dendl;
- }
- RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
- if (conn) {
- ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
- conn->fault();
- } else {
- ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
- }
- }
-
- //assert(memory_manager->is_tx_chunk(chunk));
- if (memory_manager->is_tx_chunk(chunk)) {
- tx_chunks.push_back(chunk);
- } else {
- ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin
- }
- }
-
- post_tx_buffer(tx_chunks);
-
- ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
- dispatcher->notify_pending_workers();
-}
-
RDMADispatcher::~RDMADispatcher()
{
done = true;
delete async_handler;
}
+/**
+ * Build the dispatcher: create the rx completion channel and the completion
+ * queue on top of it, start the polling thread, and register this object as
+ * a fork watcher on the CephContext.
+ */
+RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
+  : cct(c),
+    ib(i),
+    async_handler(new C_handle_cq_async(this)),
+    lock("RDMADispatcher::lock"),
+    w_lock("RDMADispatcher::for worker pending list"),
+    qp_lock("for qp lock"),
+    stack(s)
+{
+  // The completion queue is created from the channel, so the channel comes first.
+  rx_cc = ib->create_comp_channel(c);
+  assert(rx_cc);
+
+  rx_cq = ib->create_comp_queue(c, rx_cc);
+  assert(rx_cq);
+
+  // polling() runs on its own thread from this point on.
+  t = std::thread(&RDMADispatcher::polling, this);
+
+  cct->register_fork_watcher(this);
+}
+
void RDMADispatcher::handle_async_event()
{
ldout(cct, 20) << __func__ << dendl;
pending_workers.front()->pass_wc(std::move(vector<ibv_wc>()));
pending_workers.pop_front();
}
+
+/**
+ * Record the (queue pair, socket) pair under the queue pair's local number
+ * and hand back a freshly created non-blocking eventfd.
+ *
+ * \param[in] qp
+ *   Queue pair to register; its local number must not be registered yet.
+ * \param[in] csi
+ *   Socket stored next to the queue pair.
+ * \return
+ *   The new eventfd descriptor.
+ */
+int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
+{
+  const int event_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+  assert(event_fd >= 0);
+
+  Mutex::Locker guard(lock);
+  const auto qpn = qp->get_local_qp_number();
+  assert(qp_conns.find(qpn) == qp_conns.end());
+  qp_conns[qpn] = std::make_pair(qp, csi);
+  return event_fd;
+}
+
+/**
+ * Create a non-blocking eventfd for worker \p w, remember it in `workers`
+ * (under w_lock) and return it.
+ *
+ * \param[in] w
+ *   Worker the descriptor is recorded for.
+ * \return
+ *   The new eventfd descriptor.
+ */
+int RDMADispatcher::register_worker(RDMAWorker* w)
+{
+  const int event_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+  assert(event_fd >= 0);
+
+  Mutex::Locker guard(w_lock);
+  workers[w] = event_fd;
+  return event_fd;
+}
+
+/** Append worker \p w to the tail of pending_workers while holding w_lock. */
+void RDMADispatcher::pending_buffers(RDMAWorker* w)
+{
+  Mutex::Locker guard(w_lock);
+  pending_workers.insert(pending_workers.end(), w);
+}
+
+/**
+ * Pop the first worker off pending_workers (under w_lock).
+ *
+ * \return
+ *   The worker that was at the front, or nullptr when the list is empty.
+ */
+RDMAWorker* RDMADispatcher::get_worker_from_list()
+{
+  Mutex::Locker guard(w_lock);
+  if (pending_workers.empty())
+    return nullptr;
+
+  RDMAWorker* head = pending_workers.front();
+  pending_workers.pop_front();
+  return head;
+}
+
+/**
+ * Look up the socket registered for queue-pair number \p qp while holding
+ * `lock`.
+ *
+ * \return
+ *   The registered socket, or nullptr when the number is unknown or the
+ *   stored queue pair reports is_dead().
+ */
+RDMAConnectedSocketImpl* RDMADispatcher::get_conn_by_qp(uint32_t qp)
+{
+  Mutex::Locker guard(lock);
+  const auto entry = qp_conns.find(qp);
+  const bool usable = entry != qp_conns.end() && !entry->second.first->is_dead();
+  return usable ? entry->second.second : nullptr;
+}
+
+/**
+ * Look up the socket registered for queue-pair number \p qp without taking
+ * any lock in this function.
+ * NOTE(review): presumably the caller already holds `lock` -- confirm.
+ *
+ * \return
+ *   The registered socket, or nullptr when the number is unknown or the
+ *   stored queue pair reports is_dead().
+ */
+RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
+{
+  const auto entry = qp_conns.find(qp);
+  const bool usable = entry != qp_conns.end() && !entry->second.first->is_dead();
+  return usable ? entry->second.second : nullptr;
+}
+
+/**
+ * Unregister queue-pair number \p qpn (under `lock`). The queue pair itself
+ * is appended to dead_queue_pairs before its map entry is removed. Unknown
+ * numbers are ignored.
+ */
+void RDMADispatcher::erase_qpn(uint32_t qpn)
+{
+  Mutex::Locker guard(lock);
+  auto entry = qp_conns.find(qpn);
+  if (entry != qp_conns.end()) {
+    dead_queue_pairs.push_back(entry->second.first);
+    qp_conns.erase(entry);
+  }
+}
+
+// Pre-fork hook (this object registers itself with register_fork_watcher()
+// in its constructor): waits for the polling thread to finish.
+void RDMADispatcher::handle_pre_fork()
+{
+ // NOTE(review): presumably polling() returns once it observes `done` -- confirm.
+ done = true;
+ t.join();
+ // Clear the flag again so a thread started later is not told to stop at once.
+ done = false;
+}
+
+// Post-fork hook: starts a new thread running polling() and stores it in `t`,
+// which handle_pre_fork() left joined.
+void RDMADispatcher::handle_post_fork()
+{
+ t = std::thread(&RDMADispatcher::polling, this);
+}
+
+
+/**
+ * Construct a worker with no stack, Infiniband handle or memory manager yet;
+ * the first two arrive through set_stack()/set_ib(), the last one in
+ * initialize(). The tx completion callback is allocated here.
+ */
+RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
+  : Worker(c, i),
+    stack(nullptr),
+    infiniband(nullptr),
+    tx_handler(new C_handle_cq_tx(this)),
+    memory_manager(nullptr),
+    lock("RDMAWorker::lock"),
+    pended(false)
+{
+}
+
+// Frees the tx callback allocated in the constructor and closes notify_fd
+// when it holds a valid (non-negative) descriptor.
+RDMAWorker::~RDMAWorker()
+{
+ delete tx_handler;
+ if (notify_fd >= 0)
+ ::close(notify_fd);
+}
+
+/**
+ * Hook this worker up to the stack's dispatcher. Does nothing when a
+ * dispatcher is already set; otherwise registers with the dispatcher,
+ * watches the returned descriptor for readability with tx_handler, and
+ * caches the Infiniband memory manager.
+ */
+void RDMAWorker::initialize()
+{
+  if (dispatcher)
+    return;  // already wired up
+
+  dispatcher = stack->get_dispatcher();
+  notify_fd = dispatcher->register_worker(this);
+  center.create_file_event(notify_fd, EVENT_READABLE, tx_handler);
+  memory_manager = infiniband->get_memory_manager();
+}
+
+/**
+ * Signal notify_fd by writing a 64-bit 1 to it.
+ *
+ * The write is performed outside the assert so that it still happens when
+ * assertions are compiled out; only the result check is asserted.
+ */
+void RDMAWorker::notify()
+{
+  uint64_t i = 1;
+  ssize_t r = ::write(notify_fd, &i, sizeof(i));
+  assert(r == static_cast<ssize_t>(sizeof(i)));
+  (void)r;  // keeps builds without assertions free of an unused-variable warning
+}
+
+/**
+ * Hand a batch of work completions to this worker and signal it.
+ *
+ * Under `lock`, \p v is moved into `wc` when nothing is stored yet, or its
+ * elements are appended to what is already there. notify() is then called
+ * while the lock is still held.
+ */
+void RDMAWorker::pass_wc(std::vector<ibv_wc> &&v)
+{
+  Mutex::Locker guard(lock);
+  if (!wc.empty())
+    wc.insert(wc.end(), v.begin(), v.end());
+  else
+    wc = std::move(v);
+  notify();
+}
+
+/**
+ * Queue socket \p o on pending_sent_conns. The worker is announced to the
+ * dispatcher through pending_buffers() only when `pended` is not already set;
+ * the flag is set afterwards.
+ */
+void RDMAWorker::add_pending_conn(RDMAConnectedSocketImpl* o)
+{
+  pending_sent_conns.push_back(o);
+  if (pended)
+    return;
+
+  dispatcher->pending_buffers(this);
+  pended = true;
+}
+
+/**
+ * Take the stored work completions. Under `lock`, the contents of `wc` and
+ * \p w are swapped; when `wc` is empty \p w is left untouched.
+ */
+void RDMAWorker::get_wc(std::vector<ibv_wc> &w)
+{
+  Mutex::Locker guard(lock);
+  if (!wc.empty())
+    w.swap(wc);
+}
+
+/**
+ * Create an RDMA server socket and start listening on \p sa.
+ *
+ * \param[in] sa    address to listen on
+ * \param[in] opt   socket options forwarded to the implementation
+ * \param[out] sock receives the new ServerSocket on success only
+ * \return
+ *   0 on success, otherwise the negative value returned by the
+ *   implementation's listen(); the implementation object is deleted then.
+ */
+int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
+{
+  auto *impl = new RDMAServerSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this, sa);
+  const int rc = impl->listen(sa, opt);
+  if (rc >= 0) {
+    *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(impl));
+    return 0;
+  }
+
+  delete impl;
+  return rc;
+}
+
+/**
+ * Create an RDMA connected socket and start connecting to \p addr.
+ *
+ * \param[in] addr    peer address
+ * \param[in] opts    socket options forwarded to try_connect()
+ * \param[out] socket receives the new ConnectedSocket on success only
+ * \return
+ *   0 on success, otherwise the negative value returned by try_connect().
+ */
+int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
+{
+  // Owned from the start so the failure path below cannot leak the socket
+  // (listen() likewise releases its implementation object on failure).
+  std::unique_ptr<RDMAConnectedSocketImpl> csi(
+    new RDMAConnectedSocketImpl(cct, infiniband, get_stack()->get_dispatcher(), this));
+  int r = csi->try_connect(addr, opts);
+
+  if (r < 0) {
+    ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
+    return r;
+  }
+  *socket = ConnectedSocket(std::move(csi));
+  return 0;
+}
+
+/**
+ * Try to obtain tx chunks for \p bytes bytes on behalf of socket \p o.
+ *
+ * \param[in] o      socket asking for buffers; queued for a retry when none are available
+ * \param[out] c     filled by Infiniband::get_tx_buffers()
+ * \param[in] bytes  amount requested
+ * \return
+ *   The positive result of get_tx_buffers() when buffers were obtained (the
+ *   dispatcher's inflight counter is raised by c.size()), or 0 when none were
+ *   available and the socket was queued instead.
+ */
+int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
+{
+  int r = infiniband->get_tx_buffers(c, bytes);
+  if (r > 0) {
+    stack->get_dispatcher()->inflight += c.size();
+    ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
+    return r;
+  }
+  assert(r == 0);
+
+  // back() on an empty list is undefined behaviour, so test for emptiness
+  // before comparing against the most recently queued socket.
+  if (pending_sent_conns.empty() || pending_sent_conns.back() != o)
+    pending_sent_conns.push_back(o);
+  dispatcher->pending_buffers(this);
+  return r;
+}
+
+/**
+ * Give completed tx Chunks back to the memory manager and retry the sends
+ * that were parked on this worker.
+ *
+ * Nothing happens when \p chunks is empty. Otherwise the dispatcher's
+ * inflight counter is lowered by chunks.size(), the chunks are passed to
+ * memory_manager->return_tx(), `pended` is cleared, and each distinct socket
+ * in pending_sent_conns is asked to submit(false) once, front to back.
+ *
+ * \param[in] chunks
+ * The Chunks to return.
+ * \return
+ * Always 0; this function produces no failure code.
+ */
+int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
+{
+ if (chunks.empty())
+ return 0;
+
+ stack->get_dispatcher()->inflight -= chunks.size();
+ memory_manager->return_tx(chunks);
+ ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
+
+ pended = false;
+ // Sockets already retried during this call; later duplicates in the list
+ // are simply dropped.
+ std::set<RDMAConnectedSocketImpl*> done;
+ while (!pending_sent_conns.empty()) {
+ RDMAConnectedSocketImpl *o = pending_sent_conns.front();
+ if (done.count(o) == 0) {
+ done.insert(o);
+ } else {
+ pending_sent_conns.pop_front();
+ continue;
+ }
+ ssize_t r = o->submit(false);
+ ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
+ if (r < 0) {
+ // -EAGAIN: stop here and leave this socket and everything behind it queued.
+ if (r == -EAGAIN)
+ break;
+ // Any other error faults the socket; it is still removed from the list below.
+ o->fault();
+ }
+ pending_sent_conns.pop_front();
+ }
+ return 0;
+}
+
+// Drains the work completions handed to this worker via pass_wc(): logs each
+// one, faults the owning socket on a failed status, collects the chunks the
+// memory manager recognises as tx chunks, returns them via post_tx_buffer()
+// and finally asks the dispatcher to notify pending workers.
+void RDMAWorker::handle_tx_event()
+{
+ std::vector<Chunk*> tx_chunks;
+ std::vector<ibv_wc> cqe;
+ get_wc(cqe);
+
+ for (size_t i = 0; i < cqe.size(); ++i) {
+ ibv_wc* response = &cqe[i];
+ // wr_id is interpreted as the address of the Chunk the request was posted with.
+ Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;
+
+ if (response->status != IBV_WC_SUCCESS) {
+ // The three branches differ only in the message that is logged.
+ if (response->status == IBV_WC_RETRY_EXC_ERR) {
+ ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+ } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
+ ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
+ << response->qp_num << " should be down while this WR=" << response->wr_id
+ << " still in flight." << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " send work request returned error for buffer("
+ << response->wr_id << ") status(" << response->status << "): "
+ << infiniband->wc_status_to_string(response->status) << dendl;
+ }
+ // Fault the socket that owns the queue pair, if it is still registered
+ // and its queue pair is not dead.
+ RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
+ if (conn) {
+ ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;
+ conn->fault();
+ } else {
+ ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
+ }
+ }
+
+ // A chunk that is not a tx chunk is logged and skipped instead of asserting.
+ //assert(memory_manager->is_tx_chunk(chunk));
+ if (memory_manager->is_tx_chunk(chunk)) {
+ tx_chunks.push_back(chunk);
+ } else {
+ ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;
+ }
+ }
+
+ post_tx_buffer(tx_chunks);
+
+ ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
+ dispatcher->notify_pending_workers();
+}
+
+
+/**
+ * Build the RDMA network stack: create the file-level Infiniband object on
+ * first use, create this stack's dispatcher, and give every worker the
+ * Infiniband handle and a back pointer to this stack.
+ */
+RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
+{
+  // global_infiniband is file-static and only created when still unset.
+  if (!global_infiniband)
+    global_infiniband = new Infiniband(
+      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
+  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
+  dispatcher = new RDMADispatcher(cct, global_infiniband, this);
+  unsigned num = get_num_worker();
+  for (unsigned i = 0; i < num; ++i) {
+    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
+    // dynamic_cast yields nullptr for a worker of another type; fail loudly
+    // here rather than dereference a null pointer below.
+    assert(w);
+    w->set_ib(global_infiniband);
+    w->set_stack(this);
+  }
+  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
+}
+
+// Destroys the dispatcher allocated in the constructor.
+RDMAStack::~RDMAStack()
+{
+ delete dispatcher;
+}
+
+/**
+ * Start the thread for worker slot \p i running \p func.
+ *
+ * \param[in] i     slot index in `threads`
+ * \param[in] func  thread body; moved into the new thread
+ */
+void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
+{
+  // Only ever grow: resizing down would destroy the std::thread objects in
+  // higher slots, which terminates the process if any is still joinable.
+  if (threads.size() <= i)
+    threads.resize(i + 1);
+  // The temporary std::thread is already an rvalue; func is moved, not copied.
+  threads[i] = std::thread(std::move(func));
+}
+
+/**
+ * Wait for the thread in worker slot \p i to finish. The slot must exist and
+ * hold a joinable thread.
+ */
+void RDMAStack::join_worker(unsigned i)
+{
+  assert(threads.size() > i && threads[i].joinable());
+  std::thread &worker_thread = threads[i];
+  worker_thread.join();
+}
ceph::unordered_map<RDMAWorker*, int> workers;;
std::list<RDMAWorker*> pending_workers;
RDMAStack* stack;
+
class C_handle_cq_async : public EventCallback {
RDMADispatcher *dispatcher;
public:
};
public:
- std::atomic<uint64_t> inflight = {0};
- explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
- : cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
- w_lock("RDMADispatcher::for worker pending list"), qp_lock("for qp lock"), stack(s) {
- rx_cc = ib->create_comp_channel(c);
- assert(rx_cc);
- rx_cq = ib->create_comp_queue(c, rx_cc);
- assert(rx_cq);
- t = std::thread(&RDMADispatcher::polling, this);
- cct->register_fork_watcher(this);
- }
+ // Creates the rx completion channel and queue, starts the polling thread
+ // and registers this object as a fork watcher.
+ explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s);
virtual ~RDMADispatcher();
void handle_async_event();
void polling();
- int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi) {
- int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
- assert(fd >= 0);
- Mutex::Locker l(lock);
- assert(!qp_conns.count(qp->get_local_qp_number()));
- qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
- return fd;
- }
- int register_worker(RDMAWorker* w) {
- int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
- assert(fd >= 0);
- Mutex::Locker l(w_lock);
- workers[w] = fd;
- return fd;
- }
- void pending_buffers(RDMAWorker* w) {
- Mutex::Locker l(w_lock);
- pending_workers.push_back(w);
- }
- RDMAStack* get_stack() {
- return stack;
- }
- RDMAWorker* get_worker_from_list() {
- Mutex::Locker l(w_lock);
- if (pending_workers.empty())
- return nullptr;
- else {
- RDMAWorker* w = pending_workers.front();
- pending_workers.pop_front();
- return w;
- }
- }
- RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp) {
- Mutex::Locker l(lock);
- auto it = qp_conns.find(qp);
- if (it == qp_conns.end())
- return nullptr;
- if (it->second.first->is_dead())
- return nullptr;
- return it->second.second;
- }
- RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp) {
- auto it = qp_conns.find(qp);
- if (it == qp_conns.end())
- return nullptr;
- if (it->second.first->is_dead())
- return nullptr;
- return it->second.second;
- }
- void erase_qpn(uint32_t qpn) {
- Mutex::Locker l(lock);
- auto it = qp_conns.find(qpn);
- if (it == qp_conns.end())
- return ;
- dead_queue_pairs.push_back(it->second.first);
- qp_conns.erase(it);
- }
+ // Stores (qp, csi) under the queue pair's local number; returns a new eventfd.
+ int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
+ // Creates an eventfd for w, records it in `workers` and returns it.
+ int register_worker(RDMAWorker* w);
+ // Appends w to pending_workers.
+ void pending_buffers(RDMAWorker* w);
+ // Stack passed to the constructor.
+ RDMAStack* get_stack() { return stack; }
+ // Pops and returns the first pending worker, or nullptr when none is queued.
+ RDMAWorker* get_worker_from_list();
+ // Socket registered for queue-pair number qp, or nullptr when it is unknown
+ // or its queue pair is dead; takes `lock`.
+ RDMAConnectedSocketImpl* get_conn_by_qp(uint32_t qp);
+ // Same lookup as get_conn_by_qp() but without taking `lock`.
+ RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
+ // Removes qpn from qp_conns and appends its queue pair to dead_queue_pairs.
+ void erase_qpn(uint32_t qpn);
Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
void notify_pending_workers();
- virtual void handle_pre_fork() override {
- done = true;
- t.join();
- done = false;
- }
- virtual void handle_post_fork() override {
- t = std::thread(&RDMADispatcher::polling, this);
- }
+ // Sets `done`, joins the polling thread, then clears `done` again.
+ virtual void handle_pre_fork() override;
+ // Starts a new polling thread.
+ virtual void handle_post_fork() override;
+
+ // Chunk counter adjusted by RDMAWorker: raised in reserve_message_buffer(),
+ // lowered in post_tx_buffer().
+ std::atomic<uint64_t> inflight = {0};
};
Mutex lock;
std::vector<ibv_wc> wc;
bool pended;
+
class C_handle_cq_tx : public EventCallback {
RDMAWorker *worker;
public:
public:
explicit RDMAWorker(CephContext *c, unsigned i);
- virtual ~RDMAWorker() {
- delete tx_handler;
- if (notify_fd >= 0)
- ::close(notify_fd);
- }
- void notify() {
- uint64_t i = 1;
- assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
- }
- void pass_wc(std::vector<ibv_wc> &&v) {
- Mutex::Locker l(lock);
- if (wc.empty())
- wc = std::move(v);
- else
- wc.insert(wc.end(), v.begin(), v.end());
- notify();
- }
- void get_wc(std::vector<ibv_wc> &w) {
- Mutex::Locker l(lock);
- if (wc.empty())
- return ;
- w.swap(wc);
- }
+ // Deletes tx_handler and closes notify_fd when it is a valid descriptor.
+ virtual ~RDMAWorker();
+ // Writes a 64-bit 1 to notify_fd.
+ void notify();
+ // Under `lock`: moves v into wc when wc is empty, otherwise appends v's
+ // elements; then calls notify().
+ void pass_wc(std::vector<ibv_wc> &&v);
+ // Under `lock`: swaps wc into w; w is left untouched when wc is empty.
+ void get_wc(std::vector<ibv_wc> &w);
virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
virtual void initialize() override;
- RDMAStack *get_stack() {
- return stack;
- }
+ // Stack this worker was given through set_stack(); nullptr until then.
+ RDMAStack *get_stack() { return stack; }
int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
int post_tx_buffer(std::vector<Chunk*> &chunks);
- void add_pending_conn(RDMAConnectedSocketImpl* o) {
- pending_sent_conns.push_back(o);
- if (!pended) {
- dispatcher->pending_buffers(this);
- pended = true;
- }
- }
- void remove_pending_conn(RDMAConnectedSocketImpl *o) {
- pending_sent_conns.remove(o);
- }
+ // Queues o on pending_sent_conns and, unless `pended` is already set,
+ // announces this worker to the dispatcher via pending_buffers().
+ void add_pending_conn(RDMAConnectedSocketImpl* o);
+ // Drops o from pending_sent_conns.
+ void remove_pending_conn(RDMAConnectedSocketImpl *o) { pending_sent_conns.remove(o); }
void handle_tx_event();
- void set_ib(Infiniband* ib) {
- infiniband = ib;
- }
- void set_stack(RDMAStack *s) {
- stack = s;
- }
+ // Stores the Infiniband handle used by this worker (called from the RDMAStack constructor).
+ void set_ib(Infiniband* ib) { infiniband = ib; }
+ // Stores the owning stack (called from the RDMAStack constructor).
+ void set_stack(RDMAStack *s) { stack = s; }
};
class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
bool active;// qp is active ?
bool detached;
- void notify() {
- uint64_t i = 1;
- assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
- }
+ // NOTE(review): the definition is out of line and not visible here; the
+ // inline body it replaces wrote a 64-bit 1 to notify_fd.
+ void notify();
ssize_t read_buffers(char* buf, size_t len);
int post_work_request(std::vector<Chunk*>&);
public:
RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
- RDMAWorker *w)
- : cct(cct), connected(0), error(0), infiniband(ib),
- dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
- is_server(false), con_handler(new C_handle_connection(this)),
- active(false), detached(false) {
- qp = infiniband->create_queue_pair(
- cct, s->get_rx_cq(), s->get_rx_cq(), IBV_QPT_RC);
- my_msg.qpn = qp->get_local_qp_number();
- my_msg.psn = qp->get_initial_psn();
- my_msg.lid = infiniband->get_lid();
- my_msg.peer_qpn = 0;
- my_msg.gid = infiniband->get_gid();
- notify_fd = dispatcher->register_qp(qp, this);
- }
-
- virtual ~RDMAConnectedSocketImpl() {
- worker->remove_pending_conn(this);
- dispatcher->erase_qpn(my_msg.qpn);
- cleanup();
- if (notify_fd >= 0)
- ::close(notify_fd);
- if (tcp_fd >= 0)
- ::close(tcp_fd);
- error = ECONNRESET;
- Mutex::Locker l(lock);
- for (unsigned i=0; i < wc.size(); ++i)
- infiniband->recall_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
- for (unsigned i=0; i < buffers.size(); ++i)
- infiniband->recall_chunk(buffers[i]);
- }
-
- void pass_wc(std::vector<ibv_wc> &&v) {
- Mutex::Locker l(lock);
- if (wc.empty())
- wc = std::move(v);
- else
- wc.insert(wc.end(), v.begin(), v.end());
- notify();
- }
+ RDMAWorker *w);
+ // NOTE(review): the definition is out of line and not visible here; the
+ // inline body it replaces removed this socket from the worker's pending
+ // list, erased its qpn from the dispatcher, called cleanup(), closed
+ // notify_fd/tcp_fd and recalled the chunks still held in wc and buffers.
+ virtual ~RDMAConnectedSocketImpl();
- void get_wc(std::vector<ibv_wc> &w) {
- Mutex::Locker l(lock);
- if (wc.empty())
- return ;
- w.swap(wc);
- }
-
- virtual int is_connected() override {
- return connected;
- }
+ // NOTE(review): out-of-line definitions not visible here. The inline bodies
+ // they replace stored/appended v into wc under `lock` and called notify()
+ // (pass_wc), and swapped wc into w under `lock` unless wc was empty (get_wc).
+ void pass_wc(std::vector<ibv_wc> &&v);
+ void get_wc(std::vector<ibv_wc> &w);
+ // Returns the `connected` member as is.
+ virtual int is_connected() override { return connected; }
virtual ssize_t read(char* buf, size_t len) override;
virtual ssize_t zero_copy_read(bufferptr &data) override;
virtual ssize_t send(bufferlist &bl, bool more) override;
- virtual void shutdown() override {
- if (!error)
- fin();
- error = ECONNRESET;
- active = false;
- }
- virtual void close() override {
- if (!error)
- fin();
- error = ECONNRESET;
- active = false;
- }
- virtual int fd() const override {
- return notify_fd;
- }
- void fault() {
- /*if (qp) {
- qp->to_dead();
- qp = NULL;
- }*/
- error = ECONNRESET;
- connected = 1;
- notify();
- }
- const char* get_qp_state() {
- return Infiniband::qp_state_string(qp->get_state());
- }
+ // NOTE(review): shutdown(), close() and fault() are defined out of line and
+ // not visible here. The inline bodies they replace behaved as follows:
+ // shutdown()/close() called fin() unless `error` was already set, then set
+ // error = ECONNRESET and active = false.
+ virtual void shutdown() override;
+ virtual void close() override;
+ // Descriptor returned to callers is notify_fd.
+ virtual int fd() const override { return notify_fd; }
+ // Replaced inline body: set error = ECONNRESET and connected = 1, then notify().
+ void fault();
+ // Text form of the queue pair's current state.
+ const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
ssize_t submit(bool more);
int activate();
void fin();
void handle_connection();
void cleanup();
- void set_accept_fd(int sd) {
- tcp_fd = sd;
- is_server = true;
- worker->center.submit_to(worker->center.get_id(), [this]() {
- worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
- }, true);
- }
+ // NOTE(review): the definition is out of line and not visible here; the
+ // inline body it replaces stored sd in tcp_fd, set is_server, and submitted
+ // a task to the worker's event center that watches tcp_fd for readability
+ // with con_handler.
+ void set_accept_fd(int sd);
int try_connect(const entity_addr_t&, const SocketOptions &opt);
+
class C_handle_connection : public EventCallback {
RDMAConnectedSocketImpl *csi;
bool active;
entity_addr_t sa;
public:
- RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
- : cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) {}
+ // NOTE(review): the definition is out of line and not visible here; the
+ // inline constructor it replaces only initialised members
+ // (server_setup_socket started at -1) and had an empty body.
+ RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+
int listen(entity_addr_t &sa, const SocketOptions &opt);
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
- virtual void abort_accept() override {
- if (server_setup_socket >= 0)
- ::close(server_setup_socket);
- }
- virtual int fd() const override {
- return server_setup_socket;
- }
+ // NOTE(review): the definition is out of line and not visible here; the
+ // inline body it replaces closed server_setup_socket when it was >= 0.
+ virtual void abort_accept() override;
+ // Descriptor returned to callers is server_setup_socket.
+ virtual int fd() const override { return server_setup_socket; }
int get_fd() { return server_setup_socket; }
};
public:
explicit RDMAStack(CephContext *cct, const string &t);
- virtual ~RDMAStack() {
- delete dispatcher;
- }
+ // Deletes the dispatcher.
+ virtual ~RDMAStack();
virtual bool support_zero_copy_read() const override { return false; }
virtual bool nonblock_connect_need_writable_event() const { return false; }
- virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override {
- threads.resize(i+1);
- threads[i] = std::move(std::thread(func));
- }
- virtual void join_worker(unsigned i) override {
- assert(threads.size() > i && threads[i].joinable());
- threads[i].join();
- }
+ // Resizes `threads` to i+1 entries and starts a thread running func in slot i.
+ virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
+ // Joins the thread in slot i; the slot must exist and be joinable.
+ virtual void join_worker(unsigned i) override;
RDMADispatcher *get_dispatcher() { return dispatcher; }
};