]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: per connection state
authorSage Weil <sage@newdream.net>
Wed, 9 Sep 2009 19:27:19 +0000 (12:27 -0700)
committerSage Weil <sage@newdream.net>
Wed, 9 Sep 2009 19:27:19 +0000 (12:27 -0700)
src/msg/Message.h
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 8eef46bf3cf6d51be5c8f9461499c4a3e5132bda..f33c1a75f9fc88e1826b638ac7dddf546014b437 100644 (file)
@@ -134,8 +134,61 @@ using std::list;
 
 // ======================================================
 
-// abstract Message class
+// abstract Connection, for keeping per-connection state
 
+struct RefCountedObject {
+  atomic_t nref;
+  RefCountedObject() : nref(1) {}
+  virtual ~RefCountedObject() {}
+  
+  RefCountedObject *get() {
+    nref.inc();
+    return this;
+  }
+  void put() {
+    if (nref.dec() == 0)
+      delete this;
+  }
+};
+
+struct Connection : public RefCountedObject {
+  atomic_t nref;
+  Mutex lock;
+  RefCountedObject *priv;
+
+public:
+  Connection() : nref(1), lock("Connection::lock"), priv(NULL) {}
+  ~Connection() {
+    if (priv)
+      priv->put();
+  }
+
+  Connection *get() {
+    nref.inc();
+    return this;
+  }
+  void put() {
+    if (nref.dec() == 0)
+      delete this;
+  }
+
+  void set_priv(RefCountedObject *o) {
+    Mutex::Locker l(lock);
+    if (priv)
+      priv->put();
+    priv = o->get();
+  }
+  RefCountedObject *get_priv() {
+    Mutex::Locker l(lock);
+    if (priv)
+      return priv->get();
+    return NULL;
+  }
+};
+
+
+
+// abstract Message class
 
 class Message {
 protected:
@@ -146,17 +199,18 @@ protected:
   bufferlist       data;     // data payload (page-alignment will be preserved where possible)
   
   utime_t recv_stamp;
+  Connection *connection;
 
   friend class Messenger;
-
+  
 public:
   atomic_t nref;
 
-  Message() : nref(0) {
+  Message() : connection(NULL), nref(0) {
     memset(&header, 0, sizeof(header));
     memset(&footer, 0, sizeof(footer));
   };
-  Message(int t) : nref(0) {
+  Message(int t) : connection(NULL), nref(0) {
     memset(&header, 0, sizeof(header));
     header.type = t;
     header.priority = 0;  // undef
@@ -165,13 +219,16 @@ public:
   }
   virtual ~Message() { 
     assert(nref.test() == 0);
+    if (connection)
+      connection->put();
   }
 
-  void get() {
+  Message *get() {
     //int r = 
-      nref.inc();
+    nref.inc();
     //*_dout << dbeginl << "message(" << this << ").get " << (r-1) << " -> " << r << std::endl;
     //_dout_end_line();
+    return this;
   }
   void put() {
     int r = nref.dec();
@@ -181,6 +238,9 @@ public:
       delete this;
   }
 
+  Connection *get_connection() { return connection; }
+  void set_connection(Connection *c) { connection = c; }
   ceph_msg_header &get_header() { return header; }
   void set_header(const ceph_msg_header &e) { header = e; }
   void set_footer(const ceph_msg_footer &e) { footer = e; }
index 77b675bfea02c100bb0d32389ed8eb2f99e9cb7d..14136e8abc494cd89da137213512bd5dc9f8a27b 100644 (file)
@@ -31,6 +31,7 @@ using namespace std;
 class MDS;
 class Timer;
 
+
 class Messenger {
  private:
   Dispatcher          *dispatcher;
@@ -42,7 +43,7 @@ protected:
   atomic_t nref;
 
  public:
-  Messenger(entity_name_t w) : dispatcher(0),
+  Messenger(entity_name_t w) : dispatcher(0), 
                               default_send_priority(CEPH_MSG_PRIO_DEFAULT),
                               nref(1) {
     _my_name = w;
@@ -61,7 +62,7 @@ protected:
   virtual void destroy() {
     put();
   }
-  
+
   // accessors
   entity_name_t get_myname() { return _my_name; }
   virtual entity_addr_t get_myaddr() = 0;
index 189038d1f41520ac118f23606501c3e67caf1eeb..4bee029844033dfb3251c07032ffe7f1e4525f86 100644 (file)
@@ -1253,6 +1253,9 @@ void SimpleMessenger::Pipe::reader()
     else if (tag == CEPH_MSGR_TAG_MSG) {
       dout(20) << "reader got MSG" << dendl;
       Message *m = read_message();
+
+      m->set_connection(connection_state->get());
+
       lock.Lock();
       
       if (!m) {
index f33c2626e2b9dd3502a299c370ccc26b2ff454df..7f3aed16b3f857f9a531fc121d9ac666667099de 100644 (file)
@@ -136,6 +136,7 @@ private:
     int state;
 
   protected:
+    Connection *connection_state;
 
     utime_t first_fault;   // time of original failure
     utime_t last_attempt;  // time of last reconnect attempt
@@ -193,6 +194,7 @@ private:
       sd(-1),
       lock("SimpleMessenger::Pipe::lock"),
       state(st), 
+      connection_state(new Connection),
       reader_running(false), writer_running(false),
       keepalive(false),
       connect_seq(0), peer_global_seq(0),
@@ -201,6 +203,7 @@ private:
     ~Pipe() {
       assert(q.empty());
       assert(sent.empty());
+      connection_state->put();
     }