From: Sage Weil Date: Wed, 9 Sep 2009 19:27:19 +0000 (-0700) Subject: msgr: per connection state X-Git-Tag: v0.15~95 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7026e35d898516c954fa64529ba013dfaae118a5;p=ceph.git msgr: per connection state --- diff --git a/src/msg/Message.h b/src/msg/Message.h index 8eef46bf3cf..f33c1a75f9f 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -134,8 +134,61 @@ using std::list; // ====================================================== -// abstract Message class +// abstract Connection, for keeping per-connection state +struct RefCountedObject { + atomic_t nref; + RefCountedObject() : nref(1) {} + virtual ~RefCountedObject() {} + + RefCountedObject *get() { + nref.inc(); + return this; + } + void put() { + if (nref.dec() == 0) + delete this; + } +}; + +struct Connection : public RefCountedObject { + atomic_t nref; + Mutex lock; + RefCountedObject *priv; + +public: + Connection() : nref(1), lock("Connection::lock"), priv(NULL) {} + ~Connection() { + if (priv) + priv->put(); + } + + Connection *get() { + nref.inc(); + return this; + } + void put() { + if (nref.dec() == 0) + delete this; + } + + void set_priv(RefCountedObject *o) { + Mutex::Locker l(lock); + if (priv) + priv->put(); + priv = o->get(); + } + RefCountedObject *get_priv() { + Mutex::Locker l(lock); + if (priv) + return priv->get(); + return NULL; + } +}; + + + +// abstract Message class class Message { protected: @@ -146,17 +199,18 @@ protected: bufferlist data; // data payload (page-alignment will be preserved where possible) utime_t recv_stamp; + Connection *connection; friend class Messenger; - + public: atomic_t nref; - Message() : nref(0) { + Message() : connection(NULL), nref(0) { memset(&header, 0, sizeof(header)); memset(&footer, 0, sizeof(footer)); }; - Message(int t) : nref(0) { + Message(int t) : connection(NULL), nref(0) { memset(&header, 0, sizeof(header)); header.type = t; header.priority = 0; // undef @@ -165,13 +219,16 @@ public: } virtual ~Message() { assert(nref.test() == 0); + if (connection) + connection->put(); } - void get() { + Message *get() { //int r = - nref.inc(); + nref.inc(); //*_dout << dbeginl << "message(" << this << ").get " << (r-1) << " -> " << r << std::endl; //_dout_end_line(); + return this; } void put() { int r = nref.dec(); @@ -181,6 +238,9 @@ public: delete this; } + Connection *get_connection() { return connection; } + void set_connection(Connection *c) { connection = c; } + ceph_msg_header &get_header() { return header; } void set_header(const ceph_msg_header &e) { header = e; } void set_footer(const ceph_msg_footer &e) { footer = e; } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 77b675bfea0..14136e8abc4 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -31,6 +31,7 @@ using namespace std; class MDS; class Timer; + class Messenger { private: Dispatcher *dispatcher; @@ -42,7 +43,7 @@ protected: atomic_t nref; public: - Messenger(entity_name_t w) : dispatcher(0), + Messenger(entity_name_t w) : dispatcher(0), default_send_priority(CEPH_MSG_PRIO_DEFAULT), nref(1) { _my_name = w; @@ -61,7 +62,7 @@ protected: virtual void destroy() { put(); } - + // accessors entity_name_t get_myname() { return _my_name; } virtual entity_addr_t get_myaddr() = 0; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 189038d1f41..4bee0298440 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1253,6 +1253,9 @@ void SimpleMessenger::Pipe::reader() else if (tag == CEPH_MSGR_TAG_MSG) { dout(20) << "reader got MSG" << dendl; Message *m = read_message(); + + m->set_connection(connection_state->get()); + lock.Lock(); if (!m) { diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index f33c2626e2b..7f3aed16b3f 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -136,6 +136,7 @@ private: int state; protected: + Connection *connection_state; utime_t first_fault; // time of original failure utime_t last_attempt; // time of last reconnect attempt @@ -193,6 +194,7 @@ private: sd(-1), lock("SimpleMessenger::Pipe::lock"), state(st), + connection_state(new Connection), reader_running(false), writer_running(false), keepalive(false), connect_seq(0), peer_global_seq(0), @@ -201,6 +203,7 @@ private: ~Pipe() { assert(q.empty()); assert(sent.empty()); + connection_state->put(); }