From 802d0e269123b0cc54004bb9a96895a33827d7ab Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 20 Nov 2008 11:31:12 -0800 Subject: [PATCH] msgr: ref count message while they are owned by the messenger Users still assume they hold the only reference, at least until they call send_message. One caveat is that ms_handle_failure is passed a message with an unknown number of refs. The method should not try to free or re-use the message. --- src/include/atomic.h | 5 +++-- src/msg/Dispatcher.h | 2 +- src/msg/Message.h | 25 ++++++++++++++++++++++--- src/msg/SimpleMessenger.cc | 15 +++++++++++---- src/msg/SimpleMessenger.h | 6 +++++- src/osd/OSD.cc | 16 +++------------- src/testmsgr.cc | 2 -- src/vstart.sh | 2 +- 8 files changed, 46 insertions(+), 27 deletions(-) diff --git a/src/include/atomic.h b/src/include/atomic.h index f4f085a9ca526..7567dde354639 100644 --- a/src/include/atomic.h +++ b/src/include/atomic.h @@ -48,10 +48,11 @@ class atomic_t { public: atomic_t(int i=0) : lock("atomic_t::lock", false, false /* no lockdep */), nref(i) {} atomic_t(const atomic_t& other); - void inc() { + int inc() { lock.Lock(); - ++nref; + int r = ++nref; lock.Unlock(); + return r; } int dec() { lock.Lock(); diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 42dedf2392726..cbaf7aaaaa591 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -28,7 +28,7 @@ class Dispatcher { virtual void dispatch(Message *m) = 0; // how i deal with transmission failures. - virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { delete m; } + virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { } /* * on any connection reset. diff --git a/src/msg/Message.h b/src/msg/Message.h index 3e8982c909a77..71f0be67b931a 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -105,6 +105,7 @@ using std::list; #include "include/buffer.h" #include "msg_types.h" +#include "common/debug.h" @@ -125,18 +126,36 @@ protected: friend class Messenger; public: - Message() { + atomic_t nref; + + Message() : nref(0) { memset(&header, 0, sizeof(header)); memset(&footer, 0, sizeof(footer)); }; - Message(int t) { + Message(int t) : nref(0) { memset(&header, 0, sizeof(header)); header.type = t; header.priority = 0; // undef header.data_off = 0; memset(&footer, 0, sizeof(footer)); } - virtual ~Message() { } + virtual ~Message() { + assert(nref.test() == 0); + } + + void get() { + //int r = + nref.inc(); + //*_dout << dbeginl << "message(" << this << ").get " << (r-1) << " -> " << r << std::endl; + //_dout_end_line(); + } + void put() { + int r = nref.dec(); + //*_dout << dbeginl << "message(" << this << ").put " << (r+1) << " -> " << r << std::endl; + //_dout_end_line(); + if (r == 0) + delete this; + } ceph_msg_header &get_header() { return header; } void set_header(const ceph_msg_header &e) { header = e; } diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 76cf703385ab0..5775774880dbe 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -448,6 +448,8 @@ void Rank::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy) { const entity_name_t dest = m->get_dest(); + assert(m->nref.test() == 0); + // lookup entity_addr_t dest_proc_addr = dest_addr; dest_proc_addr.erank = 0; @@ -623,6 +625,7 @@ void Rank::EntityMessenger::dispatch_entry() failed_q.pop_front(); lock.Unlock(); get_dispatcher()->ms_handle_failure(m, i); + m->put(); } else { dout(1) << m->get_dest() << " <== " << m->get_source_inst() @@ -1454,15 +1457,14 @@ void Rank::Pipe::report_failures() unsigned srcrank = m->get_source_inst().addr.erank; if (srcrank >= rank.max_local || rank.local[srcrank] == 0) { dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl; - delete m; } else if (rank.local[srcrank]->is_stopped()) { dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl; - delete m; } else { dout(10) << "fail on " << *m << dendl; rank.local[srcrank]->queue_failure(m, m->get_dest_inst()); } } + m->put(); } } @@ -1530,7 +1532,7 @@ void Rank::Pipe::reader() sent.pop_front(); dout(10) << "reader got ack seq " << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl; - delete m; + m->put(); } } continue; @@ -1589,6 +1591,7 @@ void Rank::Pipe::reader() if (erank < rank.max_local && rank.local[erank]) { // find entity entity = rank.local[erank]; + entity->get(); // first message? if (entity->need_addr) { @@ -1610,8 +1613,10 @@ void Rank::Pipe::reader() } rank.lock.Unlock(); - if (entity) + if (entity) { entity->queue_message(m); // queue + entity->put(); + } lock.Lock(); } @@ -1723,6 +1728,7 @@ void Rank::Pipe::writer() if (m) { m->set_seq(++out_seq); sent.push_back(m); // move to sent list + m->get(); lock.Unlock(); dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; @@ -1741,6 +1747,7 @@ void Rank::Pipe::writer() << errno << ": " << strerror(errno) << dendl; fault(); } + m->put(); } continue; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index b9a107c8ab1ba..b7ed36600da41 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -227,6 +227,7 @@ private: lock.Unlock(); } void _send(Message *m) { + m->get(); q[m->get_priority()].push_back(m); last_dest_name = m->get_dest(); cond.Signal(); @@ -282,7 +283,9 @@ private: void queue_message(Message *m) { // set recv stamp m->set_recv_stamp(g_clock.now()); - + + assert(m->nref.test() == 0); + lock.Lock(); qlen++; dispatch_queue[m->get_priority()].push_back(m); @@ -311,6 +314,7 @@ private: } void queue_failure(Message *m, entity_inst_t i) { lock.Lock(); + m->get(); failed_q.push_back(pair(m,i)); dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED); cond.Signal(); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 07a1289b0f0be..c1d10d06edadf 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1422,19 +1422,9 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst) { entity_name_t dest = inst.name; - if (g_conf.ms_die_on_failure) { - dout(0) << "ms_handle_failure " << inst << " on " << *m << dendl; - exit(0); - } - - if (is_stopping()) { - delete m; - return; - } - - dout(1) << "ms_handle_failure " << inst - << ", dropping " << *m << dendl; - delete m; + dout(1) << "ms_handle_failure " << inst << " on " << *m << dendl; + if (g_conf.ms_die_on_failure) + assert(0); } diff --git a/src/testmsgr.cc b/src/testmsgr.cc index fd83c7068177f..2ab1ed8b659b2 100644 --- a/src/testmsgr.cc +++ b/src/testmsgr.cc @@ -57,8 +57,6 @@ class Admin : public Dispatcher { } void ms_handle_failure(Message *m, const entity_inst_t& inst) { - - } } dispatcher; diff --git a/src/vstart.sh b/src/vstart.sh index 732726216d160..c9e17c0ebc016 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -118,7 +118,7 @@ done # mds for mds in `seq 0 $((CEPH_NUM_MDS-1))` do - $CEPH_BIN/crun $norestart $CEPH_BIN/cmds $ARGS $CMDS_ARGS & + $CEPH_BIN/crun $norestart $valgrind $CEPH_BIN/cmds $ARGS $CMDS_ARGS & #valgrind --tool=massif $CEPH_BIN/cmds $ARGS --mds_log_max_segments 2 --mds_thrash_fragments 0 --mds_thrash_exports 0 > m #--debug_ms 20 #$CEPH_BIN/cmds -d $ARGS --mds_thrash_fragments 0 --mds_thrash_exports 0 #--debug_ms 20 -- 2.39.5