int RDMAWorker::reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
+ assert(center.in_thread());
int r = infiniband->get_tx_buffers(c, bytes);
if (r > 0) {
stack->get_dispatcher()->inflight += r;
assert(r == 0);
if (o) {
- {
- Mutex::Locker l(lock);
- if (pending_sent_conns.back() != o)
- pending_sent_conns.push_back(o);
- }
+ if (pending_sent_conns.back() != o)
+ pending_sent_conns.push_back(o);
dispatcher->pending_buffers(this);
}
return r;
*/
void RDMAWorker::post_tx_buffer(std::vector<Chunk*> &chunks)
{
+ assert(center.in_thread());
if (chunks.empty())
return ;
ldout(cct, 30) << __func__ << " release " << chunks.size() << " chunks, inflight " << stack->get_dispatcher()->inflight << dendl;
std::set<RDMAConnectedSocketImpl*> done;
- Mutex::Locker l(lock);
while (!pending_sent_conns.empty()) {
RDMAConnectedSocketImpl *o = pending_sent_conns.front();
pending_sent_conns.pop_front();
if (!done.count(o)) {
- lock.Unlock();
done.insert(o);
ssize_t r = o->submit(false);
ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
- lock.Lock();
if (r < 0) {
if (r == -EAGAIN) {
pending_sent_conns.push_front(o);
int reserve_message_buffer(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
void post_tx_buffer(std::vector<Chunk*> &chunks);
void remove_pending_conn(RDMAConnectedSocketImpl *o) {
- Mutex::Locker l(lock);
+ assert(center.in_thread());
pending_sent_conns.remove(o);
}
void handle_tx_event();