assert(m);
new(chunk) Chunk(m, chunk_size, base+offset);
free_chunks.push_back(chunk);
- all_chunks.insert(chunk);
+ all_buffers.insert(chunk->buffer);
ptr += sizeof(Chunk);
}
return 0;
return memory_manager->get_send_buffers(c, bytes);
}
-int Infiniband::recall_chunk(Chunk* c)
-{
- if (memory_manager->is_rx_chunk(c)) {
- post_chunk(c);
- return 1;
- } else if (memory_manager->is_tx_chunk(c)) {
- vector<Chunk*> v;
- v.push_back(c);
- memory_manager->return_tx(v);
- return 2;
- }
- return -1;
-}
-
/**
* Create a new QueuePair. This factory should be used in preference to
* the QueuePair constructor directly, since this lets derivatives of
int add(uint32_t num);
void take_back(std::vector<Chunk*> &ck);
int get_buffers(std::vector<Chunk*> &chunks, size_t bytes);
+ Chunk *get_chunk_by_buffer(const char *c) {
+ uint32_t idx = (c - base) / chunk_size;
+ Chunk *chunk = reinterpret_cast<Chunk*>(chunk_base + sizeof(Chunk) * idx);
+ return chunk;
+ }
MemoryManager& manager;
uint32_t chunk_size;
Mutex lock;
std::vector<Chunk*> free_chunks;
- std::set<Chunk*> all_chunks;
+ std::set<const char*> all_buffers;
char* base;
char* chunk_base;
};
void return_tx(std::vector<Chunk*> &chunks);
int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
- int is_tx_chunk(Chunk* c) { return send->all_chunks.count(c);}
- int is_rx_chunk(Chunk* c) { return channel->all_chunks.count(c);}
+ // TODO: optimize via address judgement
+ bool is_tx_buffer(const char* c) { return send->all_buffers.count(c); }
+ bool is_rx_buffer(const char* c) { return channel->all_buffers.count(c); }
+ Chunk *get_tx_chunk_by_buffer(const char *c) {
+ return send->get_chunk_by_buffer(c);
+ }
+ uint32_t get_tx_chunk_size() const {
+ return send->chunk_size;
+ }
bool enabled_huge_page;
MemoryManager* get_memory_manager() { return memory_manager; }
Device* get_device() { return device; }
int get_async_fd() { return device->ctxt->async_fd; }
- int recall_chunk(Chunk* c);
- int is_tx_chunk(Chunk* c) { return memory_manager->is_tx_chunk(c); }
- int is_rx_chunk(Chunk* c) { return memory_manager->is_rx_chunk(c); }
+ bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
+ bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
+ Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
};
{
ldout(cct, 20) << __func__ << " destruct." << dendl;
dispatcher->perf_logger->dec(l_msgr_rdma_active_queue_pair);
+ cleanup();
worker->remove_pending_conn(this);
dispatcher->erase_qpn(my_msg.qpn);
- cleanup();
+ Mutex::Locker l(lock);
if (notify_fd >= 0)
::close(notify_fd);
if (tcp_fd >= 0)
::close(tcp_fd);
error = ECONNRESET;
- Mutex::Locker l(lock);
- for (unsigned i=0; i < wc.size(); ++i)
- infiniband->recall_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
- for (unsigned i=0; i < buffers.size(); ++i)
- infiniband->recall_chunk(buffers[i]);
+ int ret = 0;
+ for (unsigned i=0; i < wc.size(); ++i) {
+ ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
+ assert(ret == 0);
+ }
+ for (unsigned i=0; i < buffers.size(); ++i) {
+ ret = infiniband->post_chunk(buffers[i]);
+ assert(ret == 0);
+ }
}
void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
chunk->prepare_read(response->byte_len);
+ worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
if (response->byte_len == 0) {
dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
if (connected) {
error = ECONNRESET;
- assert(infiniband->post_chunk(chunk) == 0);
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
- break;
- }
- worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
- //assert(response->byte_len);
- if (read == (ssize_t)len) {
- buffers.push_back(chunk);
- ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
- } else if (read + response->byte_len > (ssize_t)len) {
- read += chunk->read(buf+read, (ssize_t)len-read);
- buffers.push_back(chunk);
- ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
- } else {
- read += chunk->read(buf+read, response->byte_len);
assert(infiniband->post_chunk(chunk) == 0);
+ } else {
+ if (read == (ssize_t)len) {
+ buffers.push_back(chunk);
+ ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
+ } else if (read + response->byte_len > (ssize_t)len) {
+ read += chunk->read(buf+read, (ssize_t)len-read);
+ buffers.push_back(chunk);
+ ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
+ } else {
+ read += chunk->read(buf+read, response->byte_len);
+ assert(infiniband->post_chunk(chunk) == 0);
+ }
}
}
ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
{
size_t read = 0, tmp = 0;
- vector<Chunk*>::iterator c = buffers.begin();
+ auto c = buffers.begin();
for (; c != buffers.end() ; ++c) {
tmp = (*c)->read(buf+read, len-read);
read += tmp;
if (error)
return -error;
Mutex::Locker l(lock);
- std::vector<Chunk*> tx_buffers;
size_t bytes = pending_bl.length();
ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
<< pending_bl.buffers().size() << dendl;
if (!bytes)
return 0;
- int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
- if (ret == 0) {
- ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
- worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
- return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough
- }
- vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
- list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
+ auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes,
+ std::list<bufferptr>::const_iterator &start,
+ std::list<bufferptr>::const_iterator &end) -> unsigned {
+ assert(start != end);
+ auto chunk_idx = tx_buffers.size();
+ int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
+ if (ret == 0) {
+ ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
+ worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
+ return 0;
+ }
+
+ unsigned total_copied = 0;
+ Chunk *current_chunk = tx_buffers[chunk_idx];
+ while (start != end) {
+ const uintptr_t addr = reinterpret_cast<const uintptr_t>(start->c_str());
+ unsigned copied = 0;
+ while (copied < start->length()) {
+ uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
+ copied += r;
+ total_copied += r;
+ bytes -= r;
+ if (current_chunk->full()){
+ current_chunk = tx_buffers[++chunk_idx];
+ if (chunk_idx == tx_buffers.size())
+ return total_copied;
+ }
+ }
+ ++start;
+ }
+ assert(bytes == 0);
+ return total_copied;
+ };
+
+ std::vector<Chunk*> tx_buffers;
+ std::list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
+ std::list<bufferptr>::const_iterator copy_it = it;
unsigned total = 0;
+ unsigned need_reserve_bytes = 0;
while (it != pending_bl.buffers().end()) {
- const uintptr_t addr = reinterpret_cast<const uintptr_t>(it->c_str());
- unsigned copied = 0;
- while (copied < it->length()) {
- uint32_t r = (*current_buffer)->write((char*)addr+copied, it->length() - copied);
- copied += r;
- total += r;
- if ((*current_buffer)->full()){
- ++current_buffer;
- if (current_buffer == tx_buffers.end())
+ if (infiniband->is_tx_buffer(it->raw_c_str())) {
+ if (need_reserve_bytes) {
+ unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
+ total += copied;
+ if (copied < need_reserve_bytes)
goto sending;
+ need_reserve_bytes = 0;
}
+ assert(copy_it == it);
+ tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
+ total += it->length();
+ ++copy_it;
+ } else {
+ need_reserve_bytes += it->length();
}
++it;
}
+ if (need_reserve_bytes)
+ total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
sending:
+ if (total == 0)
+ return -EAGAIN;
assert(total <= pending_bl.length());
bufferlist swapped;
if (total < pending_bl.length()) {
}
ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
- << pending_bl.buffers().size() << dendl;
+ << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
int r = post_work_request(tx_buffers);
if (r < 0)
return -errno;
}
worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
- ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+ ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
return 0;
}
void RDMAConnectedSocketImpl::notify()
{
uint64_t i = 1;
- assert(write(notify_fd, &i, sizeof(i)) == sizeof(i));
+ write(notify_fd, &i, sizeof(i));
}
void RDMAConnectedSocketImpl::shutdown()
#include <poll.h>
#include "include/str_list.h"
+#include "common/deleter.h"
#include "RDMAStack.h"
#define dout_subsys ceph_subsys_ms
RDMAConnectedSocketImpl *conn = nullptr;
utime_t last_inactive = ceph_clock_now();
bool rearmed = false;
+ int ret = 0;
while (true) {
int n = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
+
+ if (wc[i].opcode == IBV_WC_SEND) {
+ tx_cqe.push_back(wc[i]);
+ ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl;
+ continue;
+ }
+
if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
- << ib->wc_status_to_string(response->status) << dendl;
- ib->recall_chunk(chunk);
- conn = get_conn_lockless(response->qp_num);
- if (conn && conn->is_connected())
- conn->fault();
- notify_pending_workers();
+ << ib->wc_status_to_string(response->status) << ")" << dendl;
+ if (ib->is_rx_buffer(chunk->buffer)) {
+ ret = ib->post_chunk(chunk);
+ if (ret) {
+ ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(ret) << dendl;
+ assert(ret == 0);
+ }
+ conn = get_conn_lockless(response->qp_num);
+ if (conn && conn->is_connected())
+ conn->fault();
+ notify_pending_workers();
+ } else if (ib->is_tx_buffer(chunk->buffer)) {
+ tx_cqe.push_back(wc[i]);
+ } else {
+ ldout(cct, 0) << __func__ << " unknown chunk: " << chunk << dendl;
+ }
continue;
}
- if (wc[i].opcode == IBV_WC_SEND) {
- tx_cqe.push_back(wc[i]);
- ldout(cct, 25) << " got a tx cqe, bytes:" << wc[i].byte_len << dendl;
- continue;
- }
- ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
conn = get_conn_lockless(response->qp_num);
if (!conn) {
- int ret = ib->recall_chunk(chunk);
+ ret = ib->post_chunk(chunk);
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << ret << dendl;
+ assert(ret == 0);
continue;
}
polled[conn].push_back(*response);
}
void RDMADispatcher::notify_pending_workers() {
- Mutex::Locker l(w_lock);
- if (pending_workers.empty())
- return ;
- pending_workers.front()->pass_wc(std::move(vector<ibv_wc>()));
- pending_workers.pop_front();
+ Mutex::Locker l(w_lock);
+ if (pending_workers.empty())
+ return ;
+ pending_workers.front()->pass_wc(std::move(vector<ibv_wc>()));
+ pending_workers.pop_front();
}
int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
+ plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");
plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
notify();
}
-void RDMAWorker::add_pending_conn(RDMAConnectedSocketImpl* o)
-{
- pending_sent_conns.push_back(o);
- if (!pended) {
- dispatcher->pending_buffers(this);
- pended = true;
- }
-}
-
void RDMAWorker::get_wc(std::vector<ibv_wc> &w)
{
Mutex::Locker l(lock);
{
int r = infiniband->get_tx_buffers(c, bytes);
if (r > 0) {
- stack->get_dispatcher()->inflight += c.size();
- ldout(cct, 30) << __func__ << " reserve " << c.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
+ stack->get_dispatcher()->inflight += r;
+ ldout(cct, 30) << __func__ << " reserve " << r << " chunks, inflight " << dispatcher->inflight << dendl;
return r;
}
assert(r == 0);
- if (pending_sent_conns.back() != o)
- pending_sent_conns.push_back(o);
- dispatcher->pending_buffers(this);
+ if (o) {
+ {
+ Mutex::Locker l(lock);
+ if (pending_sent_conns.back() != o)
+ pending_sent_conns.push_back(o);
+ }
+ dispatcher->pending_buffers(this);
+ }
return r;
}
* \return
* 0 if success or -1 for failure
*/
-int RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
+void RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
{
if (chunks.empty())
- return 0;
+ return ;
- stack->get_dispatcher()->inflight -= chunks.size();
+ dispatcher->inflight -= chunks.size();
memory_manager->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
pended = false;
std::set<RDMAConnectedSocketImpl*> done;
+ Mutex::Locker l(lock);
while (!pending_sent_conns.empty()) {
RDMAConnectedSocketImpl *o = pending_sent_conns.front();
- if (done.count(o) == 0) {
+ pending_sent_conns.pop_front();
+ if (!done.count(o)) {
+ lock.Unlock();
done.insert(o);
- } else {
- pending_sent_conns.pop_front();
- continue;
- }
- ssize_t r = o->submit(false);
- ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
- if (r < 0) {
- if (r == -EAGAIN)
- break;
- o->fault();
+ ssize_t r = o->submit(false);
+ ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
+ lock.Lock();
+ if (r < 0) {
+ if (r == -EAGAIN) {
+ pending_sent_conns.push_front(o);
+ break;
+ }
+ o->fault();
+ }
}
- pending_sent_conns.pop_front();
}
- return 0;
}
void RDMAWorker::handle_tx_event()
<< infiniband->wc_status_to_string(response->status) << dendl;
}
RDMAConnectedSocketImpl *conn = stack->get_dispatcher()->get_conn_by_qp(response->qp_num);
- if (conn) {
+
+ if (conn && conn->is_connected()) {
ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
conn->fault();
} else {
}
}
- //assert(memory_manager->is_tx_chunk(chunk));
- if (memory_manager->is_tx_chunk(chunk)) {
+ // FIXME: why not tx?
+ if (memory_manager->is_tx_buffer(chunk->buffer))
tx_chunks.push_back(chunk);
- } else {
- ldout(cct, 1) << __func__ << " a outter chunk: " << chunk << dendl;//fin
- }
}
perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());
l_msgr_rdma_tx_no_mem,
l_msgr_rdma_tx_parital_mem,
l_msgr_rdma_tx_failed,
+ l_msgr_rdma_rx_no_registered_mem,
l_msgr_rdma_tx_chunks,
l_msgr_rdma_tx_bytes,
virtual void initialize() override;
RDMAStack *get_stack() { return stack; }
int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
- int post_tx_buffer(std::vector<Chunk*> &chunks);
- void add_pending_conn(RDMAConnectedSocketImpl* o);
- void remove_pending_conn(RDMAConnectedSocketImpl *o) { pending_sent_conns.remove(o); }
+ void post_tx_buffer(std::vector<Chunk*> &chunks);
+ void remove_pending_conn(RDMAConnectedSocketImpl *o) {
+ Mutex::Locker l(lock);
+ pending_sent_conns.remove(o);
+ }
void handle_tx_event();
void set_ib(Infiniband* ib) { infiniband = ib; }
void set_stack(RDMAStack *s) { stack = s; }