reader_needs_join = false;
}
-
-void Pipe::queue_received(Message *m, int priority)
-{
- assert(pipe_lock.is_locked());
-
- if (delay_thread) {
- utime_t release;
- if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
- release = m->get_recv_stamp();
- release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
- lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
- }
- delay_thread->queue(release, m);
- return;
- }
-
- in_q->enqueue(m, priority, conn_id);
-}
-
void *Pipe::DelayedDelivery::entry()
{
Mutex::Locker locker(delay_lock);
ldout(msgr->cct,10) << "reader got message "
<< m->get_seq() << " " << m << " " << *m
<< dendl;
- queue_received(m);
+
+ if (delay_thread) {
+ utime_t release;
+ if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+ release = m->get_recv_stamp();
+ release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+ lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
+ }
+ delay_thread->queue(release, m);
+ } else {
+ in_q->enqueue(m, m->get_priority(), conn_id);
+ }
}
else if (tag == CEPH_MSGR_TAG_CLOSE) {
static const Pipe& Server(int s);
static const Pipe& Client(const entity_addr_t& pi);
- //we have two queue_received's to allow local signal delivery
- // via Message * (that doesn't actually point to a Message)
- void queue_received(Message *m, int priority);
-
- void queue_received(Message *m) {
- // this is just to make sure that a changeset is working
- // properly; if you start using the refcounting more and have
- // multiple people hanging on to a message, ditch the assert!
- assert(m->nref.read() == 1);
-
- queue_received(m, m->get_priority());
- }
-
- void delayed_delivery();
-
__u32 get_out_seq() { return out_seq; }
bool is_queued() { return !out_q.empty() || keepalive; }