timer.add_event_after(1.0, new C_Tick(this));
-
- // finishers?
- finished_lock.Lock();
- if (finished.empty()) {
- finished_lock.Unlock();
- } else {
- list<Message*> waiting;
- waiting.splice(waiting.begin(), finished);
-
- finished_lock.Unlock();
- osd_lock.Unlock();
-
- for (list<Message*>::iterator it = waiting.begin();
- it != waiting.end();
- it++) {
- dispatch(*it);
- }
-
- osd_lock.Lock();
- }
+ do_waiters();
}
// =========================================
return true;
}
-
// lock!
osd_lock.Lock();
- dout(20) << "dispatch " << m << dendl;
+ _dispatch(m);
+ do_waiters();
+ osd_lock.Unlock();
+ return true;
+}
+
+void OSD::do_waiters()
+{
+ assert(osd_lock.is_locked());
+
+ finished_lock.Lock();
+ if (finished.empty()) {
+ finished_lock.Unlock();
+ } else {
+ list<Message*> waiting;
+ waiting.splice(waiting.begin(), finished);
+
+ finished_lock.Unlock();
+
+ dout(2) << "do_waiters -- start" << dendl;
+ for (list<Message*>::iterator it = waiting.begin();
+ it != waiting.end();
+ it++)
+ _dispatch(*it);
+ dout(2) << "do_waiters -- finish" << dendl;
+ }
+}
+
+
+void OSD::_dispatch(Message *m)
+{
+ assert(osd_lock.is_locked());
+ dout(20) << "_dispatch " << m << " " << *m << dendl;
switch (m->get_type()) {
handle_sub_op_reply((MOSDSubOpReply*)m);
break;
-
- default:
- return false;
}
}
}
-
- // finishers?
- finished_lock.Lock();
- if (!finished.empty()) {
- list<Message*> waiting;
- waiting.splice(waiting.begin(), finished);
-
- finished_lock.Unlock();
- osd_lock.Unlock();
-
- while (!waiting.empty()) {
- dout(20) << "doing finished " << waiting.front() << dendl;
- dispatch(waiting.front());
- waiting.pop_front();
- }
- return true;
- }
-
- finished_lock.Unlock();
- osd_lock.Unlock();
- return true;
}
state = STATE_ACTIVE;
- wait_for_no_ops();
+ // pause, requeue op queue
+ //wait_for_no_ops();
+ op_tp.pause();
+ op_wq.lock();
+ list<Message*> rq;
+ while (!op_queue.empty()) {
+ PG *pg = op_queue.back();
+ op_queue.pop_back();
+ Message *m = pg->op_queue.back();
+ pg->op_queue.pop_back();
+ pg->put();
+ dout(15) << " will requeue " << *m << dendl;
+ rq.push_front(m);
+ }
+ op_wq.unlock();
+ push_waiters(rq);
+
recovery_tp.pause();
disk_tp.pause_new(); // _process() may be waiting for a replica message
map_lock.get_write();
map_lock.put_write();
+ op_tp.unpause();
recovery_tp.unpause();
disk_tp.unpause();
void tick();
+ void _dispatch(Message *m);
+
public:
int get_nodeid() { return whoami; }
finished.splice(finished.end(), ls);
finished_lock.Unlock();
}
+ void push_waiters(list<class Message*>& ls) {
+ finished_lock.Lock();
+ finished.splice(finished.begin(), ls);
+ finished_lock.Unlock();
+ }
+ void do_waiters();
// -- op queue --
deque<PG*> op_queue;