// ======================================================
-// abstract Message class
+// abstract Connection, for keeping per-connection state
+struct RefCountedObject {
+ atomic_t nref;
+ RefCountedObject() : nref(1) {}
+ virtual ~RefCountedObject() {}
+
+ RefCountedObject *get() {
+ nref.inc();
+ return this;
+ }
+ void put() {
+ if (nref.dec() == 0)
+ delete this;
+ }
+};
+
+struct Connection : public RefCountedObject {
+ atomic_t nref;
+ Mutex lock;
+ RefCountedObject *priv;
+
+public:
+ Connection() : nref(1), lock("Connection::lock"), priv(NULL) {}
+ ~Connection() {
+ if (priv)
+ priv->put();
+ }
+
+ Connection *get() {
+ nref.inc();
+ return this;
+ }
+ void put() {
+ if (nref.dec() == 0)
+ delete this;
+ }
+
+ void set_priv(RefCountedObject *o) {
+ Mutex::Locker l(lock);
+ if (priv)
+ priv->put();
+ priv = o->get();
+ }
+ RefCountedObject *get_priv() {
+ Mutex::Locker l(lock);
+ if (priv)
+ return priv->get();
+ return NULL;
+ }
+};
+
+
+
+// abstract Message class
class Message {
protected:
bufferlist data; // data payload (page-alignment will be preserved where possible)
utime_t recv_stamp;
+ Connection *connection;
friend class Messenger;
-
+
public:
atomic_t nref;
- Message() : nref(0) {
+ Message() : connection(NULL), nref(0) {
memset(&header, 0, sizeof(header));
memset(&footer, 0, sizeof(footer));
};
- Message(int t) : nref(0) {
+ Message(int t) : connection(NULL), nref(0) {
memset(&header, 0, sizeof(header));
header.type = t;
header.priority = 0; // undef
}
virtual ~Message() {
assert(nref.test() == 0);
+ if (connection)
+ connection->put();
}
- void get() {
+ Message *get() {
//int r =
- nref.inc();
+ nref.inc();
//*_dout << dbeginl << "message(" << this << ").get " << (r-1) << " -> " << r << std::endl;
//_dout_end_line();
+ return this;
}
void put() {
int r = nref.dec();
delete this;
}
+ Connection *get_connection() { return connection; }
+ void set_connection(Connection *c) { connection = c; }
+
ceph_msg_header &get_header() { return header; }
void set_header(const ceph_msg_header &e) { header = e; }
void set_footer(const ceph_msg_footer &e) { footer = e; }
class MDS;
class Timer;
+
class Messenger {
private:
Dispatcher *dispatcher;
atomic_t nref;
public:
- Messenger(entity_name_t w) : dispatcher(0),
+ Messenger(entity_name_t w) : dispatcher(0),
default_send_priority(CEPH_MSG_PRIO_DEFAULT),
nref(1) {
_my_name = w;
virtual void destroy() {
put();
}
-
+
// accessors
entity_name_t get_myname() { return _my_name; }
virtual entity_addr_t get_myaddr() = 0;
int state;
protected:
+ Connection *connection_state;
utime_t first_fault; // time of original failure
utime_t last_attempt; // time of last reconnect attempt
sd(-1),
lock("SimpleMessenger::Pipe::lock"),
state(st),
+ connection_state(new Connection),
reader_running(false), writer_running(false),
keepalive(false),
connect_seq(0), peer_global_seq(0),
~Pipe() {
assert(q.empty());
assert(sent.empty());
+ connection_state->put();
}