void DispatchQueue::local_delivery(Message *m, int priority)
{
- Mutex::Locker l(lock);
m->set_connection(msgr->local_connection.get());
m->set_recv_stamp(ceph_clock_now(msgr->cct));
- add_arrival(m);
- if (priority >= CEPH_MSG_PRIO_LOW) {
- mqueue.enqueue_strict(
- 0, priority, QueueItem(m));
- } else {
- mqueue.enqueue(
- 0, priority, m->get_cost(), QueueItem(m));
+ Mutex::Locker l(local_delivery_lock);
+ if (local_messages.empty())
+ local_delivery_cond.Signal();
+ local_messages.push_back(make_pair(m, priority));
+ return;
+}
+
+void DispatchQueue::run_local_delivery()
+{
+ local_delivery_lock.Lock();
+ while (true) {
+ if (stop_local_delivery)
+ break;
+ if (local_messages.empty()) {
+ local_delivery_cond.Wait(local_delivery_lock);
+ continue;
+ }
+ pair<Message *, int> mp = local_messages.front();
+ local_messages.pop_front();
+ local_delivery_lock.Unlock();
+ Message *m = mp.first;
+ int priority = mp.second;
+ fast_preprocess(m);
+ if (can_fast_dispatch(m)) {
+ fast_dispatch(m);
+ } else {
+ Mutex::Locker l(lock);
+ add_arrival(m);
+ if (priority >= CEPH_MSG_PRIO_LOW) {
+ mqueue.enqueue_strict(
+ 0, priority, QueueItem(m));
+ } else {
+ mqueue.enqueue(
+ 0, priority, m->get_cost(), QueueItem(m));
+ }
+ cond.Signal();
+ }
+ local_delivery_lock.Lock();
}
- cond.Signal();
+ local_delivery_lock.Unlock();
}
/*
assert(!stop);
assert(!dispatch_thread.is_started());
dispatch_thread.create();
+ local_delivery_thread.create();
}
void DispatchQueue::wait()
{
+ local_delivery_thread.join();
dispatch_thread.join();
}
void DispatchQueue::shutdown()
{
+ // stop my local delivery thread
+ local_delivery_lock.Lock();
+ stop_local_delivery = true;
+ local_delivery_cond.Signal();
+ local_delivery_lock.Unlock();
+
// stop my dispatch thread
lock.Lock();
stop = true;
}
} dispatch_thread;
+ Mutex local_delivery_lock;
+ Cond local_delivery_cond;
+ bool stop_local_delivery;
+ list<pair<Message *, int> > local_messages;
+ class LocalDeliveryThread : public Thread {
+ DispatchQueue *dq;
+ public:
+ LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
+ void *entry() {
+ dq->run_local_delivery();
+ return 0;
+ }
+ } local_delivery_thread;
+
uint64_t pre_dispatch(Message *m);
void post_dispatch(Message *m, uint64_t msize);
public:
bool stop;
void local_delivery(Message *m, int priority);
+ void run_local_delivery();
double get_max_age(utime_t now);
cct->_conf->ms_pq_min_cost),
next_pipe_id(1),
dispatch_thread(this),
+ local_delivery_lock("SimpleMessenger::DispatchQueue::local_delivery_lock"),
+ stop_local_delivery(false),
+ local_delivery_thread(this),
stop(false)
{}
};