OPTION(ms_pq_min_cost, OPT_U64, 65536)
OPTION(ms_inject_socket_failures, OPT_U64, 0)
OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed
+OPTION(ms_inject_delay_msg_type, OPT_STR, "") // the type of message to delay, as returned by Message::get_type_name(). This is an additional restriction on the general type filter ms_inject_delay_type.
OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds
OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds
continue;
}
utime_t release = delay_queue.front().first;
- if (release > ceph_clock_now(pipe->msgr->cct)) {
+ Message *m = delay_queue.front().second;
+ string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
+ if (release > ceph_clock_now(pipe->msgr->cct) &&
+ (delay_msg_type.empty() || m->get_type_name() == delay_msg_type)) {
lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
delay_cond.WaitUntil(delay_lock, release);
continue;
}
- Message *m = delay_queue.front().second;
lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
delay_queue.pop_front();
pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);