From 832fd0e7d51c472416d95afd9c3bd4eae333b709 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 Jan 2008 15:22:12 -0800 Subject: [PATCH] handle_ms_failure in dispatch thread too --- src/client/Client.cc | 5 ----- src/msg/SimpleMessenger.cc | 16 ++++++++++++---- src/msg/SimpleMessenger.h | 13 +++++++++++-- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index cd2a654784160..fa8b093d27c66 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -4010,12 +4010,7 @@ void Client::ms_handle_failure(Message *m, const entity_inst_t& inst) else if (dest.is_osd()) { objecter->ms_handle_failure(m, dest, inst); } - else if (dest.is_mds()) { - dout(0) << "ms_handle_failure " << *m << " to " << inst << dendl; - //failed_mds.insert(dest.num()); - } else { - // client? dout(0) << "ms_handle_failure " << *m << " to " << inst << ", dropping" << dendl; delete m; } diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 5bbc9e93bda49..0950994accd89 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -525,20 +525,27 @@ void Rank::EntityMessenger::dispatch_entry() } Message *m = ls.front(); ls.pop_front(); - if (m == 0) { + if ((long)m == BAD_REMOTE_RESET) { lock.Lock(); entity_addr_t a = remote_reset_q.front().first; entity_name_t n = remote_reset_q.front().second; remote_reset_q.pop_front(); lock.Unlock(); get_dispatcher()->ms_handle_remote_reset(a, n); - } else if ((long)m == 1) { + } else if ((long)m == BAD_RESET) { lock.Lock(); entity_addr_t a = reset_q.front().first; entity_name_t n = reset_q.front().second; - remote_reset_q.pop_front(); + reset_q.pop_front(); lock.Unlock(); get_dispatcher()->ms_handle_reset(a, n); + } else if ((long)m == BAD_FAILED) { + lock.Lock(); + m = failed_q.front().first; + entity_inst_t i = failed_q.front().second; + failed_q.pop_front(); + lock.Unlock(); + get_dispatcher()->ms_handle_failure(m, i); } else { dout(1) << m->get_dest() << " <== " 
<< m->get_source_inst() @@ -925,6 +932,7 @@ int Rank::Pipe::connect() out_seq = 0; for (list<Message*>::iterator p = q.begin(); p != q.end(); p++) (*p)->set_seq(++out_seq); + in_seq = 0; } else { dout(0) << "WTF" << dendl; assert(0); @@ -1050,7 +1058,7 @@ void Rank::Pipe::report_failures() dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl; } else { dout(10) << "fail on " << *m << dendl; - rank.local[srcrank]->get_dispatcher()->ms_handle_failure(m, m->get_dest_inst()); + rank.local[srcrank]->queue_failure(m, m->get_dest_inst()); } } delete m; diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 47f8aa5d6b72b..11b6be6d21601 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -244,20 +244,29 @@ private: lock.Unlock(); } + enum { BAD_REMOTE_RESET, BAD_RESET, BAD_FAILED }; list<pair<entity_addr_t,entity_name_t> > remote_reset_q; list<pair<entity_addr_t,entity_name_t> > reset_q; + list<pair<Message*,entity_inst_t> > failed_q; void queue_remote_reset(entity_addr_t a, entity_name_t n) { lock.Lock(); remote_reset_q.push_back(pair<entity_addr_t,entity_name_t>(a,n)); - dispatch_queue.push_back((Message*)0); + dispatch_queue.push_back((Message*)BAD_REMOTE_RESET); cond.Signal(); lock.Unlock(); } void queue_reset(entity_addr_t a, entity_name_t n) { lock.Lock(); reset_q.push_back(pair<entity_addr_t,entity_name_t>(a,n)); - dispatch_queue.push_back((Message*)1); + dispatch_queue.push_back((Message*)BAD_RESET); + cond.Signal(); + lock.Unlock(); + } + void queue_failure(Message *m, entity_inst_t i) { + lock.Lock(); + failed_q.push_back(pair<Message*,entity_inst_t>(m,i)); + dispatch_queue.push_back((Message*)BAD_FAILED); cond.Signal(); lock.Unlock(); } -- 2.39.5