From: Piotr Dałek Date: Thu, 5 May 2016 19:03:37 +0000 (+0200) Subject: msg/simple: add ms_handle_refused callback X-Git-Tag: v11.0.1~211^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d58d7d3dbd21a8dca0a19964f51cb9bf78814a75;p=ceph.git msg/simple: add ms_handle_refused callback Added new callback (ms_handle_refused) to dispatchers. It is called once connection attempt fails with ECONNREFUSED. Also added dummy ms_handle_refused handlers across codebase. Signed-off-by: Piotr Dałek --- diff --git a/src/client/Client.cc b/src/client/Client.cc index f540df5cbcb9..7b19a2a0482d 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -12380,6 +12380,12 @@ void Client::ms_handle_remote_reset(Connection *con) } } +bool Client::ms_handle_refused(Connection *con) +{ + ldout(cct, 1) << "ms_handle_refused on " << con->get_peer_addr() << dendl; + return false; +} + bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) { if (dest_type == CEPH_ENTITY_TYPE_MON) diff --git a/src/client/Client.h b/src/client/Client.h index 8b674464a900..ead53b2db056 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -569,6 +569,7 @@ protected: void ms_handle_connect(Connection *con); bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con); + bool ms_handle_refused(Connection *con); bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new); int authenticate(); diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 8c5e8ed803fa..dc98af4159f1 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -478,6 +478,10 @@ void librados::RadosClient::ms_handle_remote_reset(Connection *con) { } +bool librados::RadosClient::ms_handle_refused(Connection *con) +{ + return false; +} bool librados::RadosClient::_dispatch(Message *m) { diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h index f495ba5966c2..55bf8af45788 100644 --- a/src/librados/RadosClient.h +++ b/src/librados/RadosClient.h @@ -58,6 +58,7 @@ private: void ms_handle_connect(Connection *con); bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con); + bool ms_handle_refused(Connection *con); Objecter *objecter; diff --git a/src/mds/Beacon.h b/src/mds/Beacon.h index 1a29f24f6a71..2cd23aac2d7d 100644 --- a/src/mds/Beacon.h +++ b/src/mds/Beacon.h @@ -94,6 +94,7 @@ public: void ms_handle_connect(Connection *c) {} bool ms_handle_reset(Connection *c) {return false;} void ms_handle_remote_reset(Connection *c) {} + bool ms_handle_refused(Connection *c) {return false;} void notify_mdsmap(MDSMap const *mdsmap); void notify_health(MDSRank const *mds); diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index 1f7d7185314b..59c631e05481 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -1271,6 +1271,12 @@ void MDSDaemon::ms_handle_remote_reset(Connection *con) } } +bool MDSDaemon::ms_handle_refused(Connection *con) +{ + // do nothing for now + return false; +} + bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply, bool& is_valid, CryptoKey& session_key) diff --git a/src/mds/MDSDaemon.h b/src/mds/MDSDaemon.h index 2ae817b730af..701f56f965c8 100644 --- a/src/mds/MDSDaemon.h +++ b/src/mds/MDSDaemon.h @@ -138,6 +138,7 @@ class MDSDaemon : public Dispatcher, public md_config_obs_t { void ms_handle_connect(Connection *con); bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con); + bool ms_handle_refused(Connection *con); protected: // admin socket handling diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index b45dd86568eb..f79b595630e5 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -100,6 +100,9 @@ struct MonClientPinger : public Dispatcher { return true; } void ms_handle_remote_reset(Connection *con) {} + bool ms_handle_refused(Connection *con) { + return false; + } }; class MonClient : public Dispatcher { @@ -140,6 +143,7 @@ private: bool ms_dispatch(Message *m); bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con) {} + bool ms_handle_refused(Connection *con) { return false; } void handle_monmap(MMonMap *m); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index df6a719753f5..ed41a601ed06 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -4398,6 +4398,13 @@ bool Monitor::ms_handle_reset(Connection *con) return true; } +bool Monitor::ms_handle_refused(Connection *con) +{ + // just log for now... + dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl; + return false; +} + void Monitor::check_subs() { string type = "monmap"; diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 1f6be858d0dc..0c0922e130e2 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -902,6 +902,7 @@ public: bool& isvalid, CryptoKey& session_key); bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con) {} + bool ms_handle_refused(Connection *con); int write_default_keyring(bufferlist& bl); void extract_save_mon_key(KeyRing& keyring); diff --git a/src/msg/DispatchQueue.cc b/src/msg/DispatchQueue.cc index c9770bfb1ce5..3864c22d3519 100644 --- a/src/msg/DispatchQueue.cc +++ b/src/msg/DispatchQueue.cc @@ -181,6 +181,9 @@ void DispatchQueue::entry() case D_BAD_RESET: msgr->ms_deliver_handle_reset(qitem.get_connection()); break; + case D_CONN_REFUSED: + msgr->ms_deliver_handle_refused(qitem.get_connection()); + break; default: assert(0); } diff --git a/src/msg/DispatchQueue.h b/src/msg/DispatchQueue.h index da035a45a4c1..751282ec2d5a 100644 --- a/src/msg/DispatchQueue.h +++ b/src/msg/DispatchQueue.h @@ -89,7 +89,7 @@ class DispatchQueue { uint64_t next_id; - enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES }; + enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES }; /** * The DispatchThread runs dispatch_entry to empty out the dispatch_queue. @@ -184,6 +184,16 @@ class DispatchQueue { QueueItem(D_BAD_RESET, con)); cond.Signal(); } + void queue_refused(Connection *con) { + Mutex::Locker l(lock); + if (stop) + return; + mqueue.enqueue_strict( + 0, + CEPH_MSG_PRIO_HIGHEST, + QueueItem(D_CONN_REFUSED, con)); + cond.Signal(); + } bool can_fast_dispatch(Message *m) const; void fast_dispatch(Message *m); diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index f7de0dedffb5..898fbc4b4184 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -156,6 +156,16 @@ public: */ virtual void ms_handle_remote_reset(Connection *con) = 0; + /** + * This indicates that the connection is both broken and further + * connection attempts are failing because other side refuses + * it. + * + * @param con The Connection which broke. You are not granted + * a reference to it. + */ + virtual bool ms_handle_refused(Connection *con) = 0; + /** * @defgroup Authentication * @{ diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 58847d08c745..fa261df30c72 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -681,6 +681,24 @@ public: ++p) (*p)->ms_handle_remote_reset(con); } + + /** + * Notify each Dispatcher of a Connection for which reconnection + * attempts are being refused. Call this function whenever you + * detect that a lossy Connection has been disconnected and it's + * impossible to reconnect. + * + * @param con Pointer to the broken Connection. + */ + void ms_deliver_handle_refused(Connection *con) { + for (list::iterator p = dispatchers.begin(); + p != dispatchers.end(); + ++p) { + if ((*p)->ms_handle_refused(con)) + return; + } + } + /** * Get the AuthAuthorizer for a new outgoing Connection. * diff --git a/src/msg/simple/Pipe.cc b/src/msg/simple/Pipe.cc index a1470c4e1247..19750306fbc9 100644 --- a/src/msg/simple/Pipe.cc +++ b/src/msg/simple/Pipe.cc @@ -924,9 +924,13 @@ int Pipe::connect() ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl; rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len()); if (rc < 0) { - rc = -errno; + int stored_errno = errno; ldout(msgr->cct,2) << "connect error " << peer_addr - << ", " << cpp_strerror(rc) << dendl; + << ", " << cpp_strerror(stored_errno) << dendl; + if (stored_errno == ECONNREFUSED) { + ldout(msgr->cct, 2) << "connection refused!" << dendl; + msgr->dispatch_queue.queue_refused(connection_state.get()); + } goto fail; } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 5a1a1bb8b387..61039678ac85 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4776,6 +4776,11 @@ bool OSD::ms_handle_reset(Connection *con) return true; } +bool OSD::ms_handle_refused(Connection *con) +{ + return false; +} + struct C_OSD_GetVersion : public Context { OSD *osd; uint64_t oldest, newest; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index a0a938962ad0..5c7463451671 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1702,6 +1702,9 @@ public: return osd->heartbeat_reset(con); } void ms_handle_remote_reset(Connection *con) {} + bool ms_handle_refused(Connection *con) { + return osd->ms_handle_refused(con); + } bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply, bool& isvalid, CryptoKey& session_key) { @@ -2395,6 +2398,7 @@ protected: void ms_handle_fast_accept(Connection *con); bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con) {} + bool ms_handle_refused(Connection *con); io_queue get_io_queue() const { if (cct->_conf->osd_op_queue == "debug_random") { diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index be0e17e9d32f..271bb352814c 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -4336,6 +4336,18 @@ void Objecter::ms_handle_remote_reset(Connection *con) ms_handle_reset(con); } +bool Objecter::ms_handle_refused(Connection *con) +{ + // just log for now + if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) { + int osd = osdmap->identify_osd(con->get_peer_addr()); + if (osd >= 0) { + ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl; + } + } + return false; +} + bool Objecter::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) @@ -4348,7 +4360,6 @@ bool Objecter::ms_get_authorizer(int dest_type, return *authorizer != NULL; } - void Objecter::op_target_t::dump(Formatter *f) const { f->dump_stream("pg") << pgid; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index d5d6e529dd93..65a945e92987 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -2929,6 +2929,7 @@ public: void ms_handle_connect(Connection *con); bool ms_handle_reset(Connection *con); void ms_handle_remote_reset(Connection *con); + bool ms_handle_refused(Connection *con); bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new); diff --git a/src/test/messenger/simple_dispatcher.h b/src/test/messenger/simple_dispatcher.h index 0c08003b7610..57427901ca66 100644 --- a/src/test/messenger/simple_dispatcher.h +++ b/src/test/messenger/simple_dispatcher.h @@ -74,6 +74,8 @@ public: * a reference to it. */ virtual void ms_handle_remote_reset(Connection *con); + + virtual bool ms_handle_refused(Connection *con) { return false; } /** * @defgroup Authentication diff --git a/src/test/messenger/xio_dispatcher.h b/src/test/messenger/xio_dispatcher.h index 29c71a0ba17a..3b59108071fb 100644 --- a/src/test/messenger/xio_dispatcher.h +++ b/src/test/messenger/xio_dispatcher.h @@ -74,6 +74,8 @@ public: * a reference to it. */ virtual void ms_handle_remote_reset(Connection *con); + + virtual bool ms_handle_refused(Connection *con) { return false; } /** * @defgroup test_xio_dispatcher_h_auth Authentication diff --git a/src/test/mon/test-mon-msg.cc b/src/test/mon/test-mon-msg.cc index 3e4e0505ded1..58e8b7d133f2 100644 --- a/src/test/mon/test-mon-msg.cc +++ b/src/test/mon/test-mon-msg.cc @@ -187,6 +187,7 @@ fail: void ms_handle_connect(Connection *con) { } void ms_handle_remote_reset(Connection *con) { } bool ms_handle_reset(Connection *con) { return false; } + bool ms_handle_refused(Connection *con) { return false; } bool is_wanted(Message *m) { dout(20) << __func__ << " " << *m << " type " << m->get_type() << dendl; diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index 4ad26c31e1fa..b12272270b59 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -218,6 +218,10 @@ class ClientStub : public TestStub return false; } + bool ms_handle_refused(Connection *con) { + return false; + } + const string get_name() { return "client"; } @@ -903,6 +907,10 @@ class OSDStub : public TestStub return true; } + bool ms_handle_refused(Connection *con) { + return false; + } + const string get_name() { stringstream ss; ss << "osd." << whoami; diff --git a/src/test/msgr/perf_msgr_client.cc b/src/test/msgr/perf_msgr_client.cc index 1cb3db2ce798..d502cca405b0 100644 --- a/src/test/msgr/perf_msgr_client.cc +++ b/src/test/msgr/perf_msgr_client.cc @@ -54,6 +54,7 @@ class MessengerClient { void ms_fast_dispatch(Message *m); bool ms_handle_reset(Connection *con) { return true; } void ms_handle_remote_reset(Connection *con) {} + bool ms_handle_refused(Connection *con) { return false; } bool ms_verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& authorizer, bufferlist& authorizer_reply, bool& isvalid, CryptoKey& session_key) { diff --git a/src/test/msgr/perf_msgr_server.cc b/src/test/msgr/perf_msgr_server.cc index dde679c433ee..eefaf9a29236 100644 --- a/src/test/msgr/perf_msgr_server.cc +++ b/src/test/msgr/perf_msgr_server.cc @@ -93,6 +93,7 @@ class ServerDispatcher : public Dispatcher { bool ms_dispatch(Message *m) { return true; } bool ms_handle_reset(Connection *con) { return true; } void ms_handle_remote_reset(Connection *con) {} + bool ms_handle_refused(Connection *con) { return false; } void ms_fast_dispatch(Message *m) { usleep(think_time); //cerr << __func__ << " reply message=" << m << std::endl; diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 1c87a354ce81..b2ac1433f4a8 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -175,6 +175,9 @@ class FakeDispatcher : public Dispatcher { got_remote_reset = true; cond.Signal(); } + bool ms_handle_refused(Connection *con) { + return false; + } void ms_fast_dispatch(Message *m) { Session *s = static_cast(m->get_connection()->get_priv()); if (!s) { @@ -825,6 +828,9 @@ class SyntheticDispatcher : public Dispatcher { conn_sent.erase(con); got_remote_reset = true; } + bool ms_handle_refused(Connection *con) { + return false; + } void ms_fast_dispatch(Message *m) { // MSG_COMMAND is used to disorganize regular message flow if (m->get_type() == MSG_COMMAND) { @@ -1408,6 +1414,9 @@ class MarkdownDispatcher : public Dispatcher { conns.erase(con); lderr(g_ceph_context) << __func__ << " " << con << dendl; } + bool ms_handle_refused(Connection *con) { + return false; + } void ms_fast_dispatch(Message *m) { assert(0); } diff --git a/src/test/testmsgr.cc b/src/test/testmsgr.cc index e3610b56b3b7..6c5d658e33bf 100644 --- a/src/test/testmsgr.cc +++ b/src/test/testmsgr.cc @@ -62,6 +62,7 @@ private: bool ms_handle_reset(Connection *con) { return false; } void ms_handle_remote_reset(Connection *con) {} + bool ms_handle_refused(Connection *con) { return false; } } dispatcher; diff --git a/src/tools/cephfs/MDSUtility.h b/src/tools/cephfs/MDSUtility.h index 0f7f80acc7bc..7547c0e78c6b 100644 --- a/src/tools/cephfs/MDSUtility.h +++ b/src/tools/cephfs/MDSUtility.h @@ -50,6 +50,7 @@ public: bool ms_dispatch(Message *m); bool ms_handle_reset(Connection *con) { return false; } void ms_handle_remote_reset(Connection *con) {} + bool ms_handle_refused(Connection *con) { return false; } bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new); int init();