]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: ref count message while they are owned by the messenger
authorSage Weil <sage@newdream.net>
Thu, 20 Nov 2008 19:31:12 +0000 (11:31 -0800)
committerSage Weil <sage@newdream.net>
Thu, 20 Nov 2008 19:43:54 +0000 (11:43 -0800)
Users still assume they hold the only reference, at least until
they call send_message.

One caveat is that ms_handle_failure is passed a message with an
unknown number of refs.  The method should not try to free or
re-use the message.

src/include/atomic.h
src/msg/Dispatcher.h
src/msg/Message.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h
src/osd/OSD.cc
src/testmsgr.cc
src/vstart.sh

index f4f085a9ca5262fdecf5edc25694d77ad446eb1b..7567dde35463949c58fc3048a02ce77d2d0f6395 100644 (file)
@@ -48,10 +48,11 @@ class atomic_t {
 public:
   atomic_t(int i=0) : lock("atomic_t::lock", false, false /* no lockdep */), nref(i) {}
   atomic_t(const atomic_t& other);
-  void inc() { 
+  int inc() { 
     lock.Lock();
-    ++nref;
+    int r = ++nref;
     lock.Unlock();
+    return r;
   }
   int dec() {
     lock.Lock();
index 42dedf23927261fefc24e7c2af2b2784220579d0..cbaf7aaaaa591c8b4216d646c924b18e7018a0bf 100644 (file)
@@ -28,7 +28,7 @@ class Dispatcher {
   virtual void dispatch(Message *m) = 0;
 
   // how i deal with transmission failures.
-  virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { delete m; }
+  virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) {  }
 
   /*
    * on any connection reset.
index 3e8982c909a77867301375a69800a301fa31f78b..71f0be67b931a6bcf154925d1a7764f81872a6d7 100644 (file)
@@ -105,6 +105,7 @@ using std::list;
 #include "include/buffer.h"
 #include "msg_types.h"
 
+#include "common/debug.h"
 
 
 
@@ -125,18 +126,36 @@ protected:
   friend class Messenger;
 
 public:
-  Message() {
+  atomic_t nref;
+
+  Message() : nref(0) {
     memset(&header, 0, sizeof(header));
     memset(&footer, 0, sizeof(footer));
   };
-  Message(int t) {
+  Message(int t) : nref(0) {
     memset(&header, 0, sizeof(header));
     header.type = t;
     header.priority = 0;  // undef
     header.data_off = 0;
     memset(&footer, 0, sizeof(footer));
   }
-  virtual ~Message() { }
+  virtual ~Message() { 
+    assert(nref.test() == 0);
+  }
+
+  void get() {
+    //int r = 
+      nref.inc();
+    //*_dout << dbeginl << "message(" << this << ").get " << (r-1) << " -> " << r << std::endl;
+    //_dout_end_line();
+  }
+  void put() {
+    int r = nref.dec();
+    //*_dout << dbeginl << "message(" << this << ").put " << (r+1) << " -> " << r << std::endl;
+    //_dout_end_line();
+    if (r == 0)
+      delete this;
+  }
 
   ceph_msg_header &get_header() { return header; }
   void set_header(const ceph_msg_header &e) { header = e; }
index 76cf703385ab0b9a37ba077625245f6f60436c59..5775774880dbe07edee5e339facb006c95ff0cf2 100644 (file)
@@ -448,6 +448,8 @@ void Rank::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
 {
   const entity_name_t dest = m->get_dest();
 
+  assert(m->nref.test() == 0);
+
   // lookup
   entity_addr_t dest_proc_addr = dest_addr;
   dest_proc_addr.erank = 0;
@@ -623,6 +625,7 @@ void Rank::EntityMessenger::dispatch_entry()
            failed_q.pop_front();
            lock.Unlock();
            get_dispatcher()->ms_handle_failure(m, i);
+           m->put();
          } else {
            dout(1) << m->get_dest() 
                    << " <== " << m->get_source_inst()
@@ -1454,15 +1457,14 @@ void Rank::Pipe::report_failures()
       unsigned srcrank = m->get_source_inst().addr.erank;
       if (srcrank >= rank.max_local || rank.local[srcrank] == 0) {
        dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl;
-       delete m;
       } else if (rank.local[srcrank]->is_stopped()) {
        dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
-       delete m;
       } else {
        dout(10) << "fail on " << *m << dendl;
        rank.local[srcrank]->queue_failure(m, m->get_dest_inst());
       }
     }
+    m->put();
   }
 }
 
@@ -1530,7 +1532,7 @@ void Rank::Pipe::reader()
          sent.pop_front();
          dout(10) << "reader got ack seq " 
                    << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
-         delete m;
+         m->put();
        }
       }
       continue;
@@ -1589,6 +1591,7 @@ void Rank::Pipe::reader()
        if (erank < rank.max_local && rank.local[erank]) {
          // find entity
          entity = rank.local[erank];
+         entity->get();
 
          // first message?
          if (entity->need_addr) {
@@ -1610,8 +1613,10 @@ void Rank::Pipe::reader()
       }
       rank.lock.Unlock();
       
-      if (entity)
+      if (entity) {
        entity->queue_message(m);        // queue
+       entity->put();
+      }
 
       lock.Lock();
     } 
@@ -1723,6 +1728,7 @@ void Rank::Pipe::writer()
       if (m) {
        m->set_seq(++out_seq);
        sent.push_back(m); // move to sent list
+       m->get();
        lock.Unlock();
 
         dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
@@ -1741,6 +1747,7 @@ void Rank::Pipe::writer()
                  << errno << ": " << strerror(errno) << dendl;
          fault();
         }
+       m->put();
       }
       continue;
     }
index b9a107c8ab1ba2f4329886b94073e53e39c65938..b7ed36600da419b285203b5a5883b791cf196388 100644 (file)
@@ -227,6 +227,7 @@ private:
       lock.Unlock();
     }    
     void _send(Message *m) {
+      m->get();
       q[m->get_priority()].push_back(m);
       last_dest_name = m->get_dest();
       cond.Signal();
@@ -282,7 +283,9 @@ private:
     void queue_message(Message *m) {
       // set recv stamp
       m->set_recv_stamp(g_clock.now());
-      
+
+      assert(m->nref.test() == 0);
+
       lock.Lock();
       qlen++;
       dispatch_queue[m->get_priority()].push_back(m);
@@ -311,6 +314,7 @@ private:
     }
     void queue_failure(Message *m, entity_inst_t i) {
       lock.Lock();
+      m->get();
       failed_q.push_back(pair<Message*,entity_inst_t>(m,i));
       dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED);
       cond.Signal();
index 07a1289b0f0be77e2780596d4eaa0c9a918a90bb..c1d10d06edadf8ca26160c487fb9fe808ea50438 100644 (file)
@@ -1422,19 +1422,9 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst)
 {
   entity_name_t dest = inst.name;
 
-  if (g_conf.ms_die_on_failure) {
-    dout(0) << "ms_handle_failure " << inst << " on " << *m << dendl;
-    exit(0);
-  }
-
-  if (is_stopping()) {
-    delete m;
-    return;
-  }
-
-  dout(1) << "ms_handle_failure " << inst 
-         << ", dropping " << *m << dendl;
-  delete m;
+  dout(1) << "ms_handle_failure " << inst << " on " << *m << dendl;
+  if (g_conf.ms_die_on_failure)
+    assert(0);
 }
 
 
index fd83c7068177f602a88c38f5c7d9b4f065bd25b4..2ab1ed8b659b24d3f028e4d41c743ae3a158068b 100644 (file)
@@ -57,8 +57,6 @@ class Admin : public Dispatcher {
   }
 
   void ms_handle_failure(Message *m, const entity_inst_t& inst) { 
-    
-
   }
 
 } dispatcher;
index 732726216d160bbbaee5f112e79b46309ad34c62..c9e17c0ebc016927bf2960afd5c76c418d37bdcc 100755 (executable)
@@ -118,7 +118,7 @@ done
 # mds
 for mds in `seq 0 $((CEPH_NUM_MDS-1))`
 do
-    $CEPH_BIN/crun $norestart $CEPH_BIN/cmds $ARGS $CMDS_ARGS &
+    $CEPH_BIN/crun $norestart $valgrind $CEPH_BIN/cmds $ARGS $CMDS_ARGS &
 
 #valgrind --tool=massif $CEPH_BIN/cmds $ARGS --mds_log_max_segments 2 --mds_thrash_fragments 0 --mds_thrash_exports 0 > m  #--debug_ms 20
 #$CEPH_BIN/cmds -d $ARGS --mds_thrash_fragments 0 --mds_thrash_exports 0 #--debug_ms 20