]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: pass Connection* to msgr error handlers
authorSage Weil <sage@newdream.net>
Mon, 21 Sep 2009 21:24:35 +0000 (14:24 -0700)
committerSage Weil <sage@newdream.net>
Mon, 21 Sep 2009 21:24:35 +0000 (14:24 -0700)
This lets the error handlers get at session state (if any).

18 files changed:
src/ceph.cc
src/client/Client.cc
src/client/Client.h
src/common/LogClient.h
src/dumpjournal.cc
src/librados.cc
src/mds/MDS.cc
src/mds/MDS.h
src/mon/MonClient.cc
src/mon/MonClient.h
src/mon/Monitor.h
src/msg/Dispatcher.h
src/msg/Message.h
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h
src/osd/OSD.h
src/testmsgr.cc

index d32ee9e42c7ad07580cb99e29bfccec673ea9962..8a280544db8e61b3f892a900bdf4a3ef5d153d11 100644 (file)
@@ -359,9 +359,9 @@ class Admin : public Dispatcher {
     return true;
   }
 
-  void ms_handle_failure(Message *m, const entity_addr_t& addr) {}
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr) {}
 
-  bool ms_handle_reset(const entity_addr_t& peer) {
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer) {
     lock.Lock();
     if (observe)
       send_observe_requests();
@@ -371,7 +371,7 @@ class Admin : public Dispatcher {
     return true;
   }
 
-  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
 
 } dispatcher;
 
index fc4c7928b645c36ef2f53cbf93d0a11946eef41d..9e62577f5689bc84c7106a27d3bc7ad008919501 100644 (file)
@@ -5675,19 +5675,19 @@ int Client::enumerate_layout(int fd, vector<ObjectExtent>& result,
 
 // ===============================
 
-void Client::ms_handle_failure(Message *m, const entity_addr_t& addr)
+void Client::ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr)
 {
   dout(0) << "ms_handle_failure " << *m << " to " << addr << dendl;
 }
 
-bool Client::ms_handle_reset(const entity_addr_t& addr) 
+bool Client::ms_handle_reset(Connection *con, const entity_addr_t& addr) 
 {
   dout(0) << "ms_handle_reset on " << addr << dendl;
   return false;
 }
 
 
-void Client::ms_handle_remote_reset(const entity_addr_t& addr) 
+void Client::ms_handle_remote_reset(Connection *con, const entity_addr_t& addr) 
 {
   dout(0) << "ms_handle_remote_reset on " << addr << dendl;
 #if 0
index ef4b9128457d2b03753b988e3337ac4c1f999e81..e190f835b80efba0647d8d5024c7900cd3d99ab1 100644 (file)
@@ -1045,9 +1045,9 @@ protected:
   friend class SyntheticClient;
   bool ms_dispatch(Message *m);
 
-  bool ms_handle_reset(const entity_addr_t& peer);
-  void ms_handle_failure(Message *m, const entity_addr_t& peer);
-  void ms_handle_remote_reset(const entity_addr_t& peer);
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer);
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer);
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer);
 
 
  public:
index 7eb5402721a8a441daa4e75f6f0dfe07c58192c3..51833e12cdd1ed734aea2d63c6fda3d6b6c58bc2 100644 (file)
@@ -36,9 +36,9 @@ class LogClient : public Dispatcher {
   bool is_synchronous;
   void _send_log();
 
-  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
-  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
-  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
 
 
  public:
index 2c0494cf90b4fc7ac028046bbd85bc2d709376b1..447aaa689971b01fd7c9ae2b87dd4cbcb7c41bd1 100644 (file)
@@ -62,9 +62,9 @@ class Dumper : public Dispatcher {
     }
     return true;
   }
-  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
-  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
-  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
 
 } dispatcher;
 
index f4497e2ef9e7bdbe97c838f2fa2863466e8d70eb..a4d4cfce21b40fa98968c2275e4496dd3dfa3090 100644 (file)
@@ -59,9 +59,9 @@ class RadosClient : public Dispatcher
   bool _dispatch(Message *m);
   bool ms_dispatch(Message *m);
 
-  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
-  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
-  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
 
   Objecter *objecter;
 
index 20d9d3ce45f4b5fcd0e0139d4cb33537d44652ec..07d382605045d9ed3120d6836d8a67b8dbf638fd 100644 (file)
@@ -1395,21 +1395,21 @@ bool MDS::_dispatch(Message *m)
 
 
 
-void MDS::ms_handle_failure(Message *m, const entity_addr_t& addr) 
+void MDS::ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr) 
 {
   mds_lock.Lock();
   dout(0) << "ms_handle_failure to " << addr << " on " << *m << dendl;
   mds_lock.Unlock();
 }
 
-bool MDS::ms_handle_reset(const entity_addr_t& addr) 
+bool MDS::ms_handle_reset(Connection *con, const entity_addr_t& addr) 
 {
   dout(0) << "ms_handle_reset on " << addr << dendl;
   return false;
 }
 
 
-void MDS::ms_handle_remote_reset(const entity_addr_t& addr) 
+void MDS::ms_handle_remote_reset(Connection *con, const entity_addr_t& addr) 
 {
   dout(0) << "ms_handle_remote_reset on " << addr << dendl;
   objecter->ms_handle_remote_reset(addr);
index a0be1128da9b6139dcac7940efffa26dd5e55c79..d1c361a5ee1728dbddaf956f5382d88b5a295a0a 100644 (file)
@@ -361,9 +361,9 @@ class MDS : public Dispatcher {
   // messages
   bool _dispatch(Message *m);
   
-  bool ms_handle_reset(const entity_addr_t& peer);
-  void ms_handle_failure(Message *m, const entity_addr_t& peer);
-  void ms_handle_remote_reset(const entity_addr_t& peer);
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer);
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer);
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer);
 
   // special message types
   void handle_mds_map(class MMDSMap *m);
index 798e80277c15703b7405ef04e48a5c726dae84d3..ec8e9ab94cc90bfe7b71e0e77ae0376273483baf 100644 (file)
@@ -313,7 +313,7 @@ void MonClient::_reopen_session()
   }
 }
 
-bool MonClient::ms_handle_reset(const entity_addr_t& peer)
+bool MonClient::ms_handle_reset(Connection *con, const entity_addr_t& peer)
 {
   dout(10) << "ms_handle_reset " << peer << dendl;
   if (hunting)
index 74edf8874d85fd2f3249246179e4f0803adbd048..6ff016ddf3bca7b8c18fd7680d464d9c19af9027 100644 (file)
@@ -44,10 +44,10 @@ private:
   SafeTimer timer;
 
   bool ms_dispatch(Message *m);
-  bool ms_handle_reset(const entity_addr_t& peer);
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer);
 
-  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
-  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
 
   void handle_monmap(MMonMap *m);
 
index 83281b54ccc5e2899c99e82ad44ff685fb348fa5..4413ec9c92e40a332d28b61b40ee11809d1cea64 100644 (file)
@@ -167,9 +167,9 @@ public:
 
  private:
   bool ms_dispatch(Message *m);
-  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
-  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
-  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer);
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
 
  public:
   Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map);
index a999a856f5441844cff88b785c31ba0af903a2b5..822a8f33c4e46baa44cbb84d2789e5129988ec78 100644 (file)
@@ -30,18 +30,18 @@ public:
   virtual bool ms_dispatch(Message *m) = 0;
 
   // how i deal with transmission failures.
-  virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) = 0;
+  virtual void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr) = 0;
 
   /*
    * on any connection reset.
    * this indicates that the ordered+reliable delivery semantics have 
    * been violated.  messages may have been lost.
    */
-  virtual bool ms_handle_reset(const entity_addr_t& peer) = 0;
+  virtual bool ms_handle_reset(Connection *con, const entity_addr_t& peer) = 0;
 
   // on deliberate reset of connection by remote
   //  implies incoming messages dropped; possibly/probably some of our previous outgoing too.
-  virtual void ms_handle_remote_reset(const entity_addr_t& peer) = 0;
+  virtual void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) = 0;
 };
 
 #endif
index f33c1a75f9fc88e1826b638ac7dddf546014b437..19fce70db124d9d53ab6f7b08930e20ec7b142f2 100644 (file)
@@ -142,10 +142,12 @@ struct RefCountedObject {
   virtual ~RefCountedObject() {}
   
   RefCountedObject *get() {
+    //generic_dout(0) << "RefCountedObject::get " << this << " " << nref.test() << " -> " << (nref.test() + 1) << dendl;
     nref.inc();
     return this;
   }
   void put() {
+    //generic_dout(0) << "RefCountedObject::put " << this << " " << nref.test() << " -> " << (nref.test() - 1) << dendl;
     if (nref.dec() == 0)
       delete this;
   }
@@ -159,8 +161,11 @@ struct Connection : public RefCountedObject {
 public:
   Connection() : nref(1), lock("Connection::lock"), priv(NULL) {}
   ~Connection() {
-    if (priv)
+    //generic_dout(0) << "~Connection " << this << dendl;
+    if (priv) {
+      //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl;
       priv->put();
+    }
   }
 
   Connection *get() {
@@ -176,7 +181,7 @@ public:
     Mutex::Locker l(lock);
     if (priv)
       priv->put();
-    priv = o->get();
+    priv = o;
   }
   RefCountedObject *get_priv() {
     Mutex::Locker l(lock);
index c41c2d0663c84155f3d4cec80139f910af737d54..855dc3477cfc2ec9d0506c572eb29ce11656fa73 100644 (file)
@@ -33,7 +33,7 @@ class Timer;
 
 
 class Messenger {
- private:
+private:
   list<Dispatcher*> dispatchers;
 
 protected:
@@ -104,24 +104,24 @@ protected:
                    << dendl;
     assert(0);
   }
-  void ms_deliver_handle_reset(const entity_addr_t& peer) {
+  void ms_deliver_handle_reset(Connection *con, const entity_addr_t& peer) {
     for (list<Dispatcher*>::iterator p = dispatchers.begin();
         p != dispatchers.end();
         p++)
-      if ((*p)->ms_handle_reset(peer))
+      if ((*p)->ms_handle_reset(con, peer))
        return;
   }
-  void ms_deliver_handle_remote_reset(const entity_addr_t& peer) {
+  void ms_deliver_handle_remote_reset(Connection *con, const entity_addr_t& peer) {
     for (list<Dispatcher*>::iterator p = dispatchers.begin();
         p != dispatchers.end();
         p++)
-      (*p)->ms_handle_remote_reset(peer);
+      (*p)->ms_handle_remote_reset(con, peer);
   }
-  void ms_deliver_handle_failure(Message *m, const entity_addr_t& peer) {
+  void ms_deliver_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) {
     for (list<Dispatcher*>::iterator p = dispatchers.begin();
         p != dispatchers.end();
         p++)
-      (*p)->ms_handle_failure(m, peer);
+      (*p)->ms_handle_failure(con, m, peer);
   }
 
   // shutdown
index 615928037bec3f3da07e2804264a62e2af358889..ff04a52cb351ce35851a8dae211b841b7694b119 100644 (file)
@@ -290,23 +290,29 @@ void SimpleMessenger::Endpoint::dispatch_entry()
           ls.pop_front();
          if ((long)m == BAD_REMOTE_RESET) {
            lock.Lock();
-           entity_addr_t a = remote_reset_q.front();
+           Connection *con = remote_reset_q.front().first;
+           entity_addr_t a = remote_reset_q.front().second;
            remote_reset_q.pop_front();
            lock.Unlock();
-           ms_deliver_handle_remote_reset(a);
+           ms_deliver_handle_remote_reset(con, a);
+           con->put();
          } else if ((long)m == BAD_RESET) {
            lock.Lock();
-           entity_addr_t a = reset_q.front();
+           Connection *con = reset_q.front().first;
+           entity_addr_t a = reset_q.front().second;
            reset_q.pop_front();
            lock.Unlock();
-           ms_deliver_handle_reset(a);
+           ms_deliver_handle_reset(con, a);
+           con->put();
          } else if ((long)m == BAD_FAILED) {
            lock.Lock();
-           m = failed_q.front().first;
-           entity_addr_t a = failed_q.front().second;
+           Connection *con = failed_q.front().con;
+           m = failed_q.front().msg;
+           entity_addr_t a = failed_q.front().addr;
            failed_q.pop_front();
            lock.Unlock();
-           ms_deliver_handle_failure(m, a);
+           ms_deliver_handle_failure(con, m, a);
+           con->put();
            m->put();
          } else {
            dout(1) << "<== " << m->get_source_inst()
@@ -1183,7 +1189,7 @@ void SimpleMessenger::Pipe::fail()
 
   for (unsigned i=0; i<rank->local.size(); i++) 
     if (rank->local[i])
-      rank->local[i]->queue_reset(peer_addr);
+      rank->local[i]->queue_reset(connection_state->get(), peer_addr);
 
   // unregister
   lock.Unlock();
@@ -1201,7 +1207,7 @@ void SimpleMessenger::Pipe::was_session_reset()
   report_failures();
   for (unsigned i=0; i<rank->local.size(); i++) 
     if (rank->local[i])
-      rank->local[i]->queue_remote_reset(peer_addr);
+      rank->local[i]->queue_remote_reset(connection_state->get(), peer_addr);
 
   out_seq = 0;
   in_seq = 0;
@@ -1225,7 +1231,7 @@ void SimpleMessenger::Pipe::report_failures()
        dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
       } else {
        dout(10) << "fail on " << *m << dendl;
-       rank->local[srcrank]->queue_failure(m, peer_addr);
+       rank->local[srcrank]->queue_failure(connection_state->get(), m, peer_addr);
       }
     }
     m->put();
index 531708ed9ee5ec81aee3901cca61ab1f24a61f98..b8ecea1dc293b762c4f5424432eaf00e00fb918c 100644 (file)
@@ -323,28 +323,34 @@ private:
     }
 
     enum { BAD_REMOTE_RESET, BAD_RESET, BAD_FAILED };
-    list<entity_addr_t> remote_reset_q;
-    list<entity_addr_t> reset_q;
-    list<pair<Message*,entity_addr_t> > failed_q;
+    list<pair<Connection*,entity_addr_t> > remote_reset_q;
+    list<pair<Connection*,entity_addr_t> > reset_q;
+    struct fail_item {
+      Connection *con;
+      Message *msg;
+      entity_addr_t addr;
+      fail_item(Connection *c, Message *m, entity_addr_t a) : con(c), msg(m), addr(a) {}
+    };
+    list<fail_item> failed_q;
 
-    void queue_remote_reset(entity_addr_t a) {
+    void queue_remote_reset(Connection *con, entity_addr_t a) {
       lock.Lock();
-      remote_reset_q.push_back(a);
+      remote_reset_q.push_back(pair<Connection*,entity_addr_t>(con, a));
       dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET);
       cond.Signal();
       lock.Unlock();
     }
-    void queue_reset(entity_addr_t a) {
+    void queue_reset(Connection *con, entity_addr_t a) {
       lock.Lock();
-      reset_q.push_back(a);
+      reset_q.push_back(pair<Connection*,entity_addr_t>(con, a));
       dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET);
       cond.Signal();
       lock.Unlock();
     }
-    void queue_failure(Message *m, entity_addr_t a) {
+    void queue_failure(Connection *con, Message *m, entity_addr_t a) {
       lock.Lock();
       m->get();
-      failed_q.push_back(pair<Message*,entity_addr_t>(m, a));
+      failed_q.push_back(fail_item(con, m, a));
       dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED);
       cond.Signal();
       lock.Unlock();
index 5a1b88d3b775f0e33cb26c7903e8e30cd3a927e4..5ba0f198e793681f34bbb7555a1e229e25a23b47 100644 (file)
@@ -207,9 +207,9 @@ public:
     bool ms_dispatch(Message *m) {
       return osd->heartbeat_dispatch(m);
     };
-    bool ms_handle_reset(const entity_addr_t& peer) { return false; }
-    void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
-    void ms_handle_remote_reset(const entity_addr_t& peer) {}
+    bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+    void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+    void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
   public:
     OSD *osd;
     HeartbeatDispatcher(OSD *o) : osd(o) {}
@@ -816,9 +816,9 @@ protected:
 
  private:
   bool ms_dispatch(Message *m);
-  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
-  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
-  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
 
  public:
   OSD(int id, Messenger *m, Messenger *hbm, MonClient *mc, const char *dev = 0, const char *jdev = 0);
index 368d34dd7b3abf3efa90462f6d3dde4cb21ef493..f4f15c329db2b3c56ab1a1cbf5f61d33b5ddcede 100644 (file)
@@ -57,9 +57,9 @@ class Admin : public Dispatcher {
     return true;
   }
 
-  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
-  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
-  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+  bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
 
 } dispatcher;