Also some logging to monitor mds inode and cap stats.
dout(10) << "handle_client_session " << *m << dendl;
int from = m->get_source().num();
- switch (m->op) {
+ switch (m->get_op()) {
case CEPH_SESSION_OPEN:
mds_sessions[from].seq = 0;
break;
#define CEPH_MONC_PROTOCOL 12 /* public/client */
+
/*
* types in this file are defined as little-endian, and are
* primarily intended to describe data structures that pass
CEPH_SESSION_REQUEST_RENEWCAPS,
CEPH_SESSION_RENEWCAPS,
CEPH_SESSION_STALE,
- CEPH_SESSION_TRIMCAPS,
+ CEPH_SESSION_RECALL_STATE,
};
static inline const char *ceph_session_op_name(int op)
case CEPH_SESSION_REQUEST_RENEWCAPS: return "request_renewcaps";
case CEPH_SESSION_RENEWCAPS: return "renewcaps";
case CEPH_SESSION_STALE: return "stale";
- case CEPH_SESSION_TRIMCAPS: return "trimcaps";
+ case CEPH_SESSION_RECALL_STATE: return "recall_state";
default: return "???";
}
}
__le32 op;
__le64 seq;
struct ceph_timespec stamp;
- __le32 max_caps;
+ __le32 max_caps, max_leases;
} __attribute__ ((packed));
/* client_request */
WRITE_RAW_ENCODER(ceph_file_layout)
WRITE_RAW_ENCODER(ceph_pg_pool)
WRITE_RAW_ENCODER(ceph_client_ticket)
+WRITE_RAW_ENCODER(ceph_mds_session_head)
WRITE_RAW_ENCODER(ceph_mds_request_head)
WRITE_RAW_ENCODER(ceph_mds_request_release)
WRITE_RAW_ENCODER(ceph_mds_caps)
send_renew_caps(mdsc, session);
break;
- case CEPH_SESSION_TRIMCAPS:
+ case CEPH_SESSION_RECALL_STATE:
trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
break;
-void MDCache::log_stat(Logger *logger)
+// Publish the cache gauges to the MDS logger: the configured cache size,
+// the LRU size/pinned/top/bottom/pintail counts, and the running totals of
+// inodes with caps and of caps.
+void MDCache::log_stat()
{
- if (get_root()) {
- utime_t now = g_clock.now();
- //logger->set("pop", (int)get_root()->pop_nested.meta_load(now));
- //logger->set("popauth", (int)get_root()->pop_auth_subtree_nested.meta_load(now));
- }
- logger->set(l_mds_c, lru.lru_get_size());
- logger->set(l_mds_cpin, lru.lru_get_num_pinned());
- logger->set(l_mds_ctop, lru.lru_get_top());
- logger->set(l_mds_cbot, lru.lru_get_bot());
- logger->set(l_mds_cptail, lru.lru_get_pintail());
+ // mds->logger is checked for null at the other places it is used, so do
+ // not rely on the caller having tested it before calling in here.
+ if (!mds->logger)
+ return;
+
+ mds->logger->set(l_mds_imax, g_conf.mds_cache_size);
+ mds->logger->set(l_mds_i, lru.lru_get_size());
+ mds->logger->set(l_mds_ipin, lru.lru_get_num_pinned());
+ mds->logger->set(l_mds_itop, lru.lru_get_top());
+ mds->logger->set(l_mds_ibot, lru.lru_get_bot());
+ mds->logger->set(l_mds_iptail, lru.lru_get_pintail());
+ mds->logger->set(l_mds_icap, num_inodes_with_caps);
+ mds->logger->set(l_mds_cap, num_caps);
}
{
// trim LRU
if (max < 0) {
- max = lru.lru_get_max();
+ max = g_conf.mds_cache_size;
if (!max) return false;
}
dout(7) << "trim max=" << max << " cur=" << lru.lru_get_size() << dendl;
if (dir->get_num_head_items() == 0 && dir->is_subtree_root())
migrator->export_empty_import(dir);
- if (mds->logger) mds->logger->inc(l_mds_cex);
+ if (mds->logger) mds->logger->inc(l_mds_iex);
}
vmrss = atoi(line.c_str() + 10);
}
- dout(10) << "check_memory_usage size " << vmsize << ", rss " << vmrss
- << ", max " << g_conf.mds_mem_max
- << dendl;
-
// check client caps
float caps_per_inode = (float)num_caps / (float)inode_map.size();
//float cap_rate = (float)num_inodes_with_caps / (float)inode_map.size();
- dout(10) << " " << num_inodes_with_caps << " / " << inode_map.size() << " inodes have caps" << dendl;
- dout(10) << " " << num_caps << " caps, " << caps_per_inode << " caps per inode" << dendl;
+ dout(10) << "check_memory_usage size " << vmsize << ", rss " << vmrss
+ << ", max " << g_conf.mds_mem_max
+ << ", " << num_inodes_with_caps << " / " << inode_map.size() << " inodes have caps"
+ << ", " << num_caps << " caps, " << caps_per_inode << " caps per inode"
+ << dendl;
+
+ if (vmsize > g_conf.mds_mem_max * .9) {
+ float ratio = (float)g_conf.mds_mem_max * .9 / (float)vmsize;
+ if (ratio < 1.0)
+ mds->server->recall_client_state(ratio);
+ }
}
delete mdr;
-
-
- // log some stats *****
- if (mds->logger) {
- mds->logger->set(l_mds_c, lru.lru_get_size());
- mds->logger->set(l_mds_cpin, lru.lru_get_num_pinned());
- mds->logger->set(l_mds_ctop, lru.lru_get_top());
- mds->logger->set(l_mds_cbot, lru.lru_get_bot());
- mds->logger->set(l_mds_cptail, lru.lru_get_pintail());
- //mds->logger->set("buf",buffer_total_alloc);
- }
-
- //if (g_conf.log_pins) {
- // pin
- /*
-for (int i=0; i<CInode::NUM_PINS; i++) {
- if (mds->logger2) mds->logger2->set(cinode_pin_names[i],
- cinode_pins[i]);
- }
- */
- /*
- for (map<int,int>::iterator it = cdir_pins.begin();
- it != cdir_pins.end();
- it++) {
- //string s = "D";
- //s += cdir_pin_names[it->first];
- if (mds->logger2) mds->logger2->set(//s,
- cdir_pin_names[it->first],
- it->second);
- }
- */
- //}
-
+ if (mds->logger)
+ log_stat();
}
~MDCache();
// debug
- void log_stat(Logger *logger);
+ void log_stat();
// root inode
CInode *get_root() { return root; }
mds_logtype.add_inc("dirt5");
*/
- mds_logtype.add_set(l_mds_c, "c");
- mds_logtype.add_set(l_mds_ctop, "ctop");
- mds_logtype.add_set(l_mds_cbot, "cbot");
- mds_logtype.add_set(l_mds_cptail, "cptail");
- mds_logtype.add_set(l_mds_cpin, "cpin");
- mds_logtype.add_inc(l_mds_cex, "cex");
+ mds_logtype.add_set(l_mds_imax, "imax");
+ mds_logtype.add_set(l_mds_i, "i");
+ mds_logtype.add_set(l_mds_itop, "itop");
+ mds_logtype.add_set(l_mds_ibot, "ibot");
+ mds_logtype.add_set(l_mds_iptail, "iptail");
+ mds_logtype.add_set(l_mds_ipin, "ipin");
+ mds_logtype.add_inc(l_mds_iex, "iex");
+ // icap and cap are written with logger->set() in MDCache::log_stat(), so
+ // register them as set-style values like the other gauges, not increments.
+ mds_logtype.add_set(l_mds_icap, "icap");
+ mds_logtype.add_set(l_mds_cap, "cap");
+
mds_logtype.add_inc(l_mds_dis, "dis");
mds_logtype.add_inc(l_mds_t, "t");
mds_logtype.add_set(l_mds_sm, "sm");
mds_logtype.add_inc(l_mds_ex, "ex");
- mds_logtype.add_inc(l_mds_iex, "iex");
+ mds_logtype.add_inc(l_mds_iexp, "iexp");
mds_logtype.add_inc(l_mds_im, "im");
mds_logtype.add_inc(l_mds_iim, "iim");
/*
logger->set(l_mds_buf, buffer_total_alloc.test());
logger->set(l_mds_sm, mdcache->num_subtrees());
- mdcache->log_stat(logger);
+ mdcache->log_stat();
}
// ...
l_mds_dir_c,
l_mds_dir_sp,
l_mds_dir_ffc,
- l_mds_c,
- l_mds_ctop,
- l_mds_cbot,
- l_mds_cptail,
- l_mds_cpin,
- l_mds_cex,
+ l_mds_imax,
+ l_mds_i,
+ l_mds_itop,
+ l_mds_ibot,
+ l_mds_iptail,
+ l_mds_ipin,
+ l_mds_iex,
+ l_mds_icap,
+ l_mds_cap,
l_mds_dis,
l_mds_t,
l_mds_thit,
l_mds_buf,
l_mds_sm,
l_mds_ex,
- l_mds_iex,
+ l_mds_iexp,
l_mds_im,
l_mds_iim,
l_mds_last,
// stats
if (mds->logger) mds->logger->inc(l_mds_ex);
- if (mds->logger) mds->logger->inc(l_mds_iex, num_exported_inodes);
+ if (mds->logger) mds->logger->inc(l_mds_iexp, num_exported_inodes);
cache->show_subtrees();
}
dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
assert(m->get_source().is_client()); // should _not_ come from an mds!
- switch (m->op) {
+ switch (m->get_op()) {
case CEPH_SESSION_REQUEST_OPEN:
if (session && (session->is_opening() || session->is_open())) {
dout(10) << "already open|opening, dropping this req" << dendl;
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->locker->resume_stale_caps(session);
}
- mds->messenger->send_message(new MClientSession(CEPH_SESSION_RENEWCAPS, m->stamp),
+ mds->messenger->send_message(new MClientSession(CEPH_SESSION_RENEWCAPS, m->get_stamp()),
session->inst);
break;
dout(10) << "already closing|dne, dropping this req" << dendl;
return;
}
- if (m->seq < session->get_push_seq()) {
- dout(10) << "old push seq " << m->seq << " < " << session->get_push_seq()
+ if (m->get_seq() < session->get_push_seq()) {
+ dout(10) << "old push seq " << m->get_seq() << " < " << session->get_push_seq()
<< ", dropping" << dendl;
return;
}
- if (m->seq != session->get_push_seq()) {
- dout(10) << "old push seq " << m->seq << " != " << session->get_push_seq()
+ if (m->get_seq() != session->get_push_seq()) {
+ dout(10) << "old push seq " << m->get_seq() << " != " << session->get_push_seq()
<< ", BUGGY!" << dendl;
assert(0);
}
}
+// Ask open client sessions to shrink the number of caps they hold.
+//
+// ratio: fraction of its current cap count each client should keep; the
+// caller in check_memory_usage only passes values below 1.0.
+//
+// Sessions holding min_caps_per_client caps or fewer are left alone.  For
+// the rest, a CEPH_SESSION_RECALL_STATE message is sent whose max_caps is
+// the scaled count, kept within [min_caps_per_client, max_caps_per_client].
+void Server::recall_client_state(float ratio)
+{
+ int max_caps_per_client = g_conf.mds_cache_size / 2;
+ int min_caps_per_client = 100;
+
+ dout(10) << "recall_client_state " << ratio
+ << ", caps per client " << min_caps_per_client << "-" << max_caps_per_client
+ << dendl;
+
+ set<Session*> sessions;
+ mds->sessionmap.get_client_session_set(sessions);
+ for (set<Session*>::const_iterator p = sessions.begin();
+ p != sessions.end();
+ ++p) {
+ Session *session = *p;
+ if (!session->is_open() ||
+ !session->inst.name.is_client())
+ continue;
+
+ dout(10) << " session " << session->inst
+ << " caps " << session->caps.size()
+ << ", leases " << session->leases.size()
+ << dendl;
+
+ // Compare as int so the size is not mixed with the signed limits.
+ int ncaps = (int)session->caps.size();
+ if (ncaps <= min_caps_per_client)
+ continue;
+
+ // Scale, then clamp.  The floor keeps a client from being pushed below
+ // the minimum that is logged above; the ceiling is applied last so the
+ // upper bound is never exceeded.
+ int newlim = (int)(ncaps * ratio);
+ if (newlim < min_caps_per_client)
+ newlim = min_caps_per_client;
+ if (newlim > max_caps_per_client)
+ newlim = max_caps_per_client;
+
+ MClientSession *m = new MClientSession(CEPH_SESSION_RECALL_STATE);
+ m->head.max_caps = newlim;
+ mds->send_message_client(m, session->inst);
+ }
+}
+
+
/*******
* some generic stuff for finishing off requests
*/
void reconnect_gather_finish();
void reconnect_tick();
+ void recall_client_state(float ratio);
+
// -- requests --
void handle_client_request(MClientRequest *m);
+// Session control message passed between a client and the MDS.  The whole
+// payload is the fixed-layout ceph_mds_session_head, encoded and decoded
+// as a single raw struct.
class MClientSession : public Message {
public:
+ ceph_mds_session_head head;
- int32_t op;
- version_t seq; // used when requesting close, declaring stale
- utime_t stamp;
- int32_t max_caps;
+ // Accessors for the header fields.
+ int get_op() { return head.op; }
+ version_t get_seq() { return head.seq; }
+ utime_t get_stamp() { return utime_t(head.stamp); }
- MClientSession() : Message(CEPH_MSG_CLIENT_SESSION) { }
+ // Zero the header so a message that has not been decoded yet never
+ // exposes uninitialised fields (e.g. through print()).
+ MClientSession() : Message(CEPH_MSG_CLIENT_SESSION) {
+ memset(&head, 0, sizeof(head));
+ }
MClientSession(int o, version_t s=0) :
- Message(CEPH_MSG_CLIENT_SESSION),
- op(o), seq(s), max_caps(0) { }
+ Message(CEPH_MSG_CLIENT_SESSION) {
+ memset(&head, 0, sizeof(head));
+ head.op = o;
+ head.seq = s;
+ }
MClientSession(int o, utime_t st) :
- Message(CEPH_MSG_CLIENT_SESSION),
- op(o), seq(0), stamp(st), max_caps(0) { }
+ Message(CEPH_MSG_CLIENT_SESSION) {
+ memset(&head, 0, sizeof(head));
+ head.op = o;
+ head.seq = 0;
+ st.encode_timeval(&head.stamp);
+ }
const char *get_type_name() { return "client_session"; }
void print(ostream& out) {
- out << "client_session(" << ceph_session_op_name(op);
- if (seq) out << " seq " << seq;
- if (op == CEPH_SESSION_TRIMCAPS)
- out << " max_caps " << max_caps;
+ out << "client_session(" << ceph_session_op_name(get_op());
+ if (get_seq())
+ out << " seq " << get_seq();
+ // The limits are only printed for recall messages.
+ if (get_op() == CEPH_SESSION_RECALL_STATE)
+ out << " max_caps " << head.max_caps << " max_leases " << head.max_leases;
out << ")";
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
- ::decode(op, p);
- ::decode(seq, p);
- ::decode(stamp, p);
- ::decode(max_caps, p);
+ ::decode(head, p);
}
void encode_payload() {
- ::encode(op, payload);
- ::encode(seq, payload);
- ::encode(stamp, payload);
- ::encode(max_caps, payload);
+ ::encode(head, payload);
}
};