If a wakeup for an outbound message has already been scheduled or is
currently executing within the worker thread, avoid re-adding a wakeup.
For small IO sizes under high queue depths, these extra syscalls start
to add up. For larger IO sizes or small queue depths, it doesn't hurt
performance.
fio --ioengine=rbd results:
IOPS pre-change post-change
4K: 84.9k 98.3k
32K: 58.4k 59.5k
256K: 12.1k 12.2k
4M: 803 802
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit
294c41f18adada6ab762d5b3f9f45d7c8bed4316)
out_q[m->get_priority()].emplace_back(std::move(bl), m);
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
<< dendl;
- if (can_write != WriteStatus::REPLACING) {
+ if (can_write != WriteStatus::REPLACING && !write_in_progress) {
+ write_in_progress = true;
connection->center->dispatch_event_external(connection->write_handler);
}
}
} else if (r > 0)
break;
} while (can_write == WriteStatus::CANWRITE);
+ write_in_progress = false;
connection->write_lock.unlock();
// if r > 0 mean data still lefted, so no need _try_send.
return;
}
} else {
+ write_in_progress = false;
connection->write_lock.unlock();
connection->lock.lock();
connection->write_lock.lock();
}
void ProtocolV1::requeue_sent() {
+ write_in_progress = false;
if (sent.empty()) {
return;
}
}
}
out_q.clear();
+ write_in_progress = false;
}
void ProtocolV1::reset_recv_state() {
<< __func__ << " stop myself to swap existing" << dendl;
exproto->can_write = WriteStatus::REPLACING;
exproto->replacing = true;
+ exproto->write_in_progress = false;
existing->state_offset = 0;
// avoid previous thread modify event
exproto->state = NONE;
// priority queue for outbound msgs
std::map<int, std::list<std::pair<bufferlist, Message *>>> out_q;
bool keepalive;
+ bool write_in_progress = false;
__u32 connect_seq, peer_global_seq;
std::atomic<uint64_t> in_seq{0};
}
}
out_queue.clear();
+ write_in_progress = false;
}
void ProtocolV2::reset_session() {
void ProtocolV2::fault() { _fault(); }
void ProtocolV2::requeue_sent() {
+ write_in_progress = false;
if (sent.empty()) {
return;
}
out_queue_entry_t{is_prepared, m});
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
<< dendl;
- if ((!replacing && can_write) || state == STANDBY) {
+ if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
+ write_in_progress = true;
connection->center->dispatch_event_external(connection->write_handler);
}
}
} else if (r > 0)
break;
} while (can_write);
+ write_in_progress = false;
// if r > 0 mean data still lefted, so no need _try_send.
if (r == 0) {
return;
}
} else {
+ write_in_progress = false;
connection->write_lock.unlock();
connection->lock.lock();
connection->write_lock.lock();
connection->dispatch_queue->queue_reset(connection);
exproto->can_write = false;
+ exproto->write_in_progress = false;
exproto->reconnecting = reconnecting;
exproto->replacing = true;
std::swap(exproto->session_stream_handlers, session_stream_handlers);
} pre_auth;
bool keepalive;
+ bool write_in_progress = false;
ostream &_conn_prefix(std::ostream *_dout);
void run_continuation(Ct<ProtocolV2> *pcontinuation);