txcq(txcq),
rxcq(rxcq),
initial_psn(lrand48() & PSN_MSK),
- max_send_wr(tx_queue_len),
+ // One extra WR for beacon
+ max_send_wr(tx_queue_len + 1),
max_recv_wr(rx_queue_len),
q_key(q_key),
dead(false)
}
/**
- * Change RC QueuePair into the ERROR state. This is necessary modify
- * the Queue Pair into the Error state and poll all of the relevant
- * Work Completions prior to destroying a Queue Pair.
- * Since destroying a Queue Pair does not guarantee that its Work
- * Completions are removed from the CQ upon destruction. Even if the
- * Work Completions are already in the CQ, it might not be possible to
- * retrieve them. If the Queue Pair is associated with an SRQ, it is
- * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED
 * Switch the QP to the ERROR state, then post a beacon WR so that all WCEs
 * can be drained before the QP is safely destroyed. See
 * RDMADispatcher::handle_tx_event() for details.
*
* \return
* -errno if the QueuePair can't switch to ERROR
{
if (dead)
return 0;
- ibv_qp_attr qpa;
- memset(&qpa, 0, sizeof(qpa));
- qpa.qp_state = IBV_QPS_ERR;
- int mask = IBV_QP_STATE;
- int ret = ibv_modify_qp(qp, &qpa, mask);
- if (ret) {
- lderr(cct) << __func__ << " failed to transition to ERROR state: "
- << cpp_strerror(errno) << dendl;
+ if (modify_qp_to_error()) {
+ return -1;
+ }
+ ldout(cct, 20) << __func__ << " force trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn
+ << " bound remote QueuePair, qp number: " << local_cm_meta.peer_qpn << dendl;
+
+ struct ibv_send_wr *bad_wr = nullptr, beacon;
+ memset(&beacon, 0, sizeof(beacon));
+ beacon.wr_id = BEACON_WRID;
+ beacon.opcode = IBV_WR_SEND;
+ beacon.send_flags = IBV_SEND_SIGNALED;
+ if (ibv_post_send(qp, &beacon, &bad_wr)) {
+ lderr(cct) << __func__ << " failed to send a beacon: " << cpp_strerror(errno) << dendl;
return -errno;
}
+ ldout(cct, 20) << __func__ << " trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn << " Beacon sent " << dendl;
dead = true;
- return ret;
+
+ return 0;
}
int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
ceph_abort();
}
- tx_queue_len = device->device_attr.max_qp_wr;
+ // Keep one extra WR for a beacon to indicate all WCEs were consumed
+ tx_queue_len = device->device_attr.max_qp_wr - 1;
if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
#define PSN_LEN 24
#define PSN_MSK ((1 << PSN_LEN) - 1)
+#define BEACON_WRID 0xDEADBEEF
+
struct ib_cm_meta_t {
uint16_t lid;
uint32_t local_qpn;
ldout(cct, 20) << __func__ << " destruct." << dendl;
cleanup();
worker->remove_pending_conn(this);
- dispatcher->erase_qpn(local_qpn);
+ dispatcher->schedule_qp_destroy(local_qpn);
for (unsigned i=0; i < wc.size(); ++i) {
dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
ceph_assert(qp_conns.empty());
ceph_assert(num_qp_conn == 0);
ceph_assert(dead_queue_pairs.empty());
- ceph_assert(num_dead_queue_pair == 0);
delete async_handler;
}
ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
conn->fault();
if (!cct->_conf->ms_async_rdma_cm)
- erase_qpn_lockless(qpn);
+ enqueue_dead_qp(qpn);
}
}
break;
}
if (!tx_ret && !rx_ret) {
- // NOTE: Has TX just transitioned to idle? We should do it when idle!
- // It's now safe to delete queue pairs (see comment by declaration
- // for dead_queue_pairs).
- // Additionally, don't delete qp while outstanding_buffers isn't empty,
- // because we need to check qp's state before sending
perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
- if (num_dead_queue_pair) {
- std::lock_guard l{lock}; // FIXME reuse dead qp because creating one qp costs 1 ms
- auto it = dead_queue_pairs.begin();
- while (it != dead_queue_pairs.end()) {
- auto i = *it;
- // Bypass QPs that do not collect all Tx completions yet.
- if (i->get_tx_wr()) {
- ldout(cct, 20) << __func__ << " bypass qp=" << i << " tx_wr=" << i->get_tx_wr() << dendl;
- ++it;
- } else {
- ldout(cct, 10) << __func__ << " finally delete qp=" << i << dendl;
- delete i;
- it = dead_queue_pairs.erase(it);
- perf_logger->dec(l_msgr_rdma_active_queue_pair);
- --num_dead_queue_pair;
- }
+ //
+ // Clean up dead QPs once both rx and tx CQs are idle. We could destroy
+ // QPs even earlier, right when the beacon has been received, but we
+ // have two CQs (rx & tx), thus the beacon WC can be popped from the tx
+ // CQ before other WCs are fully consumed from the rx CQ. For safety,
+ // we wait for the beacon and then for "no events" from both CQs.
+ //
+ // Calling empty() on the vector without holding the lock is fine,
+ // since we only use the result as a hint (accuracy is not important)
+ //
+ if (!dead_queue_pairs.empty()) {
+ decltype(dead_queue_pairs) dead_qps;
+ {
+ std::lock_guard l{lock};
+ dead_queue_pairs.swap(dead_qps);
+ }
+
+ for (auto& qp: dead_qps) {
+ perf_logger->dec(l_msgr_rdma_active_queue_pair);
+ ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
+ delete qp;
}
}
+
if (!num_qp_conn && done && dead_queue_pairs.empty())
break;
return get_qp_lockless(qp);
}
-void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
+//
+// Move the QP registered under `qpn` out of the live qp_conns map and onto
+// dead_queue_pairs; the polling loop deletes it later, once both CQs report
+// no more events. Unlike the *_lockless variant it replaces, this takes
+// `lock` itself, so callers must not hold it.
+//
+void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
{
+ std::lock_guard l{lock};
auto it = qp_conns.find(qpn);
- if (it == qp_conns.end())
+ if (it == qp_conns.end()) {
+ lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
return ;
- ++num_dead_queue_pair;
- dead_queue_pairs.push_back(it->second.first);
+ }
+ QueuePair *qp = it->second.first;
+ // Hand the QP over to the dead list; the actual destruction happens in
+ // the polling thread when the CQs go idle.
+ dead_queue_pairs.push_back(qp);
qp_conns.erase(it);
--num_qp_conn;
}
-void RDMADispatcher::erase_qpn(uint32_t qpn)
+//
+// Schedule destruction of the QP identified by `qpn`. to_dead() switches
+// the QP to the ERROR state and posts a beacon WR (BEACON_WRID); when the
+// beacon completion is seen by handle_tx_event(), the QP is moved to
+// dead_queue_pairs and later deleted by the polling loop.
+//
+void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
{
std::lock_guard l{lock};
- erase_qpn_lockless(qpn);
+ auto it = qp_conns.find(qpn);
+ if (it == qp_conns.end()) {
+ lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
+ return;
+ }
+ QueuePair *qp = it->second.first;
+ if (qp->to_dead()) {
+ //
+ // Failed to switch to dead. This is abnormal, but we can't
+ // do anything, so just destroy QP.
+ //
+ dead_queue_pairs.push_back(qp);
+ qp_conns.erase(it);
+ --num_qp_conn;
+ } else {
+ //
+ // Successfully switched to dead, thus keep entry in the map.
+ // But only zero out socket pointer in order to return null from
+ // get_conn_lockless();
+ //
+ it->second.second = nullptr;
}
}
void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
for (int i = 0; i < n; ++i) {
ibv_wc* response = &cqe[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+
+ // If it's beacon WR, enqueue the QP to be destroyed later
+ if (response->wr_id == BEACON_WRID) {
+ enqueue_dead_qp(response->qp_num);
+ continue;
+ }
+
ldout(cct, 25) << __func__ << " QP: " << response->qp_num
<< " len: " << response->byte_len << " , addr:" << chunk
<< " " << ib->wc_status_to_string(response->status) << dendl;
Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
EventCallbackRef async_handler;
bool done = false;
- std::atomic<uint64_t> num_dead_queue_pair = {0};
std::atomic<uint64_t> num_qp_conn = {0};
// protect `qp_conns`, `dead_queue_pairs`
ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock");
/**
* 1. Connection call mark_down
* 2. Move the Queue Pair into the Error state(QueuePair::to_dead)
- * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED(handle_async_event)
- * 4. Wait for CQ to be empty(handle_tx_event)
- * 5. Destroy the QP by calling ibv_destroy_qp()(handle_tx_event)
+ * 3. Post a beacon
+ * 4. Wait for beacon which indicates queues are drained
+ * 5. Destroy the QP by calling ibv_destroy_qp()
*
* @param qp The qp needed to dead
*/
ceph::make_mutex("RDMADispatcher::for worker pending list");
// fixme: lockfree
std::list<RDMAWorker*> pending_workers;
+ void enqueue_dead_qp(uint32_t qp);
class C_handle_cq_async : public EventCallback {
RDMADispatcher *dispatcher;
RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
QueuePair* get_qp_lockless(uint32_t qp);
QueuePair* get_qp(uint32_t qp);
- void erase_qpn_lockless(uint32_t qpn);
- void erase_qpn(uint32_t qpn);
+ void schedule_qp_destroy(uint32_t qp);
Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
void notify_pending_workers();