]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/simple: add ms_handle_refused callback
authorPiotr Dałek <git@predictor.org.pl>
Thu, 5 May 2016 19:03:37 +0000 (21:03 +0200)
committerPiotr Dałek <git@predictor.org.pl>
Tue, 13 Sep 2016 17:57:04 +0000 (19:57 +0200)
Added new callback (ms_handle_refused) to dispatchers. It is called
once connection attempt fails with ECONNREFUSED.
Also added dummy ms_handle_refused handlers across codebase.

Signed-off-by: Piotr Dałek <git@predictor.org.pl>
28 files changed:
src/client/Client.cc
src/client/Client.h
src/librados/RadosClient.cc
src/librados/RadosClient.h
src/mds/Beacon.h
src/mds/MDSDaemon.cc
src/mds/MDSDaemon.h
src/mon/MonClient.h
src/mon/Monitor.cc
src/mon/Monitor.h
src/msg/DispatchQueue.cc
src/msg/DispatchQueue.h
src/msg/Dispatcher.h
src/msg/Messenger.h
src/msg/simple/Pipe.cc
src/osd/OSD.cc
src/osd/OSD.h
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/test/messenger/simple_dispatcher.h
src/test/messenger/xio_dispatcher.h
src/test/mon/test-mon-msg.cc
src/test/mon/test_mon_workloadgen.cc
src/test/msgr/perf_msgr_client.cc
src/test/msgr/perf_msgr_server.cc
src/test/msgr/test_msgr.cc
src/test/testmsgr.cc
src/tools/cephfs/MDSUtility.h

index f540df5cbcb9b2d156bd3583b4034e066eb8b94f..7b19a2a0482dc33ee1a5b13c0f25b9b348538ca2 100644 (file)
@@ -12380,6 +12380,12 @@ void Client::ms_handle_remote_reset(Connection *con)
   }
 }
 
+bool Client::ms_handle_refused(Connection *con)
+{
+  ldout(cct, 1) << "ms_handle_refused on " << con->get_peer_addr() << dendl;
+  return false;
+}
+
 bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
 {
   if (dest_type == CEPH_ENTITY_TYPE_MON)
index 8b674464a900c6534e726c531639aff17bf0aea3..ead53b2db05661118b25c9483dff216176bc960d 100644 (file)
@@ -569,6 +569,7 @@ protected:
   void ms_handle_connect(Connection *con);
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con);
+  bool ms_handle_refused(Connection *con);
   bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
 
   int authenticate();
index 8c5e8ed803fa1a3ea3c229d10c4ef2e1ae14b904..dc98af4159f19e3e2a8ce2da07ad7afb56b03caa 100644 (file)
@@ -478,6 +478,10 @@ void librados::RadosClient::ms_handle_remote_reset(Connection *con)
 {
 }
 
+bool librados::RadosClient::ms_handle_refused(Connection *con)
+{
+  return false;
+}
 
 bool librados::RadosClient::_dispatch(Message *m)
 {
index f495ba5966c2a36dc52b5753c6a50cbd64155b34..55bf8af457888399dcc6e6b7561e8b2f07340a30 100644 (file)
@@ -58,6 +58,7 @@ private:
   void ms_handle_connect(Connection *con);
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con);
+  bool ms_handle_refused(Connection *con);
 
   Objecter *objecter;
 
index 1a29f24f6a7136de2da74f762ec5e503a10e4cce..2cd23aac2d7d075a70287d63e3f63b0fa412e308 100644 (file)
@@ -94,6 +94,7 @@ public:
   void ms_handle_connect(Connection *c) {}
   bool ms_handle_reset(Connection *c) {return false;}
   void ms_handle_remote_reset(Connection *c) {}
+  bool ms_handle_refused(Connection *c) {return false;}
 
   void notify_mdsmap(MDSMap const *mdsmap);
   void notify_health(MDSRank const *mds);
index 1f7d7185314b06c3c334b69750c1395f9f3c3cd8..59c631e05481b5559d00226817e881a7cf7b1fb5 100644 (file)
@@ -1271,6 +1271,12 @@ void MDSDaemon::ms_handle_remote_reset(Connection *con)
   }
 }
 
+bool MDSDaemon::ms_handle_refused(Connection *con)
+{
+  // do nothing for now
+  return false;
+}
+
 bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
                               int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
                               bool& is_valid, CryptoKey& session_key)
index 2ae817b730afcd04e41c797b7b9a15716e1e8ad2..701f56f965c8f08b8627c4b5fbd88dfa601ac1f8 100644 (file)
@@ -138,6 +138,7 @@ class MDSDaemon : public Dispatcher, public md_config_obs_t {
   void ms_handle_connect(Connection *con);
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con);
+  bool ms_handle_refused(Connection *con);
 
  protected:
   // admin socket handling
index b45dd86568eb308a595f5ec14b534fe1f7904127..f79b595630e5a6fc7606424c76b50f1539a4f43a 100644 (file)
@@ -100,6 +100,9 @@ struct MonClientPinger : public Dispatcher {
     return true;
   }
   void ms_handle_remote_reset(Connection *con) {}
+  bool ms_handle_refused(Connection *con) {
+    return false;
+  }
 };
 
 class MonClient : public Dispatcher {
@@ -140,6 +143,7 @@ private:
   bool ms_dispatch(Message *m);
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con) {}
+  bool ms_handle_refused(Connection *con) { return false; }
 
   void handle_monmap(MMonMap *m);
 
index df6a719753f5a11c5128068601f1208416366451..ed41a601ed06c8fb028ec56b8cba5e1628cf7b82 100644 (file)
@@ -4398,6 +4398,13 @@ bool Monitor::ms_handle_reset(Connection *con)
   return true;
 }
 
+bool Monitor::ms_handle_refused(Connection *con)
+{
+  // just log for now...
+  dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
+  return false;
+}
+
 void Monitor::check_subs()
 {
   string type = "monmap";
index 1f6be858d0dca9656bed69c5d222162ab6f0e639..0c0922e130e2d0912c8e681db8cd803f71722b9e 100644 (file)
@@ -902,6 +902,7 @@ public:
                            bool& isvalid, CryptoKey& session_key);
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con) {}
+  bool ms_handle_refused(Connection *con);
 
   int write_default_keyring(bufferlist& bl);
   void extract_save_mon_key(KeyRing& keyring);
index c9770bfb1ce50e3298df52c97f6e5d91bbe7e42e..3864c22d3519e498373e9d9212a26bb0d1bab193 100644 (file)
@@ -181,6 +181,9 @@ void DispatchQueue::entry()
        case D_BAD_RESET:
          msgr->ms_deliver_handle_reset(qitem.get_connection());
          break;
+       case D_CONN_REFUSED:
+         msgr->ms_deliver_handle_refused(qitem.get_connection());
+         break;
        default:
          assert(0);
        }
index da035a45a4c10b7d14528b783b9fe09a4906cf05..751282ec2d5a85c023a8f479bec58e676274d8a8 100644 (file)
@@ -89,7 +89,7 @@ class DispatchQueue {
 
   uint64_t next_id;
     
-  enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
+  enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };
 
   /**
    * The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
@@ -184,6 +184,16 @@ class DispatchQueue {
       QueueItem(D_BAD_RESET, con));
     cond.Signal();
   }
+  void queue_refused(Connection *con) {
+    Mutex::Locker l(lock);
+    if (stop)
+      return;
+    mqueue.enqueue_strict(
+      0,
+      CEPH_MSG_PRIO_HIGHEST,
+      QueueItem(D_CONN_REFUSED, con));
+    cond.Signal();
+  }
 
   bool can_fast_dispatch(Message *m) const;
   void fast_dispatch(Message *m);
index f7de0dedffb5d213e3f1be538fce7a7f0832e70d..898fbc4b4184ad18a861f9cf339ced543f2f5a2a 100644 (file)
@@ -156,6 +156,16 @@ public:
    */
   virtual void ms_handle_remote_reset(Connection *con) = 0;
   
+  /**
+   * This indicates that the connection is both broken and further
+   * connection attempts are failing because other side refuses
+   * it.
+   *
+   * @param con The Connection which broke. You are not granted
+   * a reference to it.
+   */
+  virtual bool ms_handle_refused(Connection *con) = 0;
+
   /**
    * @defgroup Authentication
    * @{
index 58847d08c745e892a436f1ccdaf6a696e1f860c9..fa261df30c727981e0f5497e55e7860faeba42b3 100644 (file)
@@ -681,6 +681,24 @@ public:
         ++p)
       (*p)->ms_handle_remote_reset(con);
   }
+
+  /**
+   * Notify each Dispatcher of a Connection for which reconnection
+   * attempts are being refused. Call this function whenever you
+   * detect that a lossy Connection has been disconnected and it's
+   * impossible to reconnect.
+   *
+   * @param con Pointer to the broken Connection.
+   */
+  void ms_deliver_handle_refused(Connection *con) {
+    for (list<Dispatcher*>::iterator p = dispatchers.begin();
+         p != dispatchers.end();
+         ++p) {
+      if ((*p)->ms_handle_refused(con))
+        return;
+    }
+  }
+
   /**
    * Get the AuthAuthorizer for a new outgoing Connection.
    *
index a1470c4e1247f52b0acd1bbee26fe9ec3adf9e46..19750306fbc9bbba278400271ee49737ad195b77 100644 (file)
@@ -924,9 +924,13 @@ int Pipe::connect()
   ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
   rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
   if (rc < 0) {
-    rc = -errno;
+    int stored_errno = errno;
     ldout(msgr->cct,2) << "connect error " << peer_addr
-            << ", " << cpp_strerror(rc) << dendl;
+            << ", " << cpp_strerror(stored_errno) << dendl;
+    if (stored_errno == ECONNREFUSED) {
+      ldout(msgr->cct, 2) << "connection refused!" << dendl;
+      msgr->dispatch_queue.queue_refused(connection_state.get());
+    }
     goto fail;
   }
 
index 5a1a1bb8b387a0f8dc8606b03559b04580ed7b29..61039678ac85ea3670e5fe6c43052bd6243de1d8 100644 (file)
@@ -4776,6 +4776,11 @@ bool OSD::ms_handle_reset(Connection *con)
   return true;
 }
 
+bool OSD::ms_handle_refused(Connection *con)
+{
+  return false;
+}
+
 struct C_OSD_GetVersion : public Context {
   OSD *osd;
   uint64_t oldest, newest;
index a0a938962ad0603bf43ab8ec11d0306aafae107c..5c7463451671cbb98dc96bdf19ffca8ab7dd0c94 100644 (file)
@@ -1702,6 +1702,9 @@ public:
       return osd->heartbeat_reset(con);
     }
     void ms_handle_remote_reset(Connection *con) {}
+    bool ms_handle_refused(Connection *con) {
+      return osd->ms_handle_refused(con);
+    }
     bool ms_verify_authorizer(Connection *con, int peer_type,
                              int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
                              bool& isvalid, CryptoKey& session_key) {
@@ -2395,6 +2398,7 @@ protected:
   void ms_handle_fast_accept(Connection *con);
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con) {}
+  bool ms_handle_refused(Connection *con);
 
   io_queue get_io_queue() const {
     if (cct->_conf->osd_op_queue == "debug_random") {
index be0e17e9d32fab2cb68aaf12221f95f4df18d8ec..271bb352814c3ce1ac2bd07e4eea131d9fbea0a8 100644 (file)
@@ -4336,6 +4336,18 @@ void Objecter::ms_handle_remote_reset(Connection *con)
   ms_handle_reset(con);
 }
 
+bool Objecter::ms_handle_refused(Connection *con)
+{
+  // just log for now
+  if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
+    int osd = osdmap->identify_osd(con->get_peer_addr());
+    if (osd >= 0) {
+      ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
+    }
+  }
+  return false;
+}
+
 bool Objecter::ms_get_authorizer(int dest_type,
                                 AuthAuthorizer **authorizer,
                                 bool force_new)
@@ -4348,7 +4360,6 @@ bool Objecter::ms_get_authorizer(int dest_type,
   return *authorizer != NULL;
 }
 
-
 void Objecter::op_target_t::dump(Formatter *f) const
 {
   f->dump_stream("pg") << pgid;
index d5d6e529dd93908c26510620b71279892bda1f14..65a945e92987de24c09853435bdbf8ecb88460c6 100644 (file)
@@ -2929,6 +2929,7 @@ public:
   void ms_handle_connect(Connection *con);
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con);
+  bool ms_handle_refused(Connection *con);
   bool ms_get_authorizer(int dest_type,
                         AuthAuthorizer **authorizer,
                         bool force_new);
index 0c08003b761011283cb18afa5d5cfe28edd167a3..57427901ca66fb6d5b3ba140fdb43ff9dea016cd 100644 (file)
@@ -74,6 +74,8 @@ public:
    * a reference to it.
    */
   virtual void ms_handle_remote_reset(Connection *con);
+  
+  virtual bool ms_handle_refused(Connection *con) { return false; }
 
   /**
    * @defgroup Authentication
index 29c71a0ba17ac39c30b9e49d7c36b9730f7e7485..3b59108071fb0b1405467b0801f1bed2fa2b8009 100644 (file)
@@ -74,6 +74,8 @@ public:
    * a reference to it.
    */
   virtual void ms_handle_remote_reset(Connection *con);
+  
+  virtual bool ms_handle_refused(Connection *con) { return false; }
 
   /**
    * @defgroup test_xio_dispatcher_h_auth Authentication
index 3e4e0505ded1479db40377a503f7f2475e4f525e..58e8b7d133f23bb9b6561f44164513567e5d0952 100644 (file)
@@ -187,6 +187,7 @@ fail:
   void ms_handle_connect(Connection *con) { }
   void ms_handle_remote_reset(Connection *con) { }
   bool ms_handle_reset(Connection *con) { return false; }
+  bool ms_handle_refused(Connection *con) { return false; }
 
   bool is_wanted(Message *m) {
     dout(20) << __func__ << " " << *m << " type " << m->get_type() << dendl;
index 4ad26c31e1fa170afbdaf9eff48f69aa2edc7e16..b12272270b59a23b4d4cc8908ac0cdd38a110559 100644 (file)
@@ -218,6 +218,10 @@ class ClientStub : public TestStub
     return false;
   }
 
+  bool ms_handle_refused(Connection *con) {
+    return false;
+  }
+
   const string get_name() {
     return "client";
   }
@@ -903,6 +907,10 @@ class OSDStub : public TestStub
     return true;
   }
 
+  bool ms_handle_refused(Connection *con) {
+    return false;
+  }
+
   const string get_name() {
     stringstream ss;
     ss << "osd." << whoami;
index 1cb3db2ce798ea3e40f2b8f1c2994c2880f3ff15..d502cca405b0daedcf81e7ddcd5de51c18bd0722 100644 (file)
@@ -54,6 +54,7 @@ class MessengerClient {
     void ms_fast_dispatch(Message *m);
     bool ms_handle_reset(Connection *con) { return true; }
     void ms_handle_remote_reset(Connection *con) {}
+    bool ms_handle_refused(Connection *con) { return false; }
     bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
                               bufferlist& authorizer, bufferlist& authorizer_reply,
                               bool& isvalid, CryptoKey& session_key) {
index dde679c433ee2e05a951d1214a54e2e9ec79067c..eefaf9a29236273040f5861f936d27d49feccf61 100644 (file)
@@ -93,6 +93,7 @@ class ServerDispatcher : public Dispatcher {
   bool ms_dispatch(Message *m) { return true; }
   bool ms_handle_reset(Connection *con) { return true; }
   void ms_handle_remote_reset(Connection *con) {}
+  bool ms_handle_refused(Connection *con) { return false; }
   void ms_fast_dispatch(Message *m) {
     usleep(think_time);
     //cerr << __func__ << " reply message=" << m << std::endl;
index 1c87a354ce8128e4245e346979b1d3ef2255aca5..b2ac1433f4a8b91623e15ed0ec22cb606f76dfaf 100644 (file)
@@ -175,6 +175,9 @@ class FakeDispatcher : public Dispatcher {
     got_remote_reset = true;
     cond.Signal();
   }
+  bool ms_handle_refused(Connection *con) {
+    return false;
+  }
   void ms_fast_dispatch(Message *m) {
     Session *s = static_cast<Session*>(m->get_connection()->get_priv());
     if (!s) {
@@ -825,6 +828,9 @@ class SyntheticDispatcher : public Dispatcher {
     conn_sent.erase(con);
     got_remote_reset = true;
   }
+  bool ms_handle_refused(Connection *con) {
+    return false;
+  }
   void ms_fast_dispatch(Message *m) {
     // MSG_COMMAND is used to disorganize regular message flow
     if (m->get_type() == MSG_COMMAND) {
@@ -1408,6 +1414,9 @@ class MarkdownDispatcher : public Dispatcher {
     conns.erase(con);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
   }
+  bool ms_handle_refused(Connection *con) {
+    return false;
+  }
   void ms_fast_dispatch(Message *m) {
     assert(0);
   }
index e3610b56b3b706d725841a6b3de1e2bb70708857..6c5d658e33bfc0812b5ad4f02593550f318e44f1 100644 (file)
@@ -62,6 +62,7 @@ private:
 
   bool ms_handle_reset(Connection *con) { return false; }
   void ms_handle_remote_reset(Connection *con) {}
+  bool ms_handle_refused(Connection *con) { return false; }
 
 } dispatcher;
 
index 0f7f80acc7bc8f8e45ab2bbd04142ff14d282855..7547c0e78c6b09e6b9f6b77521d0267c99aa0758 100644 (file)
@@ -50,6 +50,7 @@ public:
   bool ms_dispatch(Message *m);
   bool ms_handle_reset(Connection *con) { return false; }
   void ms_handle_remote_reset(Connection *con) {}
+  bool ms_handle_refused(Connection *con) { return false; }
   bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
                          bool force_new);
   int init();