}
}
+void MgrClient::reconnect()
+{
+ assert(lock.is_locked_by_me());
+
+ if (session) {
+ ldout(cct, 4) << "Terminating session with "
+ << session->con->get_peer_addr() << dendl;
+ session->con->mark_down();
+ delete session;
+ session = nullptr;
+ stats_period = 0;
+ if (report_callback != nullptr) {
+ timer.cancel_event(report_callback);
+ report_callback = nullptr;
+ }
+
+ std::vector<ceph_tid_t> erase_cmds;
+ auto commands = command_table.get_commands();
+ for (const auto &i : commands) {
+ // FIXME be nicer, retarget command on new mgr?
+ if (i.second.on_finish != nullptr) {
+ i.second.on_finish->complete(-ETIMEDOUT);
+ }
+ erase_cmds.push_back(i.first);
+ }
+ for (const auto &tid : erase_cmds) {
+ command_table.erase(tid);
+ }
+ }
+
+ if (map.get_available()) {
+ ldout(cct, 4) << "Starting new session with " << map.get_active_addr()
+ << dendl;
+ entity_inst_t inst;
+ inst.addr = map.get_active_addr();
+ inst.name = entity_name_t::MGR(map.get_active_gid());
+
+ session = new MgrSessionState();
+ session->con = msgr->get_connection(inst);
+
+ // Don't send an open if we're just a client (i.e. doing
+ // command-sending, not stats etc)
+ if (g_conf && !g_conf->name.is_client()) {
+ auto open = new MMgrOpen();
+ open->daemon_name = g_conf->name.get_id();
+ session->con->send_message(open);
+ }
+
+ signal_cond_list(waiting_for_session);
+ } else {
+ ldout(cct, 4) << "No active mgr available yet" << dendl;
+ }
+}
+
bool MgrClient::handle_mgr_map(MMgrMap *m)
{
assert(lock.is_locked_by_me());
// Reset session?
if (session == nullptr ||
session->con->get_peer_addr() != map.get_active_addr()) {
-
- if (session) {
- ldout(cct, 4) << "Terminating session with "
- << session->con->get_peer_addr() << dendl;
- session->con->mark_down();
- delete session;
- session = nullptr;
- stats_period = 0;
- if (report_callback != nullptr) {
- timer.cancel_event(report_callback);
- report_callback = nullptr;
- }
-
- std::vector<ceph_tid_t> erase_cmds;
- auto commands = command_table.get_commands();
- for (const auto &i : commands) {
- // FIXME be nicer, retarget command on new mgr?
- if (i.second.on_finish != nullptr) {
- i.second.on_finish->complete(-ETIMEDOUT);
- }
- erase_cmds.push_back(i.first);
- }
- for (const auto &tid : erase_cmds) {
- command_table.erase(tid);
- }
- }
-
- if (map.get_available()) {
- ldout(cct, 4) << "Starting new session with " << map.get_active_addr()
- << dendl;
- entity_inst_t inst;
- inst.addr = map.get_active_addr();
- inst.name = entity_name_t::MGR(map.get_active_gid());
-
- session = new MgrSessionState();
- session->con = msgr->get_connection(inst);
-
- // Don't send an open if we're just a client (i.e. doing
- // command-sending, not stats etc)
- if (g_conf && !g_conf->name.is_client()) {
- auto open = new MMgrOpen();
- open->daemon_name = g_conf->name.get_id();
- session->con->send_message(open);
- }
-
- signal_cond_list(waiting_for_session);
- } else {
- ldout(cct, 4) << "No active mgr available yet" << dendl;
- }
+ reconnect();
}
return true;
bool MgrClient::ms_handle_reset(Connection *con)
{
-#if 0
- Mutex::Locker lock(monc_lock);
-
- if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
- if (cur_mon.empty() || con != cur_con) {
- ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl;
- return true;
- } else {
- ldout(cct, 10) << "ms_handle_reset current mon " << con->get_peer_addr() << dendl;
- if (hunting)
- return true;
-
- ldout(cct, 0) << "hunting for new mon" << dendl;
- _reopen_session();
- }
+ Mutex::Locker l(lock);
+ if (session && con == session->con) {
+ ldout(cct, 4) << __func__ << " con " << con << dendl;
+ reconnect();
+ return true;
}
return false;
-#else
- return false;
-#endif
}
bool MgrClient::ms_handle_refused(Connection *con)