From: Sage Weil Date: Tue, 27 Nov 2012 23:27:18 +0000 (-0800) Subject: msg/Pipe: drop queue helpers X-Git-Tag: v0.56~113^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f0bf61687870da4d430482eaf5f1f9d6e0b2645f;p=ceph.git msg/Pipe: drop queue helpers There is a single caller; these only obfuscate. Signed-off-by: Sage Weil --- diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 34ded27d77fc..404edb589c61 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -157,25 +157,6 @@ void Pipe::join_reader() reader_needs_join = false; } - -void Pipe::queue_received(Message *m, int priority) -{ - assert(pipe_lock.is_locked()); - - if (delay_thread) { - utime_t release; - if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { - release = m->get_recv_stamp(); - release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; - lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; - } - delay_thread->queue(release, m); - return; - } - - in_q->enqueue(m, priority, conn_id); -} - void *Pipe::DelayedDelivery::entry() { Mutex::Locker locker(delay_lock); @@ -1294,7 +1275,18 @@ void Pipe::reader() ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; - queue_received(m); + + if (delay_thread) { + utime_t release; + if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { + release = m->get_recv_stamp(); + release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; + lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; + } + delay_thread->queue(release, m); + } else { + in_q->enqueue(m, m->get_priority(), conn_id); + } } else if (tag == CEPH_MSGR_TAG_CLOSE) { diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index 8b09ec0f27d7..c489ec806106 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -217,21 +217,6 @@ class DispatchQueue; static const Pipe& Server(int s); static const Pipe& Client(const entity_addr_t& pi); - //we have two queue_received's to allow local signal delivery - // via Message * (that doesn't actually point to a Message) - void queue_received(Message *m, int priority); - - void queue_received(Message *m) { - // this is just to make sure that a changeset is working - // properly; if you start using the refcounting more and have - // multiple people hanging on to a message, ditch the assert! - assert(m->nref.read() == 1); - - queue_received(m, m->get_priority()); - } - - void delayed_delivery(); - __u32 get_out_seq() { return out_seq; } bool is_queued() { return !out_q.empty() || keepalive; }