return bytes;
}
+size_t RDMAConnectedSocketImpl::tx_copy_chunk(std::vector<Chunk*> &tx_buffers,
+ size_t req_copy_len, decltype(std::cbegin(pending_bl.buffers()))& start,
+ const decltype(std::cbegin(pending_bl.buffers()))& end)
+{
+ ceph_assert(start != end);
+ auto chunk_idx = tx_buffers.size();
+ if (0 == worker->get_reged_mem(this, tx_buffers, req_copy_len)) {
+ ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
+ return 0;
+ }
+
+ Chunk *current_chunk = tx_buffers[chunk_idx];
+ size_t write_len = 0;
+ while (start != end) {
+ const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
+
+ size_t slice_write_len = 0;
+ while (slice_write_len < start->length()) {
+ size_t real_len = current_chunk->write((char*)addr + slice_write_len, start->length() - slice_write_len);
+
+ slice_write_len += real_len;
+ write_len += real_len;
+ req_copy_len -= real_len;
+
+ if (current_chunk->full()) {
+ if (++chunk_idx == tx_buffers.size())
+ return write_len;
+ current_chunk = tx_buffers[chunk_idx];
+ }
+ }
+
+ ++start;
+ }
+ ceph_assert(req_copy_len == 0);
+ return write_len;
+}
+
ssize_t RDMAConnectedSocketImpl::submit(bool more)
{
if (error)
if (!bytes)
return 0;
- auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers,
- unsigned bytes,
- auto& start,
- const auto& end) -> unsigned {
- ceph_assert(start != end);
- auto chunk_idx = tx_buffers.size();
- int ret = worker->get_reged_mem(this, tx_buffers, bytes);
- if (ret == 0) {
- ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
- worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
- return 0;
- }
-
- unsigned total_copied = 0;
- Chunk *current_chunk = tx_buffers[chunk_idx];
- while (start != end) {
- const uintptr_t addr = reinterpret_cast<uintptr_t>(start->c_str());
- unsigned copied = 0;
- while (copied < start->length()) {
- uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
- copied += r;
- total_copied += r;
- bytes -= r;
- if (current_chunk->full()){
- if (++chunk_idx == tx_buffers.size())
- return total_copied;
- current_chunk = tx_buffers[chunk_idx];
- }
- }
- ++start;
- }
- ceph_assert(bytes == 0);
- return total_copied;
- };
-
std::vector<Chunk*> tx_buffers;
auto it = std::cbegin(pending_bl.buffers());
auto copy_it = it;
- unsigned total = 0;
- unsigned need_reserve_bytes = 0;
+ size_t total = 0;
+ size_t need_reserve_bytes = 0;
while (it != pending_bl.buffers().end()) {
if (infiniband->is_tx_buffer(it->raw_c_str())) {
if (need_reserve_bytes) {
- unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
+ size_t copied = tx_copy_chunk(tx_buffers, need_reserve_bytes, copy_it, it);
total += copied;
if (copied < need_reserve_bytes)
goto sending;
++it;
}
if (need_reserve_bytes)
- total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
+ total += tx_copy_chunk(tx_buffers, need_reserve_bytes, copy_it, it);
sending:
if (total == 0)