}
}
- if (c != buffers.end() && (*c)->over())
- ++c;
- buffers.erase(buffers.begin(), c);
- ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl;
- return read;
+ buffers.erase(buffers.begin(), pchunk);
+ ldout(cct, 25) << __func__ << " got " << read_size << " bytes, buffers size: " << buffers.size() << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_rx_bytes, read_size);
+ return read_size;
}
- ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
- {
- if (error)
- return -error;
- static const int MAX_COMPLETIONS = 16;
- ibv_wc wc[MAX_COMPLETIONS];
- ssize_t size = 0;
-
- ibv_wc* response;
- Chunk* chunk;
- bool loaded = false;
- auto iter = buffers.begin();
- if (iter != buffers.end()) {
- chunk = *iter;
- // FIXME need to handle release
- // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
- buffers.erase(iter);
- loaded = true;
- size = chunk->bound;
- }
-
- std::vector<ibv_wc> cqe;
- get_wc(cqe);
- if (cqe.empty())
- return size == 0 ? -EAGAIN : size;
-
- ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
-
- for (size_t i = 0; i < cqe.size(); ++i) {
- response = &wc[i];
- chunk = reinterpret_cast<Chunk*>(response->wr_id);
- chunk->prepare_read(response->byte_len);
- if (!loaded && i == 0) {
- // FIXME need to handle release
- // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
- size = chunk->bound;
- continue;
- }
- buffers.push_back(chunk);
- iter++;
- }
-
- if (size == 0)
- return -EAGAIN;
- return size;
- }
-
ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
{
if (error) {