]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: dynamically adjust priority of committing dirfrags 1683/head
authorYan, Zheng <zheng.z.yan@intel.com>
Wed, 16 Apr 2014 02:53:01 +0000 (10:53 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Wed, 16 Apr 2014 07:11:38 +0000 (15:11 +0800)
Adjust priority of committing dirfrags according to number of
expiring log segments. The more expiring log segments, the higher
priority. Because it mean MDS does not trim log segments quickly
enough.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/mds/CDir.cc
src/mds/CDir.h
src/mds/LogSegment.h
src/mds/MDLog.cc
src/mds/MDLog.h
src/mds/journal.cc

index b89ac08e7461ef312fca23216eed6b0daf4284e2..a423ef75dbf1d767dc0205e88f1691524dcfb7ad 100644 (file)
@@ -1775,7 +1775,7 @@ void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
  * @param want - min version i want committed
  * @param c - callback for completion
  */
-void CDir::commit(version_t want, Context *c, bool ignore_authpinnability)
+void CDir::commit(version_t want, Context *c, bool ignore_authpinnability, int op_prio)
 {
   dout(10) << "commit want " << want << " on " << *this << dendl;
   if (want == 0) want = get_version();
@@ -1797,7 +1797,7 @@ void CDir::commit(version_t want, Context *c, bool ignore_authpinnability)
   waiting_for_commit[want].push_back(c);
   
   // ok.
-  _commit(want);
+  _commit(want, op_prio);
 }
 
 class C_Dir_Committed : public Context {
@@ -1815,13 +1815,16 @@ public:
  * Flush out the modified dentries in this dir. Keep the bufferlist
  * below max_write_size;
  */
-void CDir::_omap_commit()
+void CDir::_omap_commit(int op_prio)
 {
   dout(10) << "_omap_commit" << dendl;
 
   unsigned max_write_size = cache->max_dir_commit_size;
   unsigned write_size = 0;
 
+  if (op_prio < 0)
+    op_prio = CEPH_MSG_PRIO_DEFAULT;
+
   // snap purge?
   const set<snapid_t> *snaps = NULL;
   SnapRealm *realm = inode->find_snaprealm();
@@ -1877,7 +1880,7 @@ void CDir::_omap_commit()
 
     if (write_size >= max_write_size) {
       ObjectOperation op;
-      op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
+      op.priority = op_prio;
       op.tmap_to_omap(true); // convert tmap to omap
 
       if (!to_set.empty())
@@ -1895,7 +1898,7 @@ void CDir::_omap_commit()
   }
 
   ObjectOperation op;
-  op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
+  op.priority = op_prio;
   op.tmap_to_omap(true); // convert tmap to omap
 
   /*
@@ -1968,7 +1971,7 @@ void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
   }
 }
 
-void CDir::_commit(version_t want)
+void CDir::_commit(version_t want, int op_prio)
 {
   dout(10) << "_commit want " << want << " on " << *this << dendl;
 
@@ -2008,7 +2011,7 @@ void CDir::_commit(version_t want)
   
   if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_c);
 
-   _omap_commit();
+   _omap_commit(op_prio);
 }
 
 
@@ -2090,7 +2093,7 @@ void CDir::_committed(version_t v)
     ++n;
     if (p->first > committed_version) {
       dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
-      _commit(p->first);
+      _commit(p->first, -1);
       break;
     }
     cache->mds->queue_waiters(p->second);
index 5ef2d65372c5a8c33d0de9cc176d549e241968fc..f5762e210c1d2c13ebcea04d46264480f152b255 100644 (file)
@@ -498,14 +498,15 @@ protected:
 
   // -- commit --
   map<version_t, list<Context*> > waiting_for_commit;
-  void _commit(version_t want);
-  void _omap_commit();
+  void _commit(version_t want, int op_prio);
+  void _omap_commit(int op_prio);
   void _encode_dentry(CDentry *dn, bufferlist& bl, const set<snapid_t> *snaps);
   void _committed(version_t v);
 public:
   void wait_for_commit(Context *c, version_t v=0);
   void commit_to(version_t want);
-  void commit(version_t want, Context *c, bool ignore_authpinnability=false);
+  void commit(version_t want, Context *c,
+             bool ignore_authpinnability=false, int op_prio=-1);
 
   // -- dirtyness --
   version_t get_committing_version() { return committing_version; }
index 2fffe3bbaf9337aee15be65067ee9020c8ab811b..a2b8ace387c80065d58351a5b87b40224b52fb1d 100644 (file)
@@ -67,7 +67,7 @@ class LogSegment {
   map<int,version_t> tablev;
 
   // try to expire
-  void try_to_expire(MDS *mds, C_GatherBuilder &gather_bld);
+  void try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio);
 
   // cons
   LogSegment(loff_t off) :
index 991eaf5d2b90944a0ccd946da0c37c9261ed786b..c224695671bf2e5d1da0dabbe815864f7ed5f78a 100644 (file)
@@ -333,9 +333,14 @@ void MDLog::trim(int m)
     if (stop < ceph_clock_now(g_ceph_context))
       break;
 
-    if ((int)expiring_segments.size() >= g_conf->mds_log_max_expiring)
+    int num_expiring_segments = (int)expiring_segments.size();
+    if (num_expiring_segments >= g_conf->mds_log_max_expiring)
       break;
 
+    int op_prio = CEPH_MSG_PRIO_LOW +
+                 (CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
+                 num_expiring_segments / g_conf->mds_log_max_expiring;
+
     // look at first segment
     LogSegment *ls = p->second;
     assert(ls);
@@ -351,7 +356,7 @@ void MDLog::trim(int m)
     } else if (expired_segments.count(ls)) {
       dout(5) << "trim already expired segment " << ls->offset << ", " << ls->num_events << " events" << dendl;
     } else {
-      try_expire(ls);
+      try_expire(ls, op_prio);
     }
   }
 
@@ -360,16 +365,16 @@ void MDLog::trim(int m)
 }
 
 
-void MDLog::try_expire(LogSegment *ls)
+void MDLog::try_expire(LogSegment *ls, int op_prio)
 {
   C_GatherBuilder gather_bld(g_ceph_context);
-  ls->try_to_expire(mds, gather_bld);
+  ls->try_to_expire(mds, gather_bld, op_prio);
   if (gather_bld.has_subs()) {
     assert(expiring_segments.count(ls) == 0);
     expiring_segments.insert(ls);
     expiring_events += ls->num_events;
     dout(5) << "try_expire expiring segment " << ls->offset << dendl;
-    gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls));
+    gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
     gather_bld.activate();
   } else {
     dout(10) << "try_expire expired segment " << ls->offset << dendl;
@@ -380,13 +385,13 @@ void MDLog::try_expire(LogSegment *ls)
   logger->set(l_mdl_evexg, expiring_events);
 }
 
-void MDLog::_maybe_expired(LogSegment *ls
+void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
 {
   dout(10) << "_maybe_expired segment " << ls->offset << " " << ls->num_events << " events" << dendl;
   assert(expiring_segments.count(ls));
   expiring_segments.erase(ls);
   expiring_events -= ls->num_events;
-  try_expire(ls);
+  try_expire(ls, op_prio);
 }
 
 void MDLog::_trim_expired_segments()
index 82d51c33d150084fbd94fc67e1bd861ae6a7a3c3..6e8e980c94e94a62b991f8b1f8dd0ca03f6ad138 100644 (file)
@@ -219,15 +219,16 @@ private:
   class C_MaybeExpiredSegment : public Context {
     MDLog *mdlog;
     LogSegment *ls;
+    int op_prio;
   public:
-    C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s) : mdlog(mdl), ls(s) {}
+    C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) : mdlog(mdl), ls(s), op_prio(p) {}
     void finish(int res) {
-      mdlog->_maybe_expired(ls);
+      mdlog->_maybe_expired(ls, op_prio);
     }
   };
 
-  void try_expire(LogSegment *ls);
-  void _maybe_expired(LogSegment *ls);
+  void try_expire(LogSegment *ls, int op_prio);
+  void _maybe_expired(LogSegment *ls, int op_prio);
   void _expired(LogSegment *ls);
   void _trim_expired_segments();
 
index 2bf24c3eb3deb311c9cd53155fcfe2d4240d05db..2b8bef0bd8e2eb317a3afc6ea7d0a34edf1f843f 100644 (file)
@@ -62,7 +62,7 @@
 // -----------------------
 // LogSegment
 
-void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld)
+void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio)
 {
   set<CDir*> commit;
 
@@ -103,7 +103,7 @@ void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld)
       assert(dir->is_auth());
       if (dir->can_auth_pin()) {
        dout(15) << "try_to_expire committing " << *dir << dendl;
-       dir->commit(0, gather_bld.new_sub());
+       dir->commit(0, gather_bld.new_sub(), false, op_prio);
       } else {
        dout(15) << "try_to_expire waiting for unfreeze on " << *dir << dendl;
        dir->add_waiter(CDir::WAIT_UNFREEZE, gather_bld.new_sub());