{
lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl;
Mutex::Locker l(delay_lock);
- while (!delay_queue.empty()) {
- Message *m = delay_queue.front().second;
- delay_queue.pop_front();
- if (pipe->in_q->can_fast_dispatch(m)) {
- delay_lock.Unlock();
- pipe->in_q->fast_dispatch(m);
- delay_lock.Lock();
- } else {
- pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
- }
- }
+ flush_count = delay_queue.size();
+ delay_cond.Signal();
}
void *Pipe::DelayedDelivery::entry()
utime_t release = delay_queue.front().first;
Message *m = delay_queue.front().second;
string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
- if (release > ceph_clock_now(pipe->msgr->cct) &&
- (delay_msg_type.empty() || m->get_type_name() == delay_msg_type)) {
+ if (!flush_count &&
+ (release > ceph_clock_now(pipe->msgr->cct) &&
+ (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
delay_cond.WaitUntil(delay_lock, release);
continue;
}
lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
delay_queue.pop_front();
+ if (flush_count > 0) {
+ --flush_count;
+ active_flush = true;
+ }
if (pipe->in_q->can_fast_dispatch(m)) {
delay_lock.Unlock();
pipe->in_q->fast_dispatch(m);
} else {
pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
}
+ active_flush = false;
}
lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl;
return NULL;
if (existing->connection_state->clear_pipe(existing))
msgr->dispatch_queue.queue_reset(existing->connection_state.get());
} else {
- // queue a reset on the old connection
+ // queue a reset on the new connection, which we're dumping for the old
msgr->dispatch_queue.queue_reset(connection_state.get());
// drop my Connection, and take a ref to the existing one. do not
// make existing Connection reference us
connection_state->reset_pipe(this);
- // flush/queue any existing delayed messages
- if (existing->delay_thread)
- existing->delay_thread->flush();
+ if (existing->delay_thread) {
+ existing->delay_thread->steal_for_pipe(this);
+ delay_thread = existing->delay_thread;
+ existing->delay_thread = NULL;
+ delay_thread->flush();
+ }
// steal incoming queue
uint64_t replaced_conn_id = conn_id;
if (!reader_running && !writer_running) {
shutdown_socket();
pipe_lock.Unlock();
+ if (delay_thread && delay_thread->is_flushing()) {
+ delay_thread->wait_for_flush();
+ }
msgr->queue_reap(this);
} else {
pipe_lock.Unlock();
std::deque< pair<utime_t,Message*> > delay_queue;
Mutex delay_lock;
Cond delay_cond;
+ int flush_count;
+ bool active_flush;
bool stop_delayed_delivery;
public:
DelayedDelivery(Pipe *p)
: pipe(p),
- delay_lock("Pipe::DelayedDelivery::delay_lock"),
+ delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
+ active_flush(false),
stop_delayed_delivery(false) { }
~DelayedDelivery() {
discard();
}
void discard();
void flush();
+ bool is_flushing() {
+ Mutex::Locker l(delay_lock);
+ return flush_count > 0 || active_flush;
+ }
+ void wait_for_flush() {
+ Mutex::Locker l(delay_lock);
+ while (flush_count > 0 || active_flush)
+ delay_cond.Wait(delay_lock);
+ }
void stop() {
delay_lock.Lock();
stop_delayed_delivery = true;
delay_cond.Signal();
delay_lock.Unlock();
}
+ void steal_for_pipe(Pipe *new_owner) {
+ Mutex::Locker l(delay_lock);
+ pipe = new_owner;
+ }
} *delay_thread;
friend class DelayedDelivery;