}
}
+bool Client::ms_handle_refused(Connection *con)
+{
+ ldout(cct, 1) << "ms_handle_refused on " << con->get_peer_addr() << dendl;
+ return false;
+}
+
bool Client::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
{
if (dest_type == CEPH_ENTITY_TYPE_MON)
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
+ bool ms_handle_refused(Connection *con);
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
int authenticate();
{
}
+bool librados::RadosClient::ms_handle_refused(Connection *con)
+{
+ return false;
+}
bool librados::RadosClient::_dispatch(Message *m)
{
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
+ bool ms_handle_refused(Connection *con);
Objecter *objecter;
void ms_handle_connect(Connection *c) {}
bool ms_handle_reset(Connection *c) {return false;}
void ms_handle_remote_reset(Connection *c) {}
+ bool ms_handle_refused(Connection *c) {return false;}
void notify_mdsmap(MDSMap const *mdsmap);
void notify_health(MDSRank const *mds);
}
}
+bool MDSDaemon::ms_handle_refused(Connection *con)
+{
+ // do nothing for now
+ return false;
+}
+
bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
bool& is_valid, CryptoKey& session_key)
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
+ bool ms_handle_refused(Connection *con);
protected:
// admin socket handling
return true;
}
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_handle_refused(Connection *con) {
+ return false;
+ }
};
class MonClient : public Dispatcher {
bool ms_dispatch(Message *m);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_handle_refused(Connection *con) { return false; }
void handle_monmap(MMonMap *m);
return true;
}
+bool Monitor::ms_handle_refused(Connection *con)
+{
+ // just log for now...
+ dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
+ return false;
+}
+
void Monitor::check_subs()
{
string type = "monmap";
bool& isvalid, CryptoKey& session_key);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_handle_refused(Connection *con);
int write_default_keyring(bufferlist& bl);
void extract_save_mon_key(KeyRing& keyring);
case D_BAD_RESET:
msgr->ms_deliver_handle_reset(qitem.get_connection());
break;
+ case D_CONN_REFUSED:
+ msgr->ms_deliver_handle_refused(qitem.get_connection());
+ break;
default:
assert(0);
}
uint64_t next_id;
- enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
+ enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_CONN_REFUSED, D_NUM_CODES };
/**
* The DispatchThread runs dispatch_entry to empty out the dispatch_queue.
QueueItem(D_BAD_RESET, con));
cond.Signal();
}
+ void queue_refused(Connection *con) {
+ Mutex::Locker l(lock);
+ if (stop)
+ return;
+ mqueue.enqueue_strict(
+ 0,
+ CEPH_MSG_PRIO_HIGHEST,
+ QueueItem(D_CONN_REFUSED, con));
+ cond.Signal();
+ }
bool can_fast_dispatch(Message *m) const;
void fast_dispatch(Message *m);
*/
virtual void ms_handle_remote_reset(Connection *con) = 0;
+ /**
+ * This indicates that the connection is both broken and further
+ * connection attempts are failing because other side refuses
+ * it.
+ *
+ * @param con The Connection which broke. You are not granted
+ * a reference to it.
+ */
+ virtual bool ms_handle_refused(Connection *con) = 0;
+
/**
* @defgroup Authentication
* @{
++p)
(*p)->ms_handle_remote_reset(con);
}
+
+ /**
+ * Notify each Dispatcher of a Connection for which reconnection
+ * attempts are being refused. Call this function whenever you
+ * detect that a lossy Connection has been disconnected and it's
+ * impossible to reconnect.
+ *
+ * @param con Pointer to the broken Connection.
+ */
+ void ms_deliver_handle_refused(Connection *con) {
+ for (list<Dispatcher*>::iterator p = dispatchers.begin();
+ p != dispatchers.end();
+ ++p) {
+ if ((*p)->ms_handle_refused(con))
+ return;
+ }
+ }
+
/**
* Get the AuthAuthorizer for a new outgoing Connection.
*
ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
if (rc < 0) {
- rc = -errno;
+ int stored_errno = errno;
ldout(msgr->cct,2) << "connect error " << peer_addr
- << ", " << cpp_strerror(rc) << dendl;
+ << ", " << cpp_strerror(stored_errno) << dendl;
+ if (stored_errno == ECONNREFUSED) {
+ ldout(msgr->cct, 2) << "connection refused!" << dendl;
+ msgr->dispatch_queue.queue_refused(connection_state.get());
+ }
goto fail;
}
return true;
}
+bool OSD::ms_handle_refused(Connection *con)
+{
+ return false;
+}
+
struct C_OSD_GetVersion : public Context {
OSD *osd;
uint64_t oldest, newest;
return osd->heartbeat_reset(con);
}
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_handle_refused(Connection *con) {
+ return osd->ms_handle_refused(con);
+ }
bool ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
bool& isvalid, CryptoKey& session_key) {
void ms_handle_fast_accept(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_handle_refused(Connection *con);
io_queue get_io_queue() const {
if (cct->_conf->osd_op_queue == "debug_random") {
ms_handle_reset(con);
}
+bool Objecter::ms_handle_refused(Connection *con)
+{
+ // just log for now
+ if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
+ int osd = osdmap->identify_osd(con->get_peer_addr());
+ if (osd >= 0) {
+ ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
+ }
+ }
+ return false;
+}
+
bool Objecter::ms_get_authorizer(int dest_type,
AuthAuthorizer **authorizer,
bool force_new)
return *authorizer != NULL;
}
-
void Objecter::op_target_t::dump(Formatter *f) const
{
f->dump_stream("pg") << pgid;
void ms_handle_connect(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
+ bool ms_handle_refused(Connection *con);
bool ms_get_authorizer(int dest_type,
AuthAuthorizer **authorizer,
bool force_new);
* a reference to it.
*/
virtual void ms_handle_remote_reset(Connection *con);
+
+ virtual bool ms_handle_refused(Connection *con) { return false; }
/**
* @defgroup Authentication
* a reference to it.
*/
virtual void ms_handle_remote_reset(Connection *con);
+
+ virtual bool ms_handle_refused(Connection *con) { return false; }
/**
* @defgroup test_xio_dispatcher_h_auth Authentication
void ms_handle_connect(Connection *con) { }
void ms_handle_remote_reset(Connection *con) { }
bool ms_handle_reset(Connection *con) { return false; }
+ bool ms_handle_refused(Connection *con) { return false; }
bool is_wanted(Message *m) {
dout(20) << __func__ << " " << *m << " type " << m->get_type() << dendl;
return false;
}
+ bool ms_handle_refused(Connection *con) {
+ return false;
+ }
+
const string get_name() {
return "client";
}
return true;
}
+ bool ms_handle_refused(Connection *con) {
+ return false;
+ }
+
const string get_name() {
stringstream ss;
ss << "osd." << whoami;
void ms_fast_dispatch(Message *m);
bool ms_handle_reset(Connection *con) { return true; }
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_handle_refused(Connection *con) { return false; }
bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
bufferlist& authorizer, bufferlist& authorizer_reply,
bool& isvalid, CryptoKey& session_key) {
bool ms_dispatch(Message *m) { return true; }
bool ms_handle_reset(Connection *con) { return true; }
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_handle_refused(Connection *con) { return false; }
void ms_fast_dispatch(Message *m) {
usleep(think_time);
//cerr << __func__ << " reply message=" << m << std::endl;
got_remote_reset = true;
cond.Signal();
}
+ bool ms_handle_refused(Connection *con) {
+ return false;
+ }
void ms_fast_dispatch(Message *m) {
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
if (!s) {
conn_sent.erase(con);
got_remote_reset = true;
}
+ bool ms_handle_refused(Connection *con) {
+ return false;
+ }
void ms_fast_dispatch(Message *m) {
// MSG_COMMAND is used to disorganize regular message flow
if (m->get_type() == MSG_COMMAND) {
conns.erase(con);
lderr(g_ceph_context) << __func__ << " " << con << dendl;
}
+ bool ms_handle_refused(Connection *con) {
+ return false;
+ }
void ms_fast_dispatch(Message *m) {
assert(0);
}
bool ms_handle_reset(Connection *con) { return false; }
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_handle_refused(Connection *con) { return false; }
} dispatcher;
bool ms_dispatch(Message *m);
bool ms_handle_reset(Connection *con) { return false; }
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_handle_refused(Connection *con) { return false; }
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
bool force_new);
int init();