// client sync
case CEPH_MSG_CLIENT_CAPS:
handle_client_caps(static_cast<MClientCaps*>(m));
+
break;
case CEPH_MSG_CLIENT_CAPRELEASE:
handle_client_cap_release(static_cast<MClientCapRelease*>(m));
return;
}
- for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p)
+ Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+
+ for (vector<ceph_mds_cap_item>::iterator p = m->caps.begin(); p != m->caps.end(); ++p) {
_do_cap_release(client, inodeno_t((uint64_t)p->ino) , p->cap_id, p->migrate_seq, p->seq);
+ }
+
+ if (session) {
+ session->notify_cap_release(m->caps.size());
+ }
m->put();
}
}
}
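
Note that `get_priv()` returns a reference-counted pointer, so the Session
lookup above takes a reference. Elsewhere in Server.cc the same lookup is
wrapped so the extra reference is dropped immediately. A minimal sketch of
that convention, assuming the RefCountedObject semantics of get_priv();
`lookup_session` is a hypothetical name, not part of this patch:

    // Sketch: resolve the Session behind a message's Connection and drop
    // the extra reference right away; the Connection itself keeps the
    // Session alive for the duration of message handling.
    static Session *lookup_session(Message *m)
    {
      Session *session = static_cast<Session *>(m->get_connection()->get_priv());
      if (session)
        session->put();  // get_priv() took a ref; we do not carry it
      return session;
    }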
+
+/**
+ * Call this when the MDCache is oversized: ask clients to trim some caps,
+ * which in turn unpins inodes in the MDCache so that it can trim too.
+ */
void Server::recall_client_state(float ratio)
{
int max_caps_per_client = (int)(g_conf->mds_cache_size * .8);
<< dendl;
if (session->caps.size() > min_caps_per_client) {
- int newlim = (int)(session->caps.size() * ratio);
- if (newlim > max_caps_per_client)
- newlim = max_caps_per_client;
- MClientSession *m = new MClientSession(CEPH_SESSION_RECALL_STATE);
- m->head.max_caps = newlim;
- mds->send_message_client(m, session);
+ int newlim = MIN((int)(session->caps.size() * ratio), max_caps_per_client);
+ if (session->caps.size() > (size_t)newlim) {
+ MClientSession *m = new MClientSession(CEPH_SESSION_RECALL_STATE);
+ m->head.max_caps = newlim;
+ mds->send_message_client(m, session);
+ session->notify_recall_sent(newlim);
+ }
}
}
-
}
session->last_cap_renew = ceph_clock_now(g_ceph_context);
}
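
To make the new clamping concrete, a standalone arithmetic sketch; the cache
size, cap count, and ratio below are made-up values, not taken from the patch:

    #include <algorithm>
    #include <cstdio>

    int main()
    {
      // Hypothetical inputs:
      const int mds_cache_size = 100000;      // stands in for g_conf->mds_cache_size
      const int max_caps_per_client = (int)(mds_cache_size * .8);  // 80000
      const size_t session_caps = 200000;     // caps held by one session
      const float ratio = 0.5;                // recall ratio passed in

      // Same computation as recall_client_state(), with std::min for MIN:
      int newlim = std::min((int)(session_caps * ratio), max_caps_per_client);

      if (session_caps > (size_t)newlim) {
        // A RECALL_STATE message would go out with head.max_caps = 80000,
        // and notify_recall_sent() would record the expected releases:
        printf("newlim = %d, expected releases = %zu\n",
               newlim, session_caps - (size_t)newlim);  // 80000, 120000
      }
      return 0;
    }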
+/**
+ * Called in response to a CEPH_MSG_CLIENT_CAPRELEASE message,
+ * with n_caps equal to the number of caps that were released
+ * in the message. Used to update state about how many caps a
+ * client has released since it was last instructed to RECALL_STATE.
+ */
+void Session::notify_cap_release(size_t n_caps)
+{
+ if (!recalled_at.is_zero()) {
+ recall_release_count += n_caps;
+ if (recall_release_count >= recall_count) {
+ recalled_at = utime_t();
+ recall_count = 0;
+ recall_release_count = 0;
+ }
+ }
+}
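
Two details worth noting here: releases that arrive while no recall is
outstanding are ignored for accounting purposes (the is_zero() guard), and
the comparison is >= rather than == because CAPRELEASE messages are batched,
so a client can overshoot the requested count. A comment-level trace with
invented counts:

    // Hypothetical trace (counts invented for illustration):
    //   recalled_at == 0          -> notify_cap_release(500) is a no-op
    //   notify_recall_sent(...)   -> recall_count = 1000, release_count = 0
    //   notify_cap_release(600)   -> release_count = 600 (recall still open)
    //   notify_cap_release(600)   -> release_count = 1200 >= 1000:
    //                                recalled_at and both counters reset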
+
+/**
+ * Called when a CEPH_SESSION_RECALL_STATE message (carried in a
+ * CEPH_MSG_CLIENT_SESSION) is sent to the client. Update our
+ * recall-related state
+ * in order to generate health metrics if the session doesn't see
+ * a commensurate number of calls to ::notify_cap_release
+ */
+void Session::notify_recall_sent(int const new_limit)
+{
+ if (recalled_at.is_zero()) {
+ // Entering recall phase, set up counters so we can later
+ // judge whether the client has respected the recall request
+ recalled_at = ceph_clock_now(g_ceph_context);
+ assert(new_limit < (int)caps.size()); // Behaviour of Server::recall_client_state
+ recall_count = caps.size() - new_limit;
+ recall_release_count = 0;
+ }
+}
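
The two hooks together form a small state machine. To see it in isolation, a
self-contained re-implementation with the Ceph types swapped for standard
ones (a bool stands in for recalled_at); this is a sketch of the logic above,
not code from the patch:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    // Mirrors Session::notify_recall_sent()/notify_cap_release():
    // 'recalling' plays the role of !recalled_at.is_zero().
    struct RecallTracker {
      bool recalling = false;
      uint32_t recall_count = 0;
      uint32_t recall_release_count = 0;

      void notify_recall_sent(size_t caps_held, int new_limit) {
        if (!recalling) {
          assert(new_limit < (int)caps_held);
          recalling = true;
          recall_count = caps_held - new_limit;
          recall_release_count = 0;
        }
      }

      void notify_cap_release(size_t n_caps) {
        if (recalling) {
          recall_release_count += n_caps;
          if (recall_release_count >= recall_count) {
            recalling = false;    // client complied; reset for next recall
            recall_count = 0;
            recall_release_count = 0;
          }
        }
      }
    };

    int main()
    {
      RecallTracker t;
      t.notify_recall_sent(200000, 80000);  // expect 120000 releases
      t.notify_cap_release(50000);          // 50000 released, still open
      t.notify_cap_release(70000);          // 120000 >= 120000: reset
      printf("recalling=%d\n", (int)t.recalling);  // prints 0
      return 0;
    }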
+
uint64_t state_seq;
int importing_count;
friend class SessionMap;
+
+ // Ephemeral state for tracking progress of capability recalls
+ utime_t recalled_at; // When was I asked to SESSION_RECALL?
+ uint32_t recall_count; // How many cap releases was I asked for?
+ uint32_t recall_release_count; // How many caps has the client released since then?
+
public:
session_info_t info; ///< durable bits
interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
+ void notify_cap_release(size_t n_caps);
+ void notify_recall_sent(int const new_limit);
+
inodeno_t next_ino() {
if (info.prealloc_inos.empty())
return 0;
Session() :
state(STATE_CLOSED), state_seq(0), importing_count(0),
+ recalled_at(), recall_count(0), recall_release_count(0),
connection(NULL), item_session_list(this),
requests(0), // member_offset passed to front() manually
cap_push_seq(0),
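
The point of keeping recalled_at is to let a later health check notice
clients that ignore a recall: the timestamp stays non-zero until
notify_cap_release() sees enough releases. A hypothetical member function
(not part of this patch; the grace period is an assumed parameter) showing
how the field could be consulted:

    // Hypothetical Session member, for illustration only: report whether
    // this client has been in the recall phase longer than 'grace_secs',
    // i.e. it was sent RECALL_STATE but has not released enough caps for
    // notify_cap_release() to reset the counters.
    bool Session::recall_is_overdue(utime_t now, uint32_t grace_secs) const
    {
      return !recalled_at.is_zero() &&
             now.sec() - recalled_at.sec() > grace_secs;
    }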