]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: some infrastructure to recall state from clients when under memory pressure
authorSage Weil <sage@newdream.net>
Thu, 16 Apr 2009 04:17:47 +0000 (21:17 -0700)
committerSage Weil <sage@newdream.net>
Fri, 29 May 2009 21:30:21 +0000 (14:30 -0700)
Also some logging to monitor mds inode and cap stats.

12 files changed:
src/client/Client.cc
src/include/ceph_fs.h
src/include/types.h
src/kernel/mds_client.c
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/MDS.cc
src/mds/MDS.h
src/mds/Migrator.cc
src/mds/Server.cc
src/mds/Server.h
src/messages/MClientSession.h

index c09840597dac2e5e23fcff9cca0ca5bfd14db13d..eb9def2f619587f474f8a8d0ca834d31686e30aa 100644 (file)
@@ -884,7 +884,7 @@ void Client::handle_client_session(MClientSession *m)
   dout(10) << "handle_client_session " << *m << dendl;
   int from = m->get_source().num();
 
-  switch (m->op) {
+  switch (m->get_op()) {
   case CEPH_SESSION_OPEN:
     mds_sessions[from].seq = 0;
     break;
index ac37686e2d5dc368c5bb2264622991f7107efabf..bc1ede348ebec33ec52c2cfa628625b68e98ca13 100644 (file)
@@ -31,6 +31,7 @@
 #define CEPH_MONC_PROTOCOL   12 /* public/client */
 
 
+
 /*
  * types in this file are defined as little-endian, and are
  * primarily intended to describe data structures that pass
@@ -394,7 +395,7 @@ enum {
        CEPH_SESSION_REQUEST_RENEWCAPS,
        CEPH_SESSION_RENEWCAPS,
        CEPH_SESSION_STALE,
-       CEPH_SESSION_TRIMCAPS,
+       CEPH_SESSION_RECALL_STATE,
 };
 
 static inline const char *ceph_session_op_name(int op)
@@ -407,7 +408,7 @@ static inline const char *ceph_session_op_name(int op)
        case CEPH_SESSION_REQUEST_RENEWCAPS: return "request_renewcaps";
        case CEPH_SESSION_RENEWCAPS: return "renewcaps";
        case CEPH_SESSION_STALE: return "stale";
-       case CEPH_SESSION_TRIMCAPS: return "trimcaps";
+       case CEPH_SESSION_RECALL_STATE: return "recall_state";
        default: return "???";
        }
 }
@@ -416,7 +417,7 @@ struct ceph_mds_session_head {
        __le32 op;
        __le64 seq;
        struct ceph_timespec stamp;
-       __le32 max_caps;
+       __le32 max_caps, max_leases;
 } __attribute__ ((packed));
 
 /* client_request */
index ddf894c0672cdcaec4fe98f1c8340e02b3cac5b9..58d9e2fb7e3745cfbdec60435aaf46dbf5c76bc2 100644 (file)
@@ -201,6 +201,7 @@ WRITE_RAW_ENCODER(ceph_fsid_t)
 WRITE_RAW_ENCODER(ceph_file_layout)
 WRITE_RAW_ENCODER(ceph_pg_pool)
 WRITE_RAW_ENCODER(ceph_client_ticket)
+WRITE_RAW_ENCODER(ceph_mds_session_head)
 WRITE_RAW_ENCODER(ceph_mds_request_head)
 WRITE_RAW_ENCODER(ceph_mds_request_release)
 WRITE_RAW_ENCODER(ceph_mds_caps)
index 1534c1b6e07686a74b7074be818c0830aa68fedb..19bfc6ddae86086cd759596a0e40396fe93ce666 100644 (file)
@@ -1771,7 +1771,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
                send_renew_caps(mdsc, session);
                break;
 
-       case CEPH_SESSION_TRIMCAPS:
+       case CEPH_SESSION_RECALL_STATE:
                trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
                break;
 
index 2cddbe23320350be410d4ce8adc77fb92f19fe57..880810f5c36dd97bb1fbab0729339a2c23612847 100644 (file)
@@ -126,18 +126,16 @@ MDCache::~MDCache()
 
 
 
-void MDCache::log_stat(Logger *logger)
+void MDCache::log_stat()
 {
-  if (get_root()) {
-    utime_t now = g_clock.now();
-    //logger->set("pop", (int)get_root()->pop_nested.meta_load(now));
-    //logger->set("popauth", (int)get_root()->pop_auth_subtree_nested.meta_load(now));
-  }
-  logger->set(l_mds_c, lru.lru_get_size());
-  logger->set(l_mds_cpin, lru.lru_get_num_pinned());
-  logger->set(l_mds_ctop, lru.lru_get_top());
-  logger->set(l_mds_cbot, lru.lru_get_bot());
-  logger->set(l_mds_cptail, lru.lru_get_pintail());
+  mds->logger->set(l_mds_imax, g_conf.mds_cache_size);
+  mds->logger->set(l_mds_i, lru.lru_get_size());
+  mds->logger->set(l_mds_ipin, lru.lru_get_num_pinned());
+  mds->logger->set(l_mds_itop, lru.lru_get_top());
+  mds->logger->set(l_mds_ibot, lru.lru_get_bot());
+  mds->logger->set(l_mds_iptail, lru.lru_get_pintail());
+  mds->logger->set(l_mds_icap, num_inodes_with_caps);
+  mds->logger->set(l_mds_cap, num_caps);
 }
 
 
@@ -4518,7 +4516,7 @@ bool MDCache::trim(int max)
 {
   // trim LRU
   if (max < 0) {
-    max = lru.lru_get_max();
+    max = g_conf.mds_cache_size;
     if (!max) return false;
   }
   dout(7) << "trim max=" << max << "  cur=" << lru.lru_get_size() << dendl;
@@ -4626,7 +4624,7 @@ void MDCache::trim_dentry(CDentry *dn, map<int, MCacheExpire*>& expiremap)
   if (dir->get_num_head_items() == 0 && dir->is_subtree_root())
     migrator->export_empty_import(dir);
   
-  if (mds->logger) mds->logger->inc(l_mds_cex);
+  if (mds->logger) mds->logger->inc(l_mds_iex);
 }
 
 
@@ -5106,17 +5104,22 @@ void MDCache::check_memory_usage()
       vmrss = atoi(line.c_str() + 10);
   }
   
-  dout(10) << "check_memory_usage size " << vmsize << ", rss " << vmrss
-          << ", max " << g_conf.mds_mem_max
-          << dendl;
-
 
   // check client caps
   float caps_per_inode = (float)num_caps / (float)inode_map.size();
   //float cap_rate = (float)num_inodes_with_caps / (float)inode_map.size();
 
-  dout(10) << " " << num_inodes_with_caps << " / " << inode_map.size() << " inodes have caps" << dendl;
-  dout(10) << " " << num_caps << " caps, " << caps_per_inode << " caps per inode" << dendl;
+  dout(10) << "check_memory_usage size " << vmsize << ", rss " << vmrss
+          << ", max " << g_conf.mds_mem_max
+          << ", " << num_inodes_with_caps << " / " << inode_map.size() << " inodes have caps"
+          << ", " << num_caps << " caps, " << caps_per_inode << " caps per inode"
+          << dendl;
+
+  if (vmsize > g_conf.mds_mem_max * .9) {
+    float ratio = (float)g_conf.mds_mem_max * .9 / (float)vmsize;
+    if (ratio < 1.0)
+      mds->server->recall_client_state(ratio);
+  }
 
 }
 
@@ -6269,39 +6272,8 @@ void MDCache::request_cleanup(MDRequest *mdr)
   delete mdr;
 
 
-
-
-  // log some stats *****
-  if (mds->logger) {
-    mds->logger->set(l_mds_c, lru.lru_get_size());
-    mds->logger->set(l_mds_cpin, lru.lru_get_num_pinned());
-    mds->logger->set(l_mds_ctop, lru.lru_get_top());
-    mds->logger->set(l_mds_cbot, lru.lru_get_bot());
-    mds->logger->set(l_mds_cptail, lru.lru_get_pintail());
-    //mds->logger->set("buf",buffer_total_alloc);
-  }
-
-  //if (g_conf.log_pins) {
-    // pin
-    /*
-for (int i=0; i<CInode::NUM_PINS; i++) {
-      if (mds->logger2) mds->logger2->set(cinode_pin_names[i],
-                                         cinode_pins[i]);
-    }
-    */
-    /*
-      for (map<int,int>::iterator it = cdir_pins.begin();
-      it != cdir_pins.end();
-      it++) {
-      //string s = "D";
-      //s += cdir_pin_names[it->first];
-      if (mds->logger2) mds->logger2->set(//s, 
-      cdir_pin_names[it->first],
-      it->second);
-      }
-    */
-  //}
-
+  if (mds->logger)
+    log_stat();
 }
 
 
index 69ece7ed24687f6da175964800294be9a9f2982d..0d0c6c625372915ed3eaa3236c139b8218036246 100644 (file)
@@ -758,7 +758,7 @@ public:
   ~MDCache();
   
   // debug
-  void log_stat(Logger *logger);
+  void log_stat();
 
   // root inode
   CInode *get_root() { return root; }
index 6542cf3fd113004a2bd15f805d44f83c965d9353..30fb42b0b083a67ac95350cad7323e17ae6147eb 100644 (file)
@@ -183,12 +183,16 @@ void MDS::reopen_logger(utime_t start)
     mds_logtype.add_inc("dirt5");
     */
 
-    mds_logtype.add_set(l_mds_c, "c");
-    mds_logtype.add_set(l_mds_ctop, "ctop");
-    mds_logtype.add_set(l_mds_cbot, "cbot");
-    mds_logtype.add_set(l_mds_cptail, "cptail");  
-    mds_logtype.add_set(l_mds_cpin, "cpin");
-    mds_logtype.add_inc(l_mds_cex, "cex");
+    mds_logtype.add_set(l_mds_imax, "imax");
+    mds_logtype.add_set(l_mds_i, "i");
+    mds_logtype.add_set(l_mds_itop, "itop");
+    mds_logtype.add_set(l_mds_ibot, "ibot");
+    mds_logtype.add_set(l_mds_iptail, "iptail");  
+    mds_logtype.add_set(l_mds_ipin, "ipin");
+    mds_logtype.add_inc(l_mds_iex, "iex");
+    mds_logtype.add_inc(l_mds_icap, "icap");
+    mds_logtype.add_inc(l_mds_cap, "cap");
+    
     mds_logtype.add_inc(l_mds_dis, "dis");
 
     mds_logtype.add_inc(l_mds_t, "t"); 
@@ -208,7 +212,7 @@ void MDS::reopen_logger(utime_t start)
     
     mds_logtype.add_set(l_mds_sm, "sm");
     mds_logtype.add_inc(l_mds_ex, "ex");
-    mds_logtype.add_inc(l_mds_iex, "iex");
+    mds_logtype.add_inc(l_mds_iexp, "iexp");
     mds_logtype.add_inc(l_mds_im, "im");
     mds_logtype.add_inc(l_mds_iim, "iim");
     /*
@@ -408,7 +412,7 @@ void MDS::tick()
     logger->set(l_mds_buf, buffer_total_alloc.test());
     logger->set(l_mds_sm, mdcache->num_subtrees());
 
-    mdcache->log_stat(logger);
+    mdcache->log_stat();
   }
 
   // ...
index 4e80ed8e68cf00ac02bb04f486ac426ff0b2d8bb..ea1aeb5262c4db76133a125149fa09dd84b4adf1 100644 (file)
@@ -45,12 +45,15 @@ enum {
   l_mds_dir_c,
   l_mds_dir_sp,
   l_mds_dir_ffc,
-  l_mds_c,
-  l_mds_ctop,
-  l_mds_cbot,
-  l_mds_cptail,
-  l_mds_cpin,
-  l_mds_cex,
+  l_mds_imax,
+  l_mds_i,
+  l_mds_itop,
+  l_mds_ibot,
+  l_mds_iptail,
+  l_mds_ipin,
+  l_mds_iex,
+  l_mds_icap,
+  l_mds_cap,
   l_mds_dis,
   l_mds_t,
   l_mds_thit,
@@ -66,7 +69,7 @@ enum {
   l_mds_buf,
   l_mds_sm,
   l_mds_ex,
-  l_mds_iex,
+  l_mds_iexp,
   l_mds_im,
   l_mds_iim,
   l_mds_last,
index c5c95df73f9c7039b99df11e8ea2dc09b0f9987a..ffdbd15dfb50332f52e12e5827ca7a7a47f8f958 100644 (file)
@@ -849,7 +849,7 @@ void Migrator::export_go_synced(CDir *dir)
 
   // stats
   if (mds->logger) mds->logger->inc(l_mds_ex);
-  if (mds->logger) mds->logger->inc(l_mds_iex, num_exported_inodes);
+  if (mds->logger) mds->logger->inc(l_mds_iexp, num_exported_inodes);
 
   cache->show_subtrees();
 }
index 22e1e80e487723b77c56ec131430f91546150aef..20592c28b5b0bd5ae551047452953b42b3139529 100644 (file)
@@ -156,7 +156,7 @@ void Server::handle_client_session(MClientSession *m)
   dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
   assert(m->get_source().is_client()); // should _not_ come from an mds!
 
-  switch (m->op) {
+  switch (m->get_op()) {
   case CEPH_SESSION_REQUEST_OPEN:
     if (session && (session->is_opening() || session->is_open())) {
       dout(10) << "already open|opening, dropping this req" << dendl;
@@ -182,7 +182,7 @@ void Server::handle_client_session(MClientSession *m)
       mds->sessionmap.set_state(session, Session::STATE_OPEN);
       mds->locker->resume_stale_caps(session);
     }
-    mds->messenger->send_message(new MClientSession(CEPH_SESSION_RENEWCAPS, m->stamp), 
+    mds->messenger->send_message(new MClientSession(CEPH_SESSION_RENEWCAPS, m->get_stamp()), 
                                 session->inst);
     break;
     
@@ -192,13 +192,13 @@ void Server::handle_client_session(MClientSession *m)
        dout(10) << "already closing|dne, dropping this req" << dendl;
        return;
       }
-      if (m->seq < session->get_push_seq()) {
-       dout(10) << "old push seq " << m->seq << " < " << session->get_push_seq() 
+      if (m->get_seq() < session->get_push_seq()) {
+       dout(10) << "old push seq " << m->get_seq() << " < " << session->get_push_seq() 
                 << ", dropping" << dendl;
        return;
       }
-      if (m->seq != session->get_push_seq()) {
-       dout(10) << "old push seq " << m->seq << " != " << session->get_push_seq() 
+      if (m->get_seq() != session->get_push_seq()) {
+       dout(10) << "old push seq " << m->get_seq() << " != " << session->get_push_seq() 
                 << ", BUGGY!" << dendl;
        assert(0);
       }
@@ -522,6 +522,43 @@ void Server::reconnect_tick()
 }
 
 
+void Server::recall_client_state(float ratio)
+{
+  int max_caps_per_client = g_conf.mds_cache_size / 2;
+  int min_caps_per_client = 100;
+
+  dout(10) << "recall_client_state " << ratio
+          << ", caps per client " << min_caps_per_client << "-" << max_caps_per_client
+          << dendl;
+
+  set<Session*> sessions;
+  mds->sessionmap.get_client_session_set(sessions);
+  for (set<Session*>::const_iterator p = sessions.begin();
+       p != sessions.end();
+       ++p) {
+    Session *session = *p;
+    if (!session->is_open() ||
+       !session->inst.name.is_client())
+      continue;
+
+    dout(10) << " session " << session->inst
+            << " caps " << session->caps.size()
+            << ", leases " << session->leases.size()
+            << dendl;
+
+    if (session->caps.size() > min_caps_per_client) {  
+      int newlim = session->caps.size() * ratio;
+      if (newlim > max_caps_per_client)
+       newlim = max_caps_per_client;
+      MClientSession *m = new MClientSession(CEPH_SESSION_RECALL_STATE);
+      m->head.max_caps = newlim;
+      mds->send_message_client(m, session->inst);
+    }
+  }
+}
+
+
 /*******
  * some generic stuff for finishing off requests
  */
index de5161730fe563f85093c26bdc355de13dfcd319..db9dcb5d90ba9a743fc474f9e3a321ee7a446c8a 100644 (file)
@@ -79,6 +79,8 @@ public:
   void reconnect_gather_finish();
   void reconnect_tick();
   
+  void recall_client_state(float ratio);
+
   // -- requests --
   void handle_client_request(MClientRequest *m);
 
index e472870e5bab055bee09817f69958b4ee1e7f41a..f91a46f628fefd17c4e2efb27d41630868481b3b 100644 (file)
 
 class MClientSession : public Message {
 public:
+  ceph_mds_session_head head;
 
-  int32_t op;
-  version_t seq;  // used when requesting close, declaring stale
-  utime_t stamp;
-  int32_t max_caps;
+  int get_op() { return head.op; }
+  version_t get_seq() { return head.seq; }
+  utime_t get_stamp() { return utime_t(head.stamp); }
 
   MClientSession() : Message(CEPH_MSG_CLIENT_SESSION) { }
   MClientSession(int o, version_t s=0) : 
-    Message(CEPH_MSG_CLIENT_SESSION),
-    op(o), seq(s), max_caps(0) { }
+    Message(CEPH_MSG_CLIENT_SESSION) {
+    memset(&head, 0, sizeof(head));
+    head.op = o;
+    head.seq = s;
+  }
   MClientSession(int o, utime_t st) : 
-    Message(CEPH_MSG_CLIENT_SESSION),
-    op(o), seq(0), stamp(st), max_caps(0) { }
+    Message(CEPH_MSG_CLIENT_SESSION) {
+    memset(&head, 0, sizeof(head));
+    head.op = o;
+    head.seq = 0;
+    st.encode_timeval(&head.stamp);
+  }
 
   const char *get_type_name() { return "client_session"; }
   void print(ostream& out) {
-    out << "client_session(" << ceph_session_op_name(op);
-    if (seq) out << " seq " << seq;
-    if (op == CEPH_SESSION_TRIMCAPS)
-      out << " max_caps " << max_caps;
+    out << "client_session(" << ceph_session_op_name(get_op());
+    if (get_seq())
+      out << " seq " << get_seq();
+    if (get_op() == CEPH_SESSION_RECALL_STATE)
+      out << " max_caps " << head.max_caps << " max_leases" << head.max_leases;
     out << ")";
   }
 
   void decode_payload() { 
     bufferlist::iterator p = payload.begin();
-    ::decode(op, p);
-    ::decode(seq, p);
-    ::decode(stamp, p);
-    ::decode(max_caps, p);
+    ::decode(head, p);
   }
   void encode_payload() { 
-    ::encode(op, payload);
-    ::encode(seq, payload);
-    ::encode(stamp, payload);
-    ::encode(max_caps, payload);
+    ::encode(head, payload);
   }
 };