auto fast_dispatch_time = ceph::mono_clock::now();
logger->tinc(l_msgr_running_recv_time, fast_dispatch_time - recv_start_time);
if (delay_state) {
- utime_t release = message->get_recv_stamp();
double delay_period = 0;
if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
- release += delay_period;
- ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on "
- << message << " " << *message << dendl;
+ ldout(async_msgr->cct, 1) << "queue_received will delay after " << (ceph_clock_now() + delay_period)
+ << " on " << message << " " << *message << dendl;
}
- delay_state->queue(delay_period, release, message);
+ delay_state->queue(delay_period, message);
} else if (async_msgr->ms_can_fast_dispatch(message)) {
lock.unlock();
dispatch_queue->fast_dispatch(message);
return ;
if (delay_queue.empty())
return ;
- utime_t release = delay_queue.front().first;
- m = delay_queue.front().second;
- string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type;
- utime_t now = ceph_clock_now();
- if ((release > now &&
- (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
- utime_t t = release - now;
- t.sleep();
- }
+ m = delay_queue.front();
delay_queue.pop_front();
}
if (msgr->ms_can_fast_dispatch(m)) {
center->get_id(), [this] () mutable {
std::lock_guard<std::mutex> l(delay_lock);
while (!delay_queue.empty()) {
- Message *m = delay_queue.front().second;
+ Message *m = delay_queue.front();
if (msgr->ms_can_fast_dispatch(m)) {
dispatch_queue->fast_dispatch(m);
} else {
*/
class DelayedDelivery : public EventCallback {
std::set<uint64_t> register_time_events; // need to delete it if stop
- std::deque<std::pair<utime_t, Message*> > delay_queue;
+ std::deque<Message*> delay_queue;
std::mutex delay_lock;
AsyncMessenger *msgr;
EventCenter *center;
}
void set_center(EventCenter *c) { center = c; }
void do_request(uint64_t id) override;
- void queue(double delay_period, utime_t release, Message *m) {
+ void queue(double delay_period, Message *m) {
std::lock_guard<std::mutex> l(delay_lock);
- delay_queue.push_back(std::make_pair(release, m));
+ delay_queue.push_back(m);
register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
void discard() {
center->submit_to(center->get_id(), [this] () mutable {
std::lock_guard<std::mutex> l(delay_lock);
while (!delay_queue.empty()) {
- Message *m = delay_queue.front().second;
+ Message *m = delay_queue.front();
dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
m->put();
delay_queue.pop_front();