#undef dout_prefix
#define dout_prefix *_dout << "-- " << msgr->get_myaddr() << " "
-double DispatchQueue::get_max_age(utime_t now) {
+double DispatchQueue::get_max_age(utime_t now) const {
Mutex::Locker l(lock);
if (marrival.empty())
return 0;
ldout(cct,20) << "done calling dispatch on " << m << dendl;
}
-bool DispatchQueue::can_fast_dispatch(Message *m)
+bool DispatchQueue::can_fast_dispatch(Message *m) const
{
return msgr->ms_can_fast_dispatch(m);
}
bool is_code() const {
return type != -1;
}
- int get_code () {
+ int get_code () const {
assert(is_code());
return type;
}
- Message *get_message() {
+ Message *get_message() const {
assert(!is_code());
return m.get();
}
- Connection *get_connection() {
+ Connection *get_connection() const {
assert(is_code());
return con.get();
}
CephContext *cct;
SimpleMessenger *msgr;
- Mutex lock;
+ mutable Mutex lock;
Cond cond;
PrioritizedQueue<QueueItem, uint64_t> mqueue;
void local_delivery(Message *m, int priority);
void run_local_delivery();
- double get_max_age(utime_t now);
+ double get_max_age(utime_t now) const;
- int get_queue_len() {
+ int get_queue_len() const {
Mutex::Locker l(lock);
return mqueue.length();
}
cond.Signal();
}
- bool can_fast_dispatch(Message *m);
+ bool can_fast_dispatch(Message *m) const;
void fast_dispatch(Message *m);
void fast_preprocess(Message *m);
void enqueue(Message *m, int priority, uint64_t id);
void entry();
void wait();
void shutdown();
- bool is_started() {return dispatch_thread.is_started();}
+ bool is_started() const {return dispatch_thread.is_started();}
DispatchQueue(CephContext *cct, SimpleMessenger *msgr)
: cct(cct), msgr(msgr),
- lock("SimpleMessenger::DispatchQeueu::lock"),
+ lock("SimpleMessenger::DispatchQueue::lock"),
mqueue(cct->_conf->ms_pq_max_tokens_per_priority,
cct->_conf->ms_pq_min_cost),
next_pipe_id(1),