return -EAGAIN;
}
ssize_t read = 0;
- if (!buffers.empty())
- read = read_buffers(buf,len);
+ read = read_buffers(buf,len);
+ if (is_server && connected == 0) {
+ ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
+ connected = 1; //if so, we don't need the last handshake
+ cleanup();
+ submit(false);
+ }
+
+ if (!buffers.empty()) {
+ notify();
+ }
+
+ if (read == 0 && error)
+ return -error;
+ return read == 0 ? -EAGAIN : read;
+}
+
+void RDMAConnectedSocketImpl::buffer_prefetch(void)
+{
std::vector<ibv_wc> cqe;
get_wc(cqe);
- if (cqe.empty()) {
- if (!buffers.empty()) {
- notify();
- }
- if (read > 0) {
- return read;
- }
- if (error) {
- return -error;
- } else {
- return -EAGAIN;
- }
- }
+ if(cqe.empty())
+ return;
- ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
- for (size_t i = 0; i < cqe.size(); ++i) {
+ for(size_t i = 0; i < cqe.size(); ++i) {
ibv_wc* response = &cqe[i];
ceph_assert(response->status == IBV_WC_SUCCESS);
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
chunk->prepare_read(response->byte_len);
+
if (chunk->get_size() == 0) {
chunk->reset_read_chunk();
dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
}
dispatcher->post_chunk_to_pool(chunk);
continue;
- }
- ldout(cct, 25) << __func__ << " chunk " << chunk << " has bytes " << chunk->get_size() << dendl;
- worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
-
- read += chunk->read(buf + read, len - read);
-
- if (chunk->get_size()) {
+ } else {
buffers.push_back(chunk);
ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
- } else {
- chunk->reset_read_chunk();
- dispatcher->post_chunk_to_pool(chunk);
- update_post_backlog();
}
}
-
worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
- if (is_server && connected == 0) {
- ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
- connected = 1; //if so, we don't need the last handshake
- cleanup();
- submit(false);
- }
-
- if (!buffers.empty()) {
- notify();
- }
-
- if (read == 0 && error)
- return -error;
- return read == 0 ? -EAGAIN : read;
}
ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
{
size_t read_size = 0, tmp = 0;
+ buffer_prefetch();
auto pchunk = buffers.begin();
while (pchunk != buffers.end()) {
tmp = (*pchunk)->read(buf + read_size, len - read_size);
buffers.erase(buffers.begin(), pchunk);
ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
return read_size;
}