From da70dde8aa23b0b24057089edaee4d6317f52bd4 Mon Sep 17 00:00:00 2001
From: Venky Shankar
Date: Sun, 26 Aug 2018 09:48:56 -0400
Subject: [PATCH] mds: command to trim mds cache and client caps

With this command, the MDS requests clients to release caps, then
trims its own cache and flushes the journal. The command accepts a
timeout to wait for clients to respond to the session recall and
flush messages.

Signed-off-by: Patrick Donnelly
Signed-off-by: Rishabh Dave
Signed-off-by: Venky Shankar
---
 src/mds/MDCache.cc   |   2 +-
 src/mds/MDSDaemon.cc |  12 +-
 src/mds/MDSRank.cc   | 258 ++++++++++++++++++++++++++++++++++++++++++-
 src/mds/MDSRank.h    |   6 +
 src/mds/Server.cc    |  41 +++++--
 src/mds/Server.h     |   4 +-
 6 files changed, 302 insertions(+), 21 deletions(-)

diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc
index b46fbf1ba005b..8743ec9955a89 100644
--- a/src/mds/MDCache.cc
+++ b/src/mds/MDCache.cc
@@ -7524,7 +7524,7 @@ void MDCache::check_memory_usage()
 
   if (cache_toofull()) {
     last_recall_state = clock::now();
-    mds->server->recall_client_state();
+    mds->server->recall_client_state(-1.0, false, nullptr);
   }
 
   // If the cache size had exceeded its limit, but we're back in bounds
diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc
index 2de88ebdf87d9..99f12fc7eda2a 100644
--- a/src/mds/MDSDaemon.cc
+++ b/src/mds/MDSDaemon.cc
@@ -244,6 +244,11 @@ void MDSDaemon::set_up_admin_socket()
                                      asok_hook,
                                      "show cache status");
   ceph_assert(r == 0);
+  r = admin_socket->register_command("cache drop",
+                                     "cache drop name=timeout,type=CephInt,range=1",
+                                     asok_hook,
+                                     "drop cache");
+  ceph_assert(r == 0);
   r = admin_socket->register_command("dump tree",
                                      "dump tree "
                                      "name=root,type=CephString,req=true "
@@ -696,6 +701,9 @@
 COMMAND("heap " \
         "name=heapcmd,type=CephChoices,strings=dump|start_profiler|stop_profiler|release|stats", \
         "show heap usage info (available only if compiled with tcmalloc)", \
         "mds", "*", "cli,rest")
+COMMAND("cache drop name=timeout,type=CephInt,range=1", "trim cache and optionally "
+        "request client to release all caps and flush the journal", "mds",
+        "r", "cli,rest")
 };
 
@@ -861,8 +869,8 @@ int MDSDaemon::_handle_command(
   else {
     bool handled;
     try {
-      handled = mds_rank->handle_command(cmdmap, m, &r, &ds, &ss,
-                                         need_reply);
+      handled = mds_rank->handle_command(cmdmap, m, &r, &ds, &ss,
+                                         run_later, need_reply);
       if (!handled) {
         // MDSDaemon doesn't know this command
         ss << "unrecognized command! " << prefix;
diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc
index becf3f4d3cfd9..c33f3e24f0c90 100644
--- a/src/mds/MDSRank.cc
+++ b/src/mds/MDSRank.cc
@@ -238,6 +238,190 @@ private:
   int incarnation;
 };
 
+class C_Drop_Cache : public MDSInternalContext {
+public:
+  C_Drop_Cache(Server *server, MDCache *mdcache, MDLog *mdlog,
+               MDSRank *mds, uint64_t recall_timeout,
+               Formatter *f, Context *on_finish)
+    : MDSInternalContext(mds),
+      server(server), mdcache(mdcache), mdlog(mdlog),
+      recall_timeout(recall_timeout), f(f), on_finish(on_finish),
+      whoami(mds->whoami), incarnation(mds->incarnation) {
+  }
+
+  void send() {
+    // not really a hard requirement here, but let's ensure this in
+    // case we change the logic here.
+    assert(mds->mds_lock.is_locked());
+
+    dout(20) << __func__ << dendl;
+    recall_client_state();
+  }
+
+private:
+  // context which completes itself (with -ETIMEDOUT) after a specified
+  // timeout or when explicitly completed, whichever comes first. Note
+  // that the context does not destroy itself after completion -- it
+  // needs to be explicitly freed.
+  class C_ContextTimeout : public MDSInternalContext {
+  public:
+    C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
+      : MDSInternalContext(mds),
+        timeout(timeout),
+        lock("mds::context::timeout", false, true),
+        on_finish(on_finish) {
+    }
+    ~C_ContextTimeout() {
+      ceph_assert(timer_task == nullptr);
+    }
+
+    void start_timer() {
+      timer_task = new FunctionContext([this](int _) {
+          timer_task = nullptr;
+          complete(-ETIMEDOUT);
+        });
+      mds->timer.add_event_after(timeout, timer_task);
+    }
+
+    void finish(int r) override {
+      Context *ctx = nullptr;
+      {
+        Mutex::Locker locker(lock);
+        std::swap(on_finish, ctx);
+      }
+      if (ctx != nullptr) {
+        ctx->complete(r);
+      }
+    }
+    void complete(int r) override {
+      if (timer_task != nullptr) {
+        mds->timer.cancel_event(timer_task);
+      }
+
+      finish(r);
+    }
+
+    uint64_t timeout;
+    Mutex lock;
+    Context *on_finish = nullptr;
+    Context *timer_task = nullptr;
+  };
+
+  void recall_client_state() {
+    dout(20) << __func__ << dendl;
+
+    f->open_object_section("result");
+
+    MDSGatherBuilder *gather = new MDSGatherBuilder(g_ceph_context);
+    server->recall_client_state(1.0, true, gather);
+    if (!gather->has_subs()) {
+      handle_recall_client_state(0);
+      delete gather;
+      return;
+    }
+
+    C_ContextTimeout *ctx = new C_ContextTimeout(
+      mds, recall_timeout, new FunctionContext([this](int r) {
+        handle_recall_client_state(r);
+      }));
+
+    ctx->start_timer();
+    gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
+    gather->activate();
+  }
+
+  void handle_recall_client_state(int r) {
+    dout(20) << __func__ << ": r=" << r << dendl;
+
+    // client recall section
+    f->open_object_section("client_recall");
+    f->dump_int("return_code", r);
+    f->dump_string("message", cpp_strerror(r));
+    f->close_section();
+
+    // we can still continue after recall timeout
+    trim_cache();
+  }
+
+  void trim_cache() {
+    dout(20) << __func__ << dendl;
+
+    if (!mdcache->trim(UINT64_MAX)) {
+      cmd_err(f, "failed to trim cache");
+      complete(-EINVAL);
+      return;
+    }
+
+    flush_journal();
+  }
+
+  void flush_journal() {
+    dout(20) << __func__ << dendl;
+
+    Context *ctx = new FunctionContext([this](int r) {
+        handle_flush_journal(r);
+      });
+
+    C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, mds, &ss, ctx);
+    flush_journal->send();
+  }
+
+  void handle_flush_journal(int r) {
+    dout(20) << __func__ << ": r=" << r << dendl;
+
+    if (r != 0) {
+      cmd_err(f, ss.str());
+      complete(r);
+      return;
+    }
+
+    // journal flush section
+    f->open_object_section("flush_journal");
+    f->dump_int("return_code", r);
+    f->dump_string("message", ss.str());
+    f->close_section();
+
+    cache_status();
+  }
+
+  void cache_status() {
+    dout(20) << __func__ << dendl;
+
+    // cache status section
+    mdcache->cache_status(f);
+    f->close_section();
+
+    complete(0);
+  }
+
+  void finish(int r) override {
+    dout(20) << __func__ << ": r=" << r << dendl;
+
+    on_finish->complete(r);
+  }
+
+  Server *server;
+  MDCache *mdcache;
+  MDLog *mdlog;
+  uint64_t recall_timeout;
+  Formatter *f;
+  Context *on_finish;
+
+  int retval = 0;
+  std::stringstream ss;
+
+  // so as to use dout
+  mds_rank_t whoami;
+  int incarnation;
+
+  void cmd_err(Formatter *f, std::string_view err) {
+    f->reset();
+    f->open_object_section("result");
+    f->dump_string("error", err);
+    f->close_section();
+  }
+};
+
 MDSRank::MDSRank(
     mds_rank_t whoami_, Mutex &mds_lock_,
@@ -2289,6 +2473,18 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command,
   } else if (command == "cache status") {
     Mutex::Locker l(mds_lock);
     mdcache->cache_status(f);
+  } else if (command == "cache drop") {
+    int64_t timeout;
+    if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
+      return false;
+    }
+
+    C_SaferCond cond;
+    command_cache_drop((uint64_t)timeout, f, &cond);
+    int r = cond.wait();
+    if (r != 0) {
+      f->flush(ss);
+    }
   } else if (command == "dump tree") {
     command_dump_tree(cmdmap, ss, f);
   } else if (command == "dump loads") {
@@ -2335,18 +2531,25 @@ bool MDSRankDispatcher::handle_asok_command(std::string_view command,
   return true;
 }
 
-class C_MDS_Send_Command_Reply : public MDSInternalContext
-{
+class C_MDS_Send_Command_Reply : public MDSInternalContext {
 protected:
   MCommand::const_ref m;
 public:
   C_MDS_Send_Command_Reply(MDSRank *_mds, const MCommand::const_ref &_m) :
     MDSInternalContext(_mds), m(_m) {}
-  void send (int r, std::string_view out_str) {
+
+  void send(int r, std::string_view ss) {
+    std::stringstream ds;
+    send(r, ss, ds);
+  }
+
+  void send(int r, std::string_view ss, std::stringstream &ds) {
     bufferlist bl;
-    MDSDaemon::send_command_reply(m, mds, r, bl, out_str);
+    bl.append(ds);
+    MDSDaemon::send_command_reply(m, mds, r, bl, ss);
   }
-  void finish (int r) override {
+
+  void finish(int r) override {
     send(r, "");
   }
 };
@@ -3097,6 +3300,7 @@ bool MDSRankDispatcher::handle_command(
   int *r,
   std::stringstream *ds,
   std::stringstream *ss,
+  Context **run_later,
   bool *need_reply)
 {
   ceph_assert(r != nullptr);
@@ -3150,12 +3354,56 @@ bool MDSRankDispatcher::handle_command(
     }
 
     damage_table.erase(id);
+    return true;
+  } else if (prefix == "cache drop") {
+    int64_t timeout;
+    if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
+      return false;
+    }
+
+    JSONFormatter *f = new JSONFormatter(true);
+    C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
+    Context *on_finish = new FunctionContext([this, f, reply](int r) {
+        cache_drop_send_reply(f, reply, r);
+        delete f;
+        delete reply;
+      });
+
+    *need_reply = false;
+    *run_later = new C_OnFinisher(
+      new FunctionContext([this, timeout, f, on_finish](int _) {
+          command_cache_drop((uint64_t)timeout, f, on_finish);
+        }), finisher);
+
     return true;
   } else {
     return false;
   }
 }
 
+void MDSRank::cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r) {
+  dout(20) << __func__ << ": r=" << r << dendl;
+
+  std::stringstream ds;
+  std::stringstream ss;
+  if (r != 0) {
+    f->flush(ss);
+  } else {
+    f->flush(ds);
+  }
+
+  reply->send(r, ss.str(), ds);
+}
+
+void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) {
+  dout(20) << __func__ << dendl;
+
+  Mutex::Locker locker(mds_lock);
+  C_Drop_Cache *request = new C_Drop_Cache(server, mdcache, mdlog, this,
+                                           timeout, f, on_finish);
+  request->send();
+}
+
 epoch_t MDSRank::get_osd_epoch() const
 {
   return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h
index b8bd7e6c1edaf..b5b5579f8eb55 100644
--- a/src/mds/MDSRank.h
+++ b/src/mds/MDSRank.h
@@ -119,6 +119,7 @@ class Objecter;
 class MonClient;
 class Finisher;
 class ScrubStack;
+class C_MDS_Send_Command_Reply;
 
 /**
  * The public part of this class's interface is what's exposed to all
@@ -136,6 +137,7 @@ class MDSRank {
   public:
 
     friend class C_Flush_Journal;
+    friend class C_Drop_Cache;
 
     mds_rank_t get_nodeid() const { return whoami; }
    int64_t get_metadata_pool();
@@ -480,6 +482,9 @@ class MDSRank {
    void command_dump_tree(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f);
    void command_dump_inode(Formatter *f, const cmdmap_t &cmdmap, std::ostream &ss);
 
+   void cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r);
+   void command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish);
+
  protected:
   Messenger    *messenger;
   MonClient    *monc;
@@ -607,6 +612,7 @@ public:
     int *r,
     std::stringstream *ds,
     std::stringstream *ss,
+    Context **run_later,
     bool *need_reply);
 
   void dump_sessions(const SessionFilter &filter, Formatter *f) const;
diff --git a/src/mds/Server.cc b/src/mds/Server.cc
index 3d8f3c9a2bce7..f88724d252e83 100644
--- a/src/mds/Server.cc
+++ b/src/mds/Server.cc
@@ -493,17 +493,25 @@ void Server::handle_client_session(const MClientSession::const_ref &m)
   }
 }
 
+
+void Server::flush_session(Session *session, MDSGatherBuilder *gather) {
+  if (!session->is_open() ||
+      !session->get_connection() ||
+      !session->get_connection()->has_feature(CEPH_FEATURE_EXPORT_PEER)) {
+    return;
+  }
+
+  version_t seq = session->wait_for_flush(gather->new_sub());
+  mds->send_message_client(
+    MClientSession::create(CEPH_SESSION_FLUSHMSG, seq), session);
+}
+
 void Server::flush_client_sessions(set<client_t>& client_set, MDSGatherBuilder& gather)
 {
   for (set<client_t>::iterator p = client_set.begin(); p != client_set.end(); ++p) {
     Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->v));
     ceph_assert(session);
-    if (!session->is_open() ||
-        !session->get_connection() ||
-        !session->get_connection()->has_feature(CEPH_FEATURE_EXPORT_PEER))
-      continue;
-    version_t seq = session->wait_for_flush(gather.new_sub());
-    mds->send_message_client(MClientSession::create(CEPH_SESSION_FLUSHMSG, seq), session);
+    flush_session(session, &gather);
   }
 }
 
@@ -1268,8 +1276,12 @@ void Server::recover_filelocks(CInode *in, bufferlist locks, int64_t client)
  * to trim some caps, and consequently unpin some inodes in the MDCache so
  * that it can trim too.
  */
-void Server::recall_client_state(void)
-{
+void Server::recall_client_state(double ratio, bool flush_client_session,
+                                 MDSGatherBuilder *gather) {
+  if (flush_client_session) {
+    assert(gather != nullptr);
+  }
+
   /* try to recall at least 80% of all caps */
   uint64_t max_caps_per_client = Capability::count() * g_conf().get_val<double>("mds_max_ratio_caps_per_client");
   uint64_t min_caps_per_client = g_conf().get_val<uint64_t>("mds_min_caps_per_client");
@@ -1283,16 +1295,18 @@ void Server::recall_client_state(void)
   /* ratio: determine the amount of caps to recall from each client. Use
    * percentage full over the cache reservation. Cap the ratio at 80% of client
    * caps. */
-  double ratio = 1.0-fmin(0.80, mdcache->cache_toofull_ratio());
+  if (ratio < 0.0)
+    ratio = 1.0 - fmin(0.80, mdcache->cache_toofull_ratio());
 
-  dout(10) << "recall_client_state " << ratio
-           << ", caps per client " << min_caps_per_client << "-" << max_caps_per_client
-           << dendl;
+  dout(10) << __func__ << ": ratio=" << ratio << ", caps per client "
+           << min_caps_per_client << "-" << max_caps_per_client << dendl;
 
   set<Session*> sessions;
   mds->sessionmap.get_client_session_set(sessions);
+
   for (auto &session : sessions) {
     if (!session->is_open() ||
+        !session->get_connection() ||
        !session->info.inst.name.is_client())
       continue;
 
@@ -1306,6 +1320,9 @@ void Server::recall_client_state(void)
       auto m = MClientSession::create(CEPH_SESSION_RECALL_STATE);
       m->head.max_caps = newlim;
       mds->send_message_client(m, session);
+      if (flush_client_session) {
+        flush_session(session, gather);
+      }
       session->notify_recall_sent(newlim);
     }
   }
diff --git a/src/mds/Server.h b/src/mds/Server.h
index c53f7271d185f..6c27aa057b0fe 100644
--- a/src/mds/Server.h
+++ b/src/mds/Server.h
@@ -152,7 +152,8 @@ public:
   void reconnect_tick();
   void recover_filelocks(CInode *in, bufferlist locks, int64_t client);
 
-  void recall_client_state(void);
+  void recall_client_state(double ratio, bool flush_client_session,
+                           MDSGatherBuilder *gather);
   void force_clients_readonly();
 
   // -- requests --
@@ -326,6 +327,7 @@ public:
 
 private:
   void reply_client_request(MDRequestRef& mdr, const MClientReply::ref &reply);
+  void flush_session(Session *session, MDSGatherBuilder *gather);
 };
 
 #endif
-- 
2.39.5
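
Note for reviewers: a quick sketch of how the new command might be exercised once this patch is applied. The daemon names and the timeout value below are illustrative; only the "cache drop" command name and its required "timeout" argument (an integer, minimum 1) come from the command registrations in this patch.

    # via the local admin socket of a running MDS daemon
    ceph daemon mds.a cache drop 10

    # or as a tell/CLI command routed to a rank (registered with "cli,rest")
    ceph tell mds.0 cache drop 10

The reply is the JSON built by C_Drop_Cache: a top-level "result" object containing a "client_recall" section (return code and message from the recall step, -ETIMEDOUT if clients do not respond within the timeout), a "flush_journal" section (return code and message from C_Flush_Journal), and finally the output of MDCache::cache_status(); on error the formatter is reset and only an "error" field is returned.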