int incarnation;
};
+class C_Drop_Cache : public MDSInternalContext {
+public:
+ C_Drop_Cache(Server *server, MDCache *mdcache, MDLog *mdlog,
+ MDSRank *mds, uint64_t recall_timeout,
+ Formatter *f, Context *on_finish)
+ : MDSInternalContext(mds),
+ server(server), mdcache(mdcache), mdlog(mdlog),
+ recall_timeout(recall_timeout), f(f), on_finish(on_finish),
+ whoami(mds->whoami), incarnation(mds->incarnation) {
+ }
+
+ void send() {
+ // not really a hard requirement here, but let's ensure it in case the
+ // logic changes later.
+ ceph_assert(mds->mds_lock.is_locked());
+
+ dout(20) << __func__ << dendl;
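+ // pipeline: recall client caps -> trim the cache -> flush the journal ->
+ // dump cache status; each stage kicks off the next from its handler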
+ recall_client_state();
+ }
+
+private:
+ // context which completes itself (with -ETIMEDOUT) after a specified
+ // timeout or when explicitly completed, whichever comes first. Note
+ // that the context does not destroy itself after completion -- it
+ // needs to be explicitly freed.
+ class C_ContextTimeout : public MDSInternalContext {
+ public:
+ C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
+ : MDSInternalContext(mds),
+ timeout(timeout),
+ lock("mds::context::timeout", false, true),
+ on_finish(on_finish) {
+ }
+ ~C_ContextTimeout() {
+ ceph_assert(timer_task == nullptr);
+ }
+
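+ // arm a timer event that completes this context with -ETIMEDOUT after
+ // `timeout` seconds; an early complete() cancels the pending event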
+ void start_timer() {
+ timer_task = new FunctionContext([this](int _) {
+ timer_task = nullptr;
+ complete(-ETIMEDOUT);
+ });
+ mds->timer.add_event_after(timeout, timer_task);
+ }
+
+ void finish(int r) override {
+ Context *ctx = nullptr;
+ {
+ Mutex::Locker locker(lock);
+ std::swap(on_finish, ctx);
+ }
+ if (ctx != nullptr) {
+ ctx->complete(r);
+ }
+ }
+ void complete(int r) override {
+ if (timer_task != nullptr) {
+ mds->timer.cancel_event(timer_task);
+ // cancel_event() frees the pending event; clear the pointer so the
+ // destructor's assertion holds
+ timer_task = nullptr;
+ }
+
+ finish(r);
+ }
+
+ uint64_t timeout;
+ Mutex lock;
+ Context *on_finish = nullptr;
+ Context *timer_task = nullptr;
+ };
+
+ void recall_client_state() {
+ dout(20) << __func__ << dendl;
+
+ f->open_object_section("result");
+
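+ // ask the Server to recall caps from all client sessions and flush them;
+ // the gather finisher fires once every client has acked, and the
+ // C_ContextTimeout below bounds how long we are willing to wait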
+ MDSGatherBuilder *gather = new MDSGatherBuilder(g_ceph_context);
+ server->recall_client_state(1.0, true, gather);
+ if (!gather->has_subs()) {
+ handle_recall_client_state(0);
+ delete gather;
+ return;
+ }
+
+ C_ContextTimeout *ctx = new C_ContextTimeout(
+ mds, recall_timeout, new FunctionContext([this](int r) {
+ handle_recall_client_state(r);
+ }));
+
+ ctx->start_timer();
+ gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
+ gather->activate();
+ // once activated, the gather context owns its subs and finisher, so the
+ // builder itself can be freed
+ delete gather;
+ }
+
+ void handle_recall_client_state(int r) {
+ dout(20) << __func__ << ": r=" << r << dendl;
+
+ // client recall section
+ f->open_object_section("client_recall");
+ f->dump_int("return_code", r);
+ f->dump_string("message", cpp_strerror(r));
+ f->close_section();
+
+ // we can still continue even if the recall timed out
+ trim_cache();
+ }
+
+ void trim_cache() {
+ dout(20) << __func__ << dendl;
+
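+ // trim with no limit so the cache releases as much as it can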
+ if (!mdcache->trim(UINT64_MAX)) {
+ cmd_err(f, "failed to trim cache");
+ complete(-EINVAL);
+ return;
+ }
+
+ flush_journal();
+ }
+
+ void flush_journal() {
+ dout(20) << __func__ << dendl;
+
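+ // reuse the C_Flush_Journal helper; its result is handled in
+ // handle_flush_journal()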
+ Context *ctx = new FunctionContext([this](int r) {
+ handle_flush_journal(r);
+ });
+
+ C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, mds, &ss, ctx);
+ flush_journal->send();
+ }
+
+ void handle_flush_journal(int r) {
+ dout(20) << __func__ << ": r=" << r << dendl;
+
+ if (r != 0) {
+ cmd_err(f, ss.str());
+ complete(r);
+ return;
+ }
+
+ // journal flush section
+ f->open_object_section("flush_journal");
+ f->dump_int("return_code", r);
+ f->dump_string("message", ss.str());
+ f->close_section();
+
+ cache_status();
+ }
+
+ void cache_status() {
+ dout(20) << __func__ << dendl;
+
+ // cache status section
+ mdcache->cache_status(f);
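+ // close the "result" section opened in recall_client_state()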
+ f->close_section();
+
+ complete(0);
+ }
+
+ void finish(int r) override {
+ dout(20) << __func__ << ": r=" << r << dendl;
+
+ on_finish->complete(r);
+ }
+
+ Server *server;
+ MDCache *mdcache;
+ MDLog *mdlog;
+ uint64_t recall_timeout;
+ Formatter *f;
+ Context *on_finish;
+
+ int retval = 0;
+ std::stringstream ss;
+
+ // so as to use dout -- the dout prefix references whoami and incarnation
+ mds_rank_t whoami;
+ int incarnation;
+
+ void cmd_err(Formatter *f, std::string_view err) {
+ f->reset();
+ f->open_object_section("result");
+ f->dump_string("error", err);
+ f->close_section();
+ }
+};
+
MDSRank::MDSRank(
mds_rank_t whoami_,
Mutex &mds_lock_,
} else if (command == "cache status") {
Mutex::Locker l(mds_lock);
mdcache->cache_status(f);
+ } else if (command == "cache drop") {
+ int64_t timeout;
+ if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
+ return false;
+ }
+
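+ // admin socket path: wait synchronously for the drop to finish; on failure
+ // the formatter output is flushed into ss below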
+ C_SaferCond cond;
+ command_cache_drop((uint64_t)timeout, f, &cond);
+ int r = cond.wait();
+ if (r != 0) {
+ f->flush(ss);
+ }
} else if (command == "dump tree") {
command_dump_tree(cmdmap, ss, f);
} else if (command == "dump loads") {
return true;
}
-class C_MDS_Send_Command_Reply : public MDSInternalContext
-{
+class C_MDS_Send_Command_Reply : public MDSInternalContext {
protected:
MCommand::const_ref m;
public:
C_MDS_Send_Command_Reply(MDSRank *_mds, const MCommand::const_ref &_m) :
MDSInternalContext(_mds), m(_m) {}
- void send (int r, std::string_view out_str) {
+
+ void send(int r, std::string_view ss) {
+ std::stringstream ds;
+ send(r, ss, ds);
+ }
+
+ void send(int r, std::string_view ss, std::stringstream &ds) {
bufferlist bl;
- MDSDaemon::send_command_reply(m, mds, r, bl, out_str);
+ bl.append(ds);
+ MDSDaemon::send_command_reply(m, mds, r, bl, ss);
}
- void finish (int r) override {
+
+ void finish(int r) override {
send(r, "");
}
};
int *r,
std::stringstream *ds,
std::stringstream *ss,
+ Context **run_later,
bool *need_reply)
{
ceph_assert(r != nullptr);
}
damage_table.erase(id);
+ return true;
+ } else if (prefix == "cache drop") {
+ int64_t timeout;
+ if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
+ return false;
+ }
+
+ JSONFormatter *f = new JSONFormatter(true);
+ C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
+ Context *on_finish = new FunctionContext([this, f, reply](int r) {
+ cache_drop_send_reply(f, reply, r);
+ delete f;
+ delete reply;
+ });
+
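+ // complete the command asynchronously: suppress the immediate reply and
+ // run the drop on the finisher so the dispatch thread is not blocked; the
+ // reply is sent from on_finish once the drop completes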
+ *need_reply = false;
+ *run_later = new C_OnFinisher(
+ new FunctionContext([this, timeout, f, on_finish](int _) {
+ command_cache_drop((uint64_t)timeout, f, on_finish);
+ }), finisher);
+
return true;
} else {
return false;
}
}
+void MDSRank::cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r) {
+ dout(20) << __func__ << ": r=" << r << dendl;
+
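+ // on failure the formatter output is returned as the status string (ss);
+ // on success it becomes the data payload (ds) of the reply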
+ std::stringstream ds;
+ std::stringstream ss;
+ if (r != 0) {
+ f->flush(ss);
+ } else {
+ f->flush(ds);
+ }
+
+ reply->send(r, ss.str(), ds);
+}
+
+void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) {
+ dout(20) << __func__ << dendl;
+
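+ // the request drives itself through the recall/trim/flush pipeline and
+ // finally invokes on_finish with the result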
+ Mutex::Locker locker(mds_lock);
+ C_Drop_Cache *request = new C_Drop_Cache(server, mdcache, mdlog, this,
+ timeout, f, on_finish);
+ request->send();
+}
+
epoch_t MDSRank::get_osd_epoch() const
{
return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
}
}
+
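+// ask a single client to flush its session messages; the gather sub completes
+// when the client acks the FLUSHMSG (clients without EXPORT_PEER are skipped)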
+void Server::flush_session(Session *session, MDSGatherBuilder *gather) {
+ if (!session->is_open() ||
+ !session->get_connection() ||
+ !session->get_connection()->has_feature(CEPH_FEATURE_EXPORT_PEER)) {
+ return;
+ }
+
+ version_t seq = session->wait_for_flush(gather->new_sub());
+ mds->send_message_client(
+ MClientSession::create(CEPH_SESSION_FLUSHMSG, seq), session);
+}
+
void Server::flush_client_sessions(set<client_t>& client_set, MDSGatherBuilder& gather)
{
for (set<client_t>::iterator p = client_set.begin(); p != client_set.end(); ++p) {
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->v));
ceph_assert(session);
- if (!session->is_open() ||
- !session->get_connection() ||
- !session->get_connection()->has_feature(CEPH_FEATURE_EXPORT_PEER))
- continue;
- version_t seq = session->wait_for_flush(gather.new_sub());
- mds->send_message_client(MClientSession::create(CEPH_SESSION_FLUSHMSG, seq), session);
+ flush_session(session, &gather);
}
}
* to trim some caps, and consequently unpin some inodes in the MDCache so
* that it can trim too.
*/
-void Server::recall_client_state(void)
-{
+void Server::recall_client_state(double ratio, bool flush_client_session,
+ MDSGatherBuilder *gather) {
+ if (flush_client_session) {
+ ceph_assert(gather != nullptr);
+ }
+
/* try to recall at least 80% of all caps */
uint64_t max_caps_per_client = Capability::count() * g_conf().get_val<double>("mds_max_ratio_caps_per_client");
uint64_t min_caps_per_client = g_conf().get_val<uint64_t>("mds_min_caps_per_client");
/* ratio: determine the amount of caps to recall from each client. Use
* percentage full over the cache reservation. Cap the ratio at 80% of client
* caps. */
- double ratio = 1.0-fmin(0.80, mdcache->cache_toofull_ratio());
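+ // a negative ratio selects the old behaviour: derive it from how full the cache is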
+ if (ratio < 0.0)
+ ratio = 1.0 - fmin(0.80, mdcache->cache_toofull_ratio());
- dout(10) << "recall_client_state " << ratio
- << ", caps per client " << min_caps_per_client << "-" << max_caps_per_client
- << dendl;
+ dout(10) << __func__ << ": ratio=" << ratio << ", caps per client "
+ << min_caps_per_client << "-" << max_caps_per_client << dendl;
set<Session*> sessions;
mds->sessionmap.get_client_session_set(sessions);
+
for (auto &session : sessions) {
if (!session->is_open() ||
+ !session->get_connection() ||
!session->info.inst.name.is_client())
continue;
auto m = MClientSession::create(CEPH_SESSION_RECALL_STATE);
m->head.max_caps = newlim;
mds->send_message_client(m, session);
+ if (flush_client_session) {
+ flush_session(session, gather);
+ }
session->notify_recall_sent(newlim);
}
}