MDCache: switch CDir::_commit so that it can limit max write size.
author     Greg Farnum <gregory.farnum@dreamhost.com>
           Sat, 12 Feb 2011 00:58:18 +0000 (16:58 -0800)
committer  Greg Farnum <gregory.farnum@dreamhost.com>
           Sat, 12 Feb 2011 00:58:18 +0000 (16:58 -0800)
This should fix #777.

Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
src/mds/CDir.cc
src/mds/CDir.h
src/mds/CInode.cc
src/mds/CInode.h
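
The idea of the change, in isolation: instead of encoding every dirty dentry into one potentially huge tmap write, the commit path keeps each write under a size budget and falls back to several smaller, non-atomic writes. Below is a rough, self-contained illustration of that batching pattern; it is not Ceph code, and DirtyEntry/Batch/make_batches are hypothetical stand-ins for CDentry, the encoded bufferlist, and _commit_full/_commit_partial.

// Illustrative sketch only: hypothetical types, not Ceph's.
#include <cstdio>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct DirtyEntry {
  std::string encoded;   // bytes this entry would contribute to the write
  bool dirty = true;
};

using Batch = std::vector<std::pair<std::string, std::string>>;

// Walk the items map and cut a new batch whenever the accumulated size
// reaches max_write_size; each batch becomes one non-atomic write.
// Like _commit_partial, a batch may overshoot by the size of the entry
// that pushed it over the limit, since the check happens before appending.
std::vector<Batch> make_batches(const std::map<std::string, DirtyEntry>& items,
                                unsigned max_write_size)
{
  std::vector<Batch> batches;
  Batch cur;
  unsigned cur_size = 0;
  for (const auto& [name, entry] : items) {
    if (!entry.dirty)
      continue;
    if (cur_size >= max_write_size && !cur.empty()) {
      batches.push_back(std::move(cur));
      cur.clear();
      cur_size = 0;
    }
    cur.emplace_back(name, entry.encoded);
    cur_size += name.size() + entry.encoded.size();
  }
  if (!cur.empty())
    batches.push_back(std::move(cur));
  return batches;
}

int main() {
  std::map<std::string, DirtyEntry> items = {
    {"a", {std::string(300, 'x')}},
    {"b", {std::string(300, 'y')}},
    {"c", {std::string(300, 'z')}}};
  auto batches = make_batches(items, 512);   // pretend 512-byte budget
  std::printf("%zu batches\n", batches.size());
  return 0;
}
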

diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc
index 8c720b54fc2c06b3dd091361faa7a0947c195b77..2c991885aaa70d640a828fad67bff60294d608b5 100644
--- a/src/mds/CDir.cc
+++ b/src/mds/CDir.cc
@@ -1545,16 +1545,28 @@ public:
   }
 };
 
-
-void CDir::_commit_full(ObjectOperation& m, const set<snapid_t> *snaps)
+/**
+ * Try and write out the full directory to disk.
+ *
+ * If the bufferlist we're using exceeds max_write_size, bail out
+ * and switch to _commit_partial -- it can safely break itself into
+ * multiple non-atomic writes.
+ */
+CDir::map_t::iterator CDir::_commit_full(ObjectOperation& m, const set<snapid_t> *snaps,
+                               unsigned max_write_size)
 {
   dout(10) << "_commit_full" << dendl;
 
   // encode
   bufferlist bl;
   __u32 n = 0;
+
+  bufferlist header;
+  ::encode(fnode, header);
+  max_write_size -= header.length();
+
   map_t::iterator p = items.begin();
-  while (p != items.end()) {
+  while (p != items.end() && bl.length() < max_write_size) {
     CDentry *dn = p->second;
     p++;
     
@@ -1570,9 +1582,12 @@ void CDir::_commit_full(ObjectOperation& m, const set<snapid_t> *snaps)
     _encode_dentry(dn, bl, snaps);
   }
 
+  if (p != items.end()) {
+    assert(bl.length() > max_write_size);
+    return _commit_partial(m, snaps, max_write_size);
+  }
+
   // encode final trivialmap
-  bufferlist header;
-  ::encode(fnode, header);
   bufferlist finalbl;
   ::encode(header, finalbl);
   assert(num_head_items + num_head_null + num_snap_items + num_snap_null == items.size());
@@ -1582,9 +1597,22 @@ void CDir::_commit_full(ObjectOperation& m, const set<snapid_t> *snaps)
 
   // write out the full blob
   m.tmap_put(finalbl);
+  return p;
 }
 
-void CDir::_commit_partial(ObjectOperation& m, const set<snapid_t> *snaps)
+/**
+ * Flush out the modified dentries in this dir. Keep the bufferlist
+ * below max_write_size; if we exceed that size then return the last
+ * dentry that got committed into the bufferlist. (Note that the
+ * bufferlist might be larger than requested by the size of that
+ * last dentry as encoded.)
+ *
+ * If we're passed a last_committed_dn, skip to the next dentry after that.
+ */
+CDir::map_t::iterator CDir::_commit_partial(ObjectOperation& m,
+                                  const set<snapid_t> *snaps,
+                                  unsigned max_write_size,
+                                  map_t::iterator last_committed_dn)
 {
   dout(10) << "_commit_partial" << dendl;
   bufferlist finalbl;
@@ -1597,9 +1625,12 @@ void CDir::_commit_partial(ObjectOperation& m, const set<snapid_t> *snaps)
 
   // updated dentries
   map_t::iterator p = items.begin();
-  while (p != items.end()) {
+  if(last_committed_dn != map_t::iterator())
+    p = last_committed_dn;
+
+  while (p != items.end() && finalbl.length() < max_write_size) {
     CDentry *dn = p->second;
-    p++;
+    ++p;
     
     if (snaps && dn->last != CEPH_NOSNAP &&
        try_trim_snap_dentry(dn, *snaps))
@@ -1621,6 +1652,7 @@ void CDir::_commit_partial(ObjectOperation& m, const set<snapid_t> *snaps)
 
   // update the trivialmap at the osd
   m.tmap_update(finalbl);
+  return p;
 }
 
 void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
@@ -1744,19 +1776,22 @@ void CDir::_commit(version_t want)
   }
 
   ObjectOperation m;
-  if (is_complete() &&
-      (num_dirty > (num_head_items*g_conf.mds_dir_commit_ratio))) {
-    fnode.snap_purged_thru = realm->get_last_destroyed();
-    _commit_full(m, snaps);
-  } else {
-    _commit_partial(m, snaps);
-  }
+  map_t::iterator committed_dn;
+  unsigned max_write_size = -1;
 
   // update parent pointer while we're here.
   //  NOTE: the pointer is ONLY required to be valid for the first frag.  we put the xattr
   //        on other frags too because it can't hurt, but it won't necessarily be up to date
   //        in that case!!
-  inode->encode_parent_mutation(m);
+  max_write_size -= inode->encode_parent_mutation(m);
+
+  if (is_complete() &&
+      (num_dirty > (num_head_items*g_conf.mds_dir_commit_ratio))) {
+    fnode.snap_purged_thru = realm->get_last_destroyed();
+    committed_dn = _commit_full(m, snaps, max_write_size);
+  } else {
+    committed_dn = _commit_partial(m, snaps, max_write_size);
+  }
 
   SnapContext snapc;
   object_t oid = get_ondisk_object();
@@ -1764,8 +1799,22 @@ void CDir::_commit(version_t want)
 
   m.priority = CEPH_MSG_PRIO_LOW;  // set priority lower than journal!
 
-  cache->mds->objecter->mutate(oid, oloc, m, snapc, g_clock.now(), 0,
-                              NULL, new C_Dir_Committed(this, get_version(), inode->inode.last_renamed_version) );
+  if (committed_dn == items.end())
+    cache->mds->objecter->mutate(oid, oloc, m, snapc, g_clock.now(), 0, NULL,
+                                 new C_Dir_Committed(this, get_version(),
+                                       inode->inode.last_renamed_version));
+  else { // send in a different Context
+    C_Gather *gather = new C_Gather(new C_Dir_Committed(this, get_version(),
+                                         inode->inode.last_renamed_version));
+    cache->mds->objecter->mutate(oid, oloc, m, snapc, g_clock.now(), 0, NULL,
+                                gather->new_sub());
+    while (committed_dn != items.end()) {
+      m = ObjectOperation();
+      committed_dn = _commit_partial(m, snaps, max_write_size, committed_dn);
+      cache->mds->objecter->mutate(oid, oloc, m, snapc, g_clock.now(), 0, NULL,
+                                  gather->new_sub());
+    }
+  }
 }
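
When the directory does not fit in one write, _commit() above stops handing C_Dir_Committed directly to the objecter and instead wraps it in a C_Gather: every capped write gets a sub-context from gather->new_sub(), and the "committed" callback fires only after all of them complete. Here is a toy, self-contained model of that gather semantics (the real C_Gather in Ceph does more, e.g. thread safety, cleanup, and error handling, which this sketch ignores):

// Toy model of the gather pattern; not Ceph's Context/C_Gather code.
#include <cstdio>
#include <functional>
#include <utility>

class Gather {
  std::function<void(int)> onfinish;
  int pending = 0;
  int worst_rc = 0;
public:
  explicit Gather(std::function<void(int)> fin) : onfinish(std::move(fin)) {}

  // Analogous to gather->new_sub(): every capped write registers one
  // sub-completion before it is submitted.
  std::function<void(int)> new_sub() {
    ++pending;
    return [this](int rc) {
      if (rc < 0)
        worst_rc = rc;
      if (--pending == 0)
        onfinish(worst_rc);   // fires once, after the last write completes
    };
  }
};

int main() {
  Gather gather([](int rc) { std::printf("directory committed, rc=%d\n", rc); });
  // In this toy model, register all subs before delivering any completions.
  auto sub1 = gather.new_sub();   // first mutate (full or first partial pass)
  auto sub2 = gather.new_sub();   // follow-up _commit_partial pass
  sub1(0);                        // completions may arrive in any order
  sub2(0);                        // only the last one triggers onfinish
  return 0;
}
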
 
 
diff --git a/src/mds/CDir.h b/src/mds/CDir.h
index 33e8c0f28b861689e8bb4e4fe5bd6d725d761015..100c555c60adff69cd45518f138d0ede5e910f30 100644
--- a/src/mds/CDir.h
+++ b/src/mds/CDir.h
@@ -473,8 +473,11 @@ private:
   void commit_to(version_t want);
   void commit(version_t want, Context *c, bool ignore_authpinnability=false);
   void _commit(version_t want);
-  void _commit_full(ObjectOperation& m, const set<snapid_t> *snaps);
-  void _commit_partial(ObjectOperation& m, const set<snapid_t> *snaps);
+  map_t::iterator _commit_full(ObjectOperation& m, const set<snapid_t> *snaps,
+                           unsigned max_write_size=-1);
+  map_t::iterator _commit_partial(ObjectOperation& m, const set<snapid_t> *snaps,
+                       unsigned max_write_size=-1,
+                       map_t::iterator last_committed_dn=map_t::iterator());
   void _encode_dentry(CDentry *dn, bufferlist& bl, const set<snapid_t> *snaps);
   void _committed(version_t v, version_t last_renamed_version);
   void wait_for_commit(Context *c, version_t v=0);
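
The new max_write_size parameters default to -1. For an unsigned parameter that is well-defined wraparound to UINT_MAX, so callers that pass nothing effectively get an uncapped write even after the header and parent-xattr sizes are subtracted. A minimal check of that assumption:

#include <cassert>
#include <climits>

int main() {
  unsigned max_write_size = -1;        // well-defined: converts to UINT_MAX
  assert(max_write_size == UINT_MAX);  // so the default is effectively "no cap"
  max_write_size -= 100;               // subtracting header/xattr sizes still
  assert(max_write_size > (1u << 30)); // leaves an enormous budget
  return 0;
}
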
diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc
index bf3ee85b2bdd5da70d2b302a910c1b6028750ed0..4ecdddd85bcf12f6a33645ebc8d0caa7da9fef56 100644
--- a/src/mds/CInode.cc
+++ b/src/mds/CInode.cc
@@ -1018,7 +1018,7 @@ void CInode::build_backtrace(inode_backtrace_t& bt)
   }
 }
 
-void CInode::encode_parent_mutation(ObjectOperation& m)
+unsigned CInode::encode_parent_mutation(ObjectOperation& m)
 {
   string path;
   make_path_string(path);
@@ -1030,6 +1030,7 @@ void CInode::encode_parent_mutation(ObjectOperation& m)
   bufferlist parent;
   ::encode(bt, parent);
   m.setxattr("parent", parent);
+  return path.length() + parent.length();
 }
 
 struct C_Inode_StoredParent : public Context {
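
encode_parent_mutation() now returns a rough byte count (path length plus encoded backtrace length) so _commit() can deduct the parent xattr from its write budget before encoding dentries; the count ignores xattr-name and encoding overhead. A tiny sketch of that contract, with a hypothetical XattrOp standing in for the real ObjectOperation call:

#include <string>

// Hypothetical stand-in for ObjectOperation::setxattr("parent", ...).
struct XattrOp {
  std::string name;
  std::string value;
};

// Mirror of the new contract: perform the mutation *and* report an
// approximate size so the caller can shrink its remaining write budget.
unsigned encode_parent(XattrOp& op, const std::string& path,
                       const std::string& encoded_backtrace)
{
  op.name = "parent";
  op.value = encoded_backtrace;
  return path.length() + encoded_backtrace.length();   // rough, as in the diff
}

int main()
{
  XattrOp op;
  unsigned budget = -1;                              // "unlimited" by default
  budget -= encode_parent(op, "/a/b/c", "<encoded backtrace bytes>");
  // ...the remaining budget would then go to _commit_full/_commit_partial.
  (void)budget;
  return 0;
}
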
diff --git a/src/mds/CInode.h b/src/mds/CInode.h
index 099ecac8d0cd8136d0de544ef3aac21d07ca445c..2cf264c627c51ec4048cd134c5c2234631462639 100644
--- a/src/mds/CInode.h
+++ b/src/mds/CInode.h
@@ -559,7 +559,7 @@ private:
   void _stored_parent(version_t v, Context *fin);
 
   void build_backtrace(inode_backtrace_t& bt);
-  void encode_parent_mutation(ObjectOperation& m);
+  unsigned encode_parent_mutation(ObjectOperation& m);
 
   void encode_store(bufferlist& bl) {
     __u8 struct_v = 2;