bool MDS::asok_command(string command, cmdmap_t& cmdmap, string format,
ostream& ss)
{
+ dout(1) << "asok_command: " << command << dendl;
+
Formatter *f = new_formatter(format);
if (!f)
f = new_formatter("json-pretty");
op_tracker.dump_ops_in_flight(f);
} else if (command == "dump_historic_ops") {
op_tracker.dump_historic_ops(f);
+ } else if (command == "session ls") {
+ mds_lock.Lock();
+
+ // Dump sessions, decorated with recovery/replay status
+ f->open_array_section("sessions");
+ const ceph::unordered_map<entity_name_t, Session*> session_map = sessionmap.get_sessions();
+ for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
+ p != session_map.end();
+ ++p) {
+ if (!p->first.is_client()) {
+ continue;
+ }
+
+ f->open_object_section("session");
+ f->dump_int("id", p->first.num());
+ f->dump_string("state", p->second->get_state_name());
+ f->dump_int("replay_requests", is_clientreplay() ? p->second->get_request_count() : 0);
+ f->dump_bool("reconnecting", server->waiting_for_reconnect(p->first.num()));
+ f->dump_stream("inst") << p->second->info.inst;
+ f->close_section(); //session
+ }
+ f->close_section(); //sessions
+
+ mds_lock.Unlock();
+ } else if (command == "session evict") {
+ std::string client_id;
+ const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
+ assert(got_arg == true);
+
+ mds_lock.Lock();
+ Session *session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
+ strtol(client_id.c_str(), 0, 10)));
+ if (session) {
+ C_SaferCond on_safe;
+ server->kill_session(session, &on_safe);
+
+ mds_lock.Unlock();
+ on_safe.wait();
+ } else {
+ dout(15) << "session " << session << " not in sessionmap!" << dendl;
+ mds_lock.Unlock();
+ }
}
f->flush(ss);
delete f;
asok_hook,
"show slowest recent ops");
assert(0 == r);
+ r = admin_socket->register_command("session evict",
+ "session evict name=client_id,type=CephString",
+ asok_hook,
+ "Evict a CephFS client");
+ assert(0 == r);
+ r = admin_socket->register_command("session ls",
+ "session ls",
+ asok_hook,
+ "Enumerate connected CephFS clients");
+ assert(0 == r);
}
void MDS::clean_up_admin_socket()
Session *session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
strtol(m->cmd[2].c_str(), 0, 10)));
if (session)
- server->kill_session(session);
+ server->kill_session(session, NULL);
else
dout(15) << "session " << session << " not in sessionmap!" << dendl;
} else if (m->cmd[0] == "issue_caps") {
version_t cmapv;
interval_set<inodeno_t> inos;
version_t inotablev;
+ Context *fin;
public:
- C_MDS_session_finish(MDS *m, Session *se, uint64_t sseq, bool s, version_t mv) :
- mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inotablev(0) { }
- C_MDS_session_finish(MDS *m, Session *se, uint64_t sseq, bool s, version_t mv, interval_set<inodeno_t>& i, version_t iv) :
- mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inos(i), inotablev(iv) { }
+ C_MDS_session_finish(MDS *m, Session *se, uint64_t sseq, bool s, version_t mv, Context *fin_ = NULL) :
+ mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inotablev(0), fin(fin_) { }
+ C_MDS_session_finish(MDS *m, Session *se, uint64_t sseq, bool s, version_t mv, interval_set<inodeno_t>& i, version_t iv, Context *fin_ = NULL) :
+ mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inos(i), inotablev(iv), fin(fin_) { }
void finish(int r) {
assert(r == 0);
mds->server->_session_logged(session, state_seq, open, cmapv, inos, inotablev);
+ if (fin) {
+ fin->complete(r);
+ }
}
};
<< ", BUGGY!" << dendl;
assert(0);
}
- journal_close_session(session, Session::STATE_CLOSING);
+ journal_close_session(session, Session::STATE_CLOSING, NULL);
}
break;
dout(20) << " killing client lease of " << *dn << dendl;
dn->remove_client_lease(r, mds->locker);
}
+ if (client_reconnect_gather.count(session->info.get_client())) {
+ dout(20) << " removing client from reconnect set" << dendl;
+ client_reconnect_gather.erase(session->info.get_client());
+
+ if (client_reconnect_gather.empty()) {
+ dout(7) << " client " << session->info.inst << " was last reconnect, finishing" << dendl;
+ reconnect_gather_finish();
+ }
+ }
if (session->is_closing()) {
// mark con disposable. if there is a fault, we will get a
session->is_killing() ||
session->is_closed())
continue;
- journal_close_session(session, Session::STATE_CLOSING);
+ journal_close_session(session, Session::STATE_CLOSING, NULL);
}
mdlog->wait_for_safe(new C_MDS_TerminatedSessions(this));
mds->clog.info() << "closing stale session " << session->info.inst
<< " after " << age << "\n";
dout(10) << "autoclosing stale session " << session->info.inst << " last " << session->last_cap_renew << dendl;
- kill_session(session);
+ kill_session(session, NULL);
}
}
-void Server::kill_session(Session *session)
+void Server::kill_session(Session *session, Context *on_safe)
{
if ((session->is_opening() ||
session->is_open() ||
session->is_stale()) &&
!session->is_importing()) {
dout(10) << "kill_session " << session << dendl;
- journal_close_session(session, Session::STATE_KILLING);
+ journal_close_session(session, Session::STATE_KILLING, on_safe);
} else {
dout(10) << "kill_session importing or already closing/killing " << session << dendl;
assert(session->is_closing() ||
session->is_closed() ||
session->is_killing() ||
session->is_importing());
+ if (on_safe) {
+ on_safe->complete(0);
+ }
}
}
-void Server::journal_close_session(Session *session, int state)
+void Server::journal_close_session(Session *session, int state, Context *on_safe)
{
uint64_t sseq = mds->sessionmap.set_state(session, state);
version_t pv = ++mds->sessionmap.projected;
piv = 0;
mdlog->start_submit_entry(new ESession(session->info.inst, false, pv, both, piv),
- new C_MDS_session_finish(mds, session, sseq, false, pv, both, piv));
+ new C_MDS_session_finish(mds, session, sseq, false, pv, both, piv, on_safe));
mdlog->flush();
// clean up requests, too
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->v));
assert(session);
dout(1) << "reconnect gave up on " << session->info.inst << dendl;
- kill_session(session);
+ kill_session(session, NULL);
failed_reconnects++;
}
client_reconnect_gather.clear();
}
+/**
+ * Return true if server is in state RECONNECT and this
+ * client has not yet reconnected.
+ */
+bool Server::waiting_for_reconnect(client_t c) const
+{
+ return client_reconnect_gather.count(c) > 0;
+}
+
// -- sessions and recovery --
utime_t reconnect_start;
set<client_t> client_reconnect_gather; // clients i need a reconnect msg from.
+ bool waiting_for_reconnect(client_t c) const;
Session *get_session(Message *m);
void handle_client_session(class MClientSession *m);
void finish_flush_session(Session *session, version_t seq);
void terminate_sessions();
void find_idle_sessions();
- void kill_session(Session *session);
- void journal_close_session(Session *session, int state);
+ void kill_session(Session *session, Context *on_safe);
+ void journal_close_session(Session *session, int state, Context *on_safe);
void reconnect_clients();
void handle_client_reconnect(class MClientReconnect *m);
//void process_reconnect_cap(CInode *in, int from, ceph_mds_cap_reconnect& capinfo);