reader_needs_join = false;
}
+void Pipe::DelayedDelivery::discard()
+{
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ pipe->msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
+ m->put();
+ delay_queue.pop_front();
+ }
+}
+
void *Pipe::DelayedDelivery::entry()
{
Mutex::Locker locker(delay_lock);
out_q.clear();
}
-
void Pipe::fault(bool onread)
{
const md_config_t *conf = msgr->cct->_conf;
delay_queue.push_back(make_pair(release, m));
delay_cond.Signal();
}
- void discard() {
- Mutex::Locker l(delay_lock);
- while (!delay_queue.empty()) {
- delay_queue.front().second->put();
- delay_queue.pop_front();
- }
- }
+ void discard();
void stop() {
delay_lock.Lock();
stop_delayed_delivery = true;