From: Sage Weil Date: Mon, 21 Sep 2009 21:24:35 +0000 (-0700) Subject: msgr: pass Connection* to msgr error handlers X-Git-Tag: v0.15~17 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f65d127ee3e6b25c9ed3e3cc5dbf665e710bdf15;p=ceph.git msgr: pass Connection* to msgr error handlers This lets the error handlers get at session state (if any). --- diff --git a/src/ceph.cc b/src/ceph.cc index d32ee9e42c7..8a280544db8 100644 --- a/src/ceph.cc +++ b/src/ceph.cc @@ -359,9 +359,9 @@ class Admin : public Dispatcher { return true; } - void ms_handle_failure(Message *m, const entity_addr_t& addr) {} + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr) {} - bool ms_handle_reset(const entity_addr_t& peer) { + bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { lock.Lock(); if (observe) send_observe_requests(); @@ -371,7 +371,7 @@ class Admin : public Dispatcher { return true; } - void ms_handle_remote_reset(const entity_addr_t& peer) {} + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {} } dispatcher; diff --git a/src/client/Client.cc b/src/client/Client.cc index fc4c7928b64..9e62577f568 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -5675,19 +5675,19 @@ int Client::enumerate_layout(int fd, vector& result, // =============================== -void Client::ms_handle_failure(Message *m, const entity_addr_t& addr) +void Client::ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr) { dout(0) << "ms_handle_failure " << *m << " to " << addr << dendl; } -bool Client::ms_handle_reset(const entity_addr_t& addr) +bool Client::ms_handle_reset(Connection *con, const entity_addr_t& addr) { dout(0) << "ms_handle_reset on " << addr << dendl; return false; } -void Client::ms_handle_remote_reset(const entity_addr_t& addr) +void Client::ms_handle_remote_reset(Connection *con, const entity_addr_t& addr) { dout(0) << "ms_handle_remote_reset on " << addr << dendl; #if 0 diff --git a/src/client/Client.h b/src/client/Client.h index ef4b9128457..e190f835b80 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -1045,9 +1045,9 @@ protected: friend class SyntheticClient; bool ms_dispatch(Message *m); - bool ms_handle_reset(const entity_addr_t& peer); - void ms_handle_failure(Message *m, const entity_addr_t& peer); - void ms_handle_remote_reset(const entity_addr_t& peer); + bool ms_handle_reset(Connection *con, const entity_addr_t& peer); + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer); + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer); public: diff --git a/src/common/LogClient.h b/src/common/LogClient.h index 7eb5402721a..51833e12cdd 100644 --- a/src/common/LogClient.h +++ b/src/common/LogClient.h @@ -36,9 +36,9 @@ class LogClient : public Dispatcher { bool is_synchronous; void _send_log(); - bool ms_handle_reset(const entity_addr_t& peer) { return false; } - void ms_handle_failure(Message *m, const entity_addr_t& peer) { } - void ms_handle_remote_reset(const entity_addr_t& peer) {} + bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; } + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {} public: diff --git a/src/dumpjournal.cc b/src/dumpjournal.cc index 2c0494cf90b..447aaa68997 100644 --- a/src/dumpjournal.cc +++ b/src/dumpjournal.cc @@ -62,9 +62,9 @@ class Dumper : public Dispatcher { } return true; } - bool ms_handle_reset(const entity_addr_t& peer) { return false; } - void ms_handle_failure(Message *m, const entity_addr_t& peer) { } - void ms_handle_remote_reset(const entity_addr_t& peer) {} + bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; } + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {} } dispatcher; diff --git a/src/librados.cc b/src/librados.cc index f4497e2ef9e..a4d4cfce21b 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -59,9 +59,9 @@ class RadosClient : public Dispatcher bool _dispatch(Message *m); bool ms_dispatch(Message *m); - bool ms_handle_reset(const entity_addr_t& peer) { return false; } - void ms_handle_failure(Message *m, const entity_addr_t& peer) { } - void ms_handle_remote_reset(const entity_addr_t& peer) {} + bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; } + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {} Objecter *objecter; diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 20d9d3ce45f..07d38260504 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -1395,21 +1395,21 @@ bool MDS::_dispatch(Message *m) -void MDS::ms_handle_failure(Message *m, const entity_addr_t& addr) +void MDS::ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr) { mds_lock.Lock(); dout(0) << "ms_handle_failure to " << addr << " on " << *m << dendl; mds_lock.Unlock(); } -bool MDS::ms_handle_reset(const entity_addr_t& addr) +bool MDS::ms_handle_reset(Connection *con, const entity_addr_t& addr) { dout(0) << "ms_handle_reset on " << addr << dendl; return false; } -void MDS::ms_handle_remote_reset(const entity_addr_t& addr) +void MDS::ms_handle_remote_reset(Connection *con, const entity_addr_t& addr) { dout(0) << "ms_handle_remote_reset on " << addr << dendl; objecter->ms_handle_remote_reset(addr); diff --git a/src/mds/MDS.h b/src/mds/MDS.h index a0be1128da9..d1c361a5ee1 100644 --- a/src/mds/MDS.h +++ b/src/mds/MDS.h @@ -361,9 +361,9 @@ class MDS : public Dispatcher { // messages bool _dispatch(Message *m); - bool ms_handle_reset(const entity_addr_t& peer); - void ms_handle_failure(Message *m, const entity_addr_t& peer); - void ms_handle_remote_reset(const entity_addr_t& peer); + bool ms_handle_reset(Connection *con, const entity_addr_t& peer); + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer); + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer); // special message types void handle_mds_map(class MMDSMap *m); diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 798e80277c1..ec8e9ab94cc 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -313,7 +313,7 @@ void MonClient::_reopen_session() } } -bool MonClient::ms_handle_reset(const entity_addr_t& peer) +bool MonClient::ms_handle_reset(Connection *con, const entity_addr_t& peer) { dout(10) << "ms_handle_reset " << peer << dendl; if (hunting) diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index 74edf8874d8..6ff016ddf3b 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -44,10 +44,10 @@ private: SafeTimer timer; bool ms_dispatch(Message *m); - bool ms_handle_reset(const entity_addr_t& peer); + bool ms_handle_reset(Connection *con, const entity_addr_t& peer); - void ms_handle_failure(Message *m, const entity_addr_t& peer) { } - void ms_handle_remote_reset(const entity_addr_t& peer) {} + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {} void handle_monmap(MMonMap *m); diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 83281b54ccc..4413ec9c92e 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -167,9 +167,9 @@ public: private: bool ms_dispatch(Message *m); - bool ms_handle_reset(const entity_addr_t& peer) { return false; } - void ms_handle_failure(Message *m, const entity_addr_t& peer) { } - void ms_handle_remote_reset(const entity_addr_t& peer) {} + bool ms_handle_reset(Connection *con, const entity_addr_t& peer); + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {} public: Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map); diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index a999a856f54..822a8f33c4e 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -30,18 +30,18 @@ public: virtual bool ms_dispatch(Message *m) = 0; // how i deal with transmission failures. - virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) = 0; + virtual void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr) = 0; /* * on any connection reset. * this indicates that the ordered+reliable delivery semantics have * been violated. messages may have been lost. */ - virtual bool ms_handle_reset(const entity_addr_t& peer) = 0; + virtual bool ms_handle_reset(Connection *con, const entity_addr_t& peer) = 0; // on deliberate reset of connection by remote // implies incoming messages dropped; possibly/probably some of our previous outgoing too. - virtual void ms_handle_remote_reset(const entity_addr_t& peer) = 0; + virtual void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) = 0; }; #endif diff --git a/src/msg/Message.h b/src/msg/Message.h index f33c1a75f9f..19fce70db12 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -142,10 +142,12 @@ struct RefCountedObject { virtual ~RefCountedObject() {} RefCountedObject *get() { + //generic_dout(0) << "RefCountedObject::get " << this << " " << nref.test() << " -> " << (nref.test() + 1) << dendl; nref.inc(); return this; } void put() { + //generic_dout(0) << "RefCountedObject::put " << this << " " << nref.test() << " -> " << (nref.test() - 1) << dendl; if (nref.dec() == 0) delete this; } @@ -159,8 +161,11 @@ struct Connection : public RefCountedObject { public: Connection() : nref(1), lock("Connection::lock"), priv(NULL) {} ~Connection() { - if (priv) + //generic_dout(0) << "~Connection " << this << dendl; + if (priv) { + //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl; priv->put(); + } } Connection *get() { @@ -176,7 +181,7 @@ public: Mutex::Locker l(lock); if (priv) priv->put(); - priv = o->get(); + priv = o; } RefCountedObject *get_priv() { Mutex::Locker l(lock); diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index c41c2d0663c..855dc3477cf 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -33,7 +33,7 @@ class Timer; class Messenger { - private: +private: list dispatchers; protected: @@ -104,24 +104,24 @@ protected: << dendl; assert(0); } - void ms_deliver_handle_reset(const entity_addr_t& peer) { + void ms_deliver_handle_reset(Connection *con, const entity_addr_t& peer) { for (list::iterator p = dispatchers.begin(); p != dispatchers.end(); p++) - if ((*p)->ms_handle_reset(peer)) + if ((*p)->ms_handle_reset(con, peer)) return; } - void ms_deliver_handle_remote_reset(const entity_addr_t& peer) { + void ms_deliver_handle_remote_reset(Connection *con, const entity_addr_t& peer) { for (list::iterator p = dispatchers.begin(); p != dispatchers.end(); p++) - (*p)->ms_handle_remote_reset(peer); + (*p)->ms_handle_remote_reset(con, peer); } - void ms_deliver_handle_failure(Message *m, const entity_addr_t& peer) { + void ms_deliver_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { for (list::iterator p = dispatchers.begin(); p != dispatchers.end(); p++) - (*p)->ms_handle_failure(m, peer); + (*p)->ms_handle_failure(con, m, peer); } // shutdown diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 615928037be..ff04a52cb35 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -290,23 +290,29 @@ void SimpleMessenger::Endpoint::dispatch_entry() ls.pop_front(); if ((long)m == BAD_REMOTE_RESET) { lock.Lock(); - entity_addr_t a = remote_reset_q.front(); + Connection *con = remote_reset_q.front().first; + entity_addr_t a = remote_reset_q.front().second; remote_reset_q.pop_front(); lock.Unlock(); - ms_deliver_handle_remote_reset(a); + ms_deliver_handle_remote_reset(con, a); + con->put(); } else if ((long)m == BAD_RESET) { lock.Lock(); - entity_addr_t a = reset_q.front(); + Connection *con = reset_q.front().first; + entity_addr_t a = reset_q.front().second; reset_q.pop_front(); lock.Unlock(); - ms_deliver_handle_reset(a); + ms_deliver_handle_reset(con, a); + con->put(); } else if ((long)m == BAD_FAILED) { lock.Lock(); - m = failed_q.front().first; - entity_addr_t a = failed_q.front().second; + Connection *con = failed_q.front().con; + m = failed_q.front().msg; + entity_addr_t a = failed_q.front().addr; failed_q.pop_front(); lock.Unlock(); - ms_deliver_handle_failure(m, a); + ms_deliver_handle_failure(con, m, a); + con->put(); m->put(); } else { dout(1) << "<== " << m->get_source_inst() @@ -1183,7 +1189,7 @@ void SimpleMessenger::Pipe::fail() for (unsigned i=0; ilocal.size(); i++) if (rank->local[i]) - rank->local[i]->queue_reset(peer_addr); + rank->local[i]->queue_reset(connection_state->get(), peer_addr); // unregister lock.Unlock(); @@ -1201,7 +1207,7 @@ void SimpleMessenger::Pipe::was_session_reset() report_failures(); for (unsigned i=0; ilocal.size(); i++) if (rank->local[i]) - rank->local[i]->queue_remote_reset(peer_addr); + rank->local[i]->queue_remote_reset(connection_state->get(), peer_addr); out_seq = 0; in_seq = 0; @@ -1225,7 +1231,7 @@ void SimpleMessenger::Pipe::report_failures() dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl; } else { dout(10) << "fail on " << *m << dendl; - rank->local[srcrank]->queue_failure(m, peer_addr); + rank->local[srcrank]->queue_failure(connection_state->get(), m, peer_addr); } } m->put(); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 531708ed9ee..b8ecea1dc29 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -323,28 +323,34 @@ private: } enum { BAD_REMOTE_RESET, BAD_RESET, BAD_FAILED }; - list remote_reset_q; - list reset_q; - list > failed_q; + list > remote_reset_q; + list > reset_q; + struct fail_item { + Connection *con; + Message *msg; + entity_addr_t addr; + fail_item(Connection *c, Message *m, entity_addr_t a) : con(c), msg(m), addr(a) {} + }; + list failed_q; - void queue_remote_reset(entity_addr_t a) { + void queue_remote_reset(Connection *con, entity_addr_t a) { lock.Lock(); - remote_reset_q.push_back(a); + remote_reset_q.push_back(pair(con, a)); dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET); cond.Signal(); lock.Unlock(); } - void queue_reset(entity_addr_t a) { + void queue_reset(Connection *con, entity_addr_t a) { lock.Lock(); - reset_q.push_back(a); + reset_q.push_back(pair(con, a)); dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET); cond.Signal(); lock.Unlock(); } - void queue_failure(Message *m, entity_addr_t a) { + void queue_failure(Connection *con, Message *m, entity_addr_t a) { lock.Lock(); m->get(); - failed_q.push_back(pair(m, a)); + failed_q.push_back(fail_item(con, m, a)); dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED); cond.Signal(); lock.Unlock(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 5a1b88d3b77..5ba0f198e79 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -207,9 +207,9 @@ public: bool ms_dispatch(Message *m) { return osd->heartbeat_dispatch(m); }; - bool ms_handle_reset(const entity_addr_t& peer) { return false; } - void ms_handle_failure(Message *m, const entity_addr_t& peer) { } - void ms_handle_remote_reset(const entity_addr_t& peer) {} + bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; } + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {} public: OSD *osd; HeartbeatDispatcher(OSD *o) : osd(o) {} @@ -816,9 +816,9 @@ protected: private: bool ms_dispatch(Message *m); - bool ms_handle_reset(const entity_addr_t& peer) { return false; } - void ms_handle_failure(Message *m, const entity_addr_t& peer) { } - void ms_handle_remote_reset(const entity_addr_t& peer) {} + bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; } + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {} public: OSD(int id, Messenger *m, Messenger *hbm, MonClient *mc, const char *dev = 0, const char *jdev = 0); diff --git a/src/testmsgr.cc b/src/testmsgr.cc index 368d34dd7b3..f4f15c329db 100644 --- a/src/testmsgr.cc +++ b/src/testmsgr.cc @@ -57,9 +57,9 @@ class Admin : public Dispatcher { return true; } - bool ms_handle_reset(const entity_addr_t& peer) { return false; } - void ms_handle_failure(Message *m, const entity_addr_t& peer) { } - void ms_handle_remote_reset(const entity_addr_t& peer) {} + bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; } + void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { } + void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {} } dispatcher;