]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: use OMAP to store dirfrags
authorYan, Zheng <zheng.z.yan@intel.com>
Tue, 24 Dec 2013 00:56:55 +0000 (08:56 +0800)
committerYan, Zheng <zheng.z.yan@intel.com>
Fri, 10 Jan 2014 23:40:37 +0000 (07:40 +0800)
MDS can fetch dirfrags from both TMAP and OMAP. When committing a
dirfrags that is stored in TMAP, MDS first uses OSD_OP_TMAP2OMAP
to convert corresponding TMAP to OMAP, then updates the resulting
OMAP.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
src/common/config_opts.h
src/mds/CDir.cc
src/mds/CDir.h
src/mds/MDCache.cc
src/mds/MDSMap.cc
src/mds/MDSMap.h
src/mds/mdstypes.h

index 0a452fec46bbbf59704df1fd574052db5ae8bb4c..47e3783de2fc03055103860cdb9c367d5501b590 100644 (file)
@@ -287,8 +287,7 @@ OPTION(mds_max_file_size, OPT_U64, 1ULL << 40)
 OPTION(mds_cache_size, OPT_INT, 100000)
 OPTION(mds_cache_mid, OPT_FLOAT, .7)
 OPTION(mds_mem_max, OPT_INT, 1048576)        // KB
-OPTION(mds_dir_commit_ratio, OPT_FLOAT, .5)
-OPTION(mds_dir_max_commit_size, OPT_INT, 90) // MB
+OPTION(mds_dir_max_commit_size, OPT_INT, 10) // MB
 OPTION(mds_decay_halflife, OPT_FLOAT, 5)
 OPTION(mds_beacon_interval, OPT_FLOAT, 4)
 OPTION(mds_beacon_grace, OPT_FLOAT, 15)
@@ -304,7 +303,6 @@ OPTION(mds_dirstat_min_interval, OPT_FLOAT, 1)    // try to avoid propagating mo
 OPTION(mds_scatter_nudge_interval, OPT_FLOAT, 5)  // how quickly dirstat changes propagate up the hierarchy
 OPTION(mds_client_prealloc_inos, OPT_INT, 1000)
 OPTION(mds_early_reply, OPT_BOOL, true)
-OPTION(mds_use_tmap, OPT_BOOL, true)        // use trivialmap for dir updates
 OPTION(mds_default_dir_hash, OPT_INT, CEPH_STR_HASH_RJENKINS)
 OPTION(mds_log, OPT_BOOL, true)
 OPTION(mds_log_skip_corrupt_events, OPT_BOOL, false)
index 1ac26fd1c8d8f13f91111c5a8476c8cf7b6fe7a2..3697929e86282130101904d7ddb14b2329a536d5 100644 (file)
@@ -1339,20 +1339,6 @@ void CDir::last_put()
 
 // -----------------------
 // FETCH
-
-class C_Dir_Fetch : public Context {
- protected:
-  CDir *dir;
-  string want_dn;
- public:
-  bufferlist bl;
-
-  C_Dir_Fetch(CDir *d, const string& w) : dir(d), want_dn(w) { }
-  void finish(int result) {
-    dir->_fetched(bl, want_dn);
-  }
-};
-
 void CDir::fetch(Context *c, bool ignore_authpinnability)
 {
   string want;
@@ -1388,8 +1374,26 @@ void CDir::fetch(Context *c, const string& want_dn, bool ignore_authpinnability)
 
   if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_f);
 
+  _omap_fetch(want_dn);
+}
+
+class C_Dir_TMAP_Fetched : public Context {
+ protected:
+  CDir *dir;
+  string want_dn;
+ public:
+  bufferlist bl;
+
+  C_Dir_TMAP_Fetched(CDir *d, const string& w) : dir(d), want_dn(w) { }
+  void finish(int r) {
+    dir->_tmap_fetched(bl, want_dn, r);
+  }
+};
+
+void CDir::_tmap_fetch(const string& want_dn)
+{
   // start by reading the first hunk of it
-  C_Dir_Fetch *fin = new C_Dir_Fetch(this, want_dn);
+  C_Dir_TMAP_Fetched *fin = new C_Dir_TMAP_Fetched(this, want_dn);
   object_t oid = get_ondisk_object();
   object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
   ObjectOperation rd;
@@ -1397,22 +1401,85 @@ void CDir::fetch(Context *c, const string& want_dn, bool ignore_authpinnability)
   cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0, fin);
 }
 
-void CDir::_fetched(bufferlist &bl, const string& want_dn)
+void CDir::_tmap_fetched(bufferlist& bl, const string& want_dn, int r)
 {
   LogClient &clog = cache->mds->clog;
-  dout(10) << "_fetched " << bl.length() 
-          << " bytes for " << *this
-          << " want_dn=" << want_dn
-          << dendl;
-  
+  dout(10) << "_tmap_fetched " << bl.length()  << " bytes for " << *this
+          << " want_dn=" << want_dn << dendl;
+
+  assert(r == 0 || r == -ENOENT);
   assert(is_auth());
   assert(!is_frozen());
 
-  // empty?!?
+  bufferlist header;
+  map<string, bufferlist> omap;
+
   if (bl.length() == 0) {
+    r = -ENODATA;
+  } else {
+    bufferlist::iterator p = bl.begin();
+    ::decode(header, p);
+    ::decode(omap, p);
+
+    if (!p.end()) {
+      clog.warn() << "tmap buffer of dir " << dirfrag() << " has "
+                 << bl.length() - p.get_off() << " extra bytes\n";
+    }
+    bl.clear();
+  }
+
+  _omap_fetched(header, omap, want_dn, r);
+}
+
+class C_Dir_OMAP_Fetched : public Context {
+ protected:
+  CDir *dir;
+  string want_dn;
+ public:
+  bufferlist hdrbl;
+  map<string, bufferlist> omap;
+  int ret1, ret2;
+
+  C_Dir_OMAP_Fetched(CDir *d, const string& w) : dir(d), want_dn(w) { }
+  void finish(int r) {
+    if (r >= 0) r = ret1;
+    if (r >= 0) r = ret2;
+    dir->_omap_fetched(hdrbl, omap, want_dn, r);
+  }
+};
+
+void CDir::_omap_fetch(const string& want_dn)
+{
+  C_Dir_OMAP_Fetched *fin = new C_Dir_OMAP_Fetched(this, want_dn);
+  object_t oid = get_ondisk_object();
+  object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
+  ObjectOperation rd;
+  rd.omap_get_header(&fin->hdrbl, &fin->ret1);
+  rd.omap_get_vals("", "", (uint64_t)-1, &fin->omap, &fin->ret2);
+  cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0, fin);
+}
+
+void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
+                        const string& want_dn, int r)
+{
+  LogClient &clog = cache->mds->clog;
+  dout(10) << "_fetched header " << hdrbl.length() << " bytes "
+          << omap.size() << " keys for " << *this
+          << " want_dn=" << want_dn << dendl;
+
+  assert(r == 0 || r == -ENOENT || r == -ENODATA);
+  assert(is_auth());
+  assert(!is_frozen());
+
+  if (hdrbl.length() == 0) {
+    if (r != -ENODATA) { // called by _tmap_fetched() ?
+      dout(10) << "_fetched 0 byte from omap, retry tmap" << dendl;
+      _tmap_fetch(want_dn);
+      return;
+    }
+
     dout(0) << "_fetched missing object for " << *this << dendl;
-    clog.error() << "dir " << ino() << "." << dirfrag()
-         << " object missing on disk; some files may be lost\n";
+    clog.error() << "dir " << dirfrag() << " object missing on disk; some files may be lost\n";
 
     log_mark_dirty();
 
@@ -1426,24 +1493,18 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn)
     return;
   }
 
-  // decode trivialmap.
-  int len = bl.length();
-  bufferlist::iterator p = bl.begin();
-  
-  bufferlist header;
-  ::decode(header, p);
-  bufferlist::iterator hp = header.begin();
   fnode_t got_fnode;
-  ::decode(got_fnode, hp);
-
-  __u32 n;
-  ::decode(n, p);
+  {
+    bufferlist::iterator p = hdrbl.begin();
+    ::decode(got_fnode, p);
+    if (!p.end()) {
+      clog.warn() << "header buffer of dir " << dirfrag() << " has "
+                 << hdrbl.length() - p.get_off() << " extra bytes\n";
+    }
+  }
 
-  dout(10) << "_fetched version " << got_fnode.version
-          << ", " << len << " bytes, " << n << " keys"
-          << dendl;
+  dout(10) << "_fetched version " << got_fnode.version << dendl;
   
-
   // take the loaded fnode?
   // only if we are a fresh CDir* with no prior state.
   if (get_version() == 0) {
@@ -1460,8 +1521,9 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn)
   }
 
   // purge stale snaps?
-  //  * only if we have past_parents open!
-  const set<snapid_t> *snaps = 0;
+  // only if we have past_parents open!
+  bool purged_any = false;
+  const set<snapid_t> *snaps = NULL;
   SnapRealm *realm = inode->find_snaprealm();
   if (!realm->have_past_parents_open()) {
     dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
@@ -1472,30 +1534,26 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn)
             << ", snap purge based on " << *snaps << dendl;
     fnode.snap_purged_thru = realm->get_last_destroyed();
   }
-  bool purged_any = false;
 
   bool stray = inode->is_stray();
 
-  //int num_new_inodes_loaded = 0;
-  loff_t baseoff = p.get_off();
-  for (unsigned i=0; i<n; i++) {
-    loff_t dn_offset = p.get_off() - baseoff;
-
+  unsigned pos = 0;
+  for (map<string, bufferlist>::iterator p = omap.begin();
+       p != omap.end();
+       ++p, ++pos) {
     // dname
     string dname;
     snapid_t first, last;
-    dentry_key_t::decode_helper(p, dname, last);
+    dentry_key_t::decode_helper(p->first, dname, last);
     
-    bufferlist dndata;
-    ::decode(dndata, p);
-    bufferlist::iterator q = dndata.begin();
+    bufferlist::iterator q = p->second.begin();
     ::decode(first, q);
 
     // marker
     char type;
     ::decode(type, q);
 
-    dout(24) << "_fetched pos " << dn_offset << " marker '" << type << "' dname '" << dname
+    dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
             << " [" << first << "," << last << "]"
             << dendl;
 
@@ -1515,7 +1573,7 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn)
      * doesn't exist) but for which no explicit negative dentry is in
      * the cache.
      */
-    CDentry *dn = 0;
+    CDentry *dn = NULL;
     if (!stale)
       dn = lookup(dname, last);
 
@@ -1644,8 +1702,7 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn)
        }
       }
     } else {
-      dout(1) << "corrupt directory, i got tag char '" << type << "' val " << (int)(type)
-             << " at offset " << p.get_off() << dendl;
+      dout(1) << "corrupt directory, i got tag char '" << type << "' pos " << pos << dendl;
       assert(0);
     }
     
@@ -1680,10 +1737,6 @@ void CDir::_fetched(bufferlist &bl, const string& want_dn)
       }
     }
   }
-  if (!p.end()) {
-    clog.warn() << "dir " << dirfrag() << " has "
-       << bl.length() - p.get_off() << " extra bytes\n";
-  }
 
   //cache->mds->logger->inc("newin", num_new_inodes_loaded);
   //hack_num_accessed = 0;
@@ -1736,18 +1789,6 @@ void CDir::commit(version_t want, Context *c, bool ignore_authpinnability)
   _commit(want);
 }
 
-
-class C_Dir_RetryCommit : public Context {
-  CDir *dir;
-  version_t want;
-public:
-  C_Dir_RetryCommit(CDir *d, version_t v) : 
-    dir(d), want(v) { }
-  void finish(int r) {
-    dir->_commit(want);
-  }
-};
-
 class C_Dir_Committed : public Context {
   CDir *dir;
   version_t version;
@@ -1760,102 +1801,52 @@ public:
 };
 
 /**
- * Try and write out the full directory to disk.
- *
- * If the bufferlist we're using exceeds max_write_size, bail out
- * and switch to _commit_partial -- it can safely break itself into
- * multiple non-atomic writes.
+ * Flush out the modified dentries in this dir. Keep the bufferlist
+ * below max_write_size;
  */
-CDir::map_t::iterator CDir::_commit_full(ObjectOperation& m, const set<snapid_t> *snaps,
-                               unsigned max_write_size)
+void CDir::_omap_commit()
 {
-  dout(10) << "_commit_full" << dendl;
-
-  // encode
-  bufferlist bl;
-  __u32 n = 0;
-
-  bufferlist header;
-  ::encode(fnode, header);
-  max_write_size -= header.length();
-
-  map_t::iterator p = items.begin();
-  while (p != items.end() && bl.length() < max_write_size) {
-    CDentry *dn = p->second;
-    ++p;
-    
-    if (dn->linkage.is_null()) 
-      continue;  // skip negative entries
+  dout(10) << "_omap_commit" << dendl;
 
-    if (snaps && dn->last != CEPH_NOSNAP &&
-       try_trim_snap_dentry(dn, *snaps))
-      continue;
-    
-    n++;
-
-    _encode_dentry(dn, bl, snaps);
-  }
+  unsigned max_write_size = cache->max_dir_commit_size;
+  unsigned write_size = 0;
 
-  if (p != items.end()) {
-    assert(bl.length() > max_write_size);
-    return _commit_partial(m, snaps, max_write_size);
+  // snap purge?
+  const set<snapid_t> *snaps = NULL;
+  SnapRealm *realm = inode->find_snaprealm();
+  if (!realm->have_past_parents_open()) {
+    dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
+  } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
+    snaps = &realm->get_snaps();
+    dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
+            << " < " << realm->get_last_destroyed()
+            << ", snap purge based on " << *snaps << dendl;
   }
 
-  // encode final trivialmap
-  bufferlist finalbl;
-  ::encode(header, finalbl);
-  assert(num_head_items + num_head_null + num_snap_items + num_snap_null == items.size());
-  assert(n == (num_head_items + num_snap_items));
-  ::encode(n, finalbl);
-  finalbl.claim_append(bl);
+  set<string> to_remove;
+  map<string, bufferlist> to_set;
 
-  // write out the full blob
-  m.tmap_put(finalbl);
-  return p;
-}
+  C_GatherBuilder gather(g_ceph_context, new C_Dir_Committed(this, get_version()));
 
-/**
- * Flush out the modified dentries in this dir. Keep the bufferlist
- * below max_write_size; if we exceed that size then return the last
- * dentry that got committed into the bufferlist. (Note that the
- * bufferlist might be larger than requested by the size of that
- * last dentry as encoded.)
- *
- * If we're passed a last_committed_dn, skip to the next dentry after that.
- * Also, don't encode the header again -- we don't want to update it
- * on-disk until all the updates have made it through, so keep the header
- * in only the first changeset -- our caller is responsible for making sure
- * that changeset doesn't go through until after all the others do, if it's
- * necessary.
- */
-CDir::map_t::iterator CDir::_commit_partial(ObjectOperation& m,
-                                  const set<snapid_t> *snaps,
-                                  unsigned max_write_size,
-                                  map_t::iterator last_committed_dn)
-{
-  dout(10) << "_commit_partial" << dendl;
-  bufferlist finalbl;
-
-  // header
-  if (last_committed_dn == map_t::iterator()) {
-    bufferlist header;
-    ::encode(fnode, header);
-    finalbl.append(CEPH_OSD_TMAP_HDR);
-    ::encode(header, finalbl);
-  }
-
-  // updated dentries
-  map_t::iterator p = items.begin();
-  if(last_committed_dn != map_t::iterator())
-    p = last_committed_dn;
+  SnapContext snapc;
+  object_t oid = get_ondisk_object();
+  object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
 
-  while (p != items.end() && finalbl.length() < max_write_size) {
+  for (map_t::iterator p = items.begin();
+      p != items.end(); ) {
     CDentry *dn = p->second;
     ++p;
-    
-    if (snaps && dn->last != CEPH_NOSNAP &&
-       try_trim_snap_dentry(dn, *snaps))
+
+    string key;
+    dn->key().encode(key);
+
+    if (dn->last != CEPH_NOSNAP &&
+       snaps && try_trim_snap_dentry(dn, *snaps)) {
+      dout(10) << " rm " << dn->name << " " << *dn << dendl;
+      write_size += key.length();
+      to_remove.insert(key);
       continue;
+    }
 
     if (!dn->is_dirty() &&
        (!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null()))
@@ -1863,18 +1854,60 @@ CDir::map_t::iterator CDir::_commit_partial(ObjectOperation& m,
 
     if (dn->get_linkage()->is_null()) {
       dout(10) << " rm " << dn->name << " " << *dn << dendl;
-      finalbl.append(CEPH_OSD_TMAP_RMSLOPPY);
-      dn->key().encode(finalbl);
+      write_size += key.length();
+      to_remove.insert(key);
     } else {
       dout(10) << " set " << dn->name << " " << *dn << dendl;
-      finalbl.append(CEPH_OSD_TMAP_SET);
-      _encode_dentry(dn, finalbl, snaps);
+      bufferlist dnbl;
+      _encode_dentry(dn, dnbl, snaps);
+      write_size += key.length() + dnbl.length();
+      to_set[key].swap(dnbl);
+    }
+
+    if (write_size >= max_write_size) {
+      ObjectOperation op;
+      op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
+      op.tmap_to_omap(true); // convert tmap to omap
+
+      if (!to_set.empty())
+       op.omap_set(to_set);
+      if (!to_remove.empty())
+       op.omap_rm_keys(to_remove);
+
+      cache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+                                  0, NULL, gather.new_sub());
+
+      write_size = 0;
+      to_set.clear();
+      to_remove.clear();
     }
   }
 
-  // update the trivialmap at the osd
-  m.tmap_update(finalbl);
-  return p;
+  ObjectOperation op;
+  op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
+  op.tmap_to_omap(true); // convert tmap to omap
+
+  /*
+   * save the header at the last moment.. If we were to send it off before other
+   * updates, but die before sending them all, we'd think that the on-disk state
+   * was fully committed even though it wasn't! However, since the messages are
+   * strictly ordered between the MDS and the OSD, and since messages to a given
+   * PG are strictly ordered, if we simply send the message containing the header
+   * off last, we cannot get our header into an incorrect state.
+   */
+  bufferlist header;
+  ::encode(fnode, header);
+  op.omap_set_header(header);
+
+  if (!to_set.empty())
+    op.omap_set(to_set);
+  if (!to_remove.empty())
+    op.omap_rm_keys(to_remove);
+
+  cache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+                              0, NULL, gather.new_sub());
+
+  gather.activate();
 }
 
 void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
@@ -1883,12 +1916,6 @@ void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
   // clear dentry NEW flag, if any.  we can no longer silently drop it.
   dn->clear_new();
 
-  dn->key().encode(bl);
-
-  ceph_le32 plen = init_le32(0);
-  unsigned plen_off = bl.length();
-  ::encode(plen, bl);
-
   ::encode(dn->first, bl);
 
   // primary or remote?
@@ -1928,15 +1955,8 @@ void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
       in->purge_stale_snap_data(*snaps);
     ::encode(in->old_inodes, bl);
   }
-  
-  plen = bl.length() - plen_off - sizeof(__u32);
-
-  ceph_le32 eplen;
-  eplen = plen;
-  bl.copy_in(plen_off, sizeof(eplen), (char*)&eplen);
 }
 
-
 void CDir::_commit(version_t want)
 {
   dout(10) << "_commit want " << want << " on " << *this << dendl;
@@ -1966,14 +1986,6 @@ void CDir::_commit(version_t want)
     return;
   }
   
-  // complete first?  (only if we're not using TMAPUP osd op)
-  if (!g_conf->mds_use_tmap && !is_complete()) {
-    dout(7) << "commit not complete, fetching first" << dendl;
-    if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_ffc);
-    fetch(new C_Dir_RetryCommit(this, want));
-    return;
-  }
-  
   // commit.
   committing_version = get_version();
 
@@ -1985,62 +1997,7 @@ void CDir::_commit(version_t want)
   
   if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_c);
 
-  // snap purge?
-  SnapRealm *realm = inode->find_snaprealm();
-  const set<snapid_t> *snaps = 0;
-  if (!realm->have_past_parents_open()) {
-    dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
-  } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
-    snaps = &realm->get_snaps();
-    dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
-            << " < " << realm->get_last_destroyed()
-            << ", snap purge based on " << *snaps << dendl;
-  }
-
-  ObjectOperation m;
-  map_t::iterator committed_dn;
-  unsigned max_write_size = cache->max_dir_commit_size;
-
-  if (is_complete() &&
-      ((num_dirty > (num_head_items*g_conf->mds_dir_commit_ratio)) ||
-       state_test(CDir::STATE_FRAGMENTING))) {
-    fnode.snap_purged_thru = realm->get_last_destroyed();
-    committed_dn = _commit_full(m, snaps, max_write_size);
-  } else {
-    committed_dn = _commit_partial(m, snaps, max_write_size);
-  }
-
-  SnapContext snapc;
-  object_t oid = get_ondisk_object();
-  object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
-
-  m.priority = CEPH_MSG_PRIO_LOW;  // set priority lower than journal!
-
-  if (committed_dn == items.end())
-    cache->mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0, NULL,
-                                 new C_Dir_Committed(this, get_version()));
-  else { // send in a different Context
-    C_GatherBuilder gather(g_ceph_context, new C_Dir_Committed(this, get_version()));
-    while (committed_dn != items.end()) {
-      ObjectOperation n = ObjectOperation();
-      committed_dn = _commit_partial(n, snaps, max_write_size, committed_dn);
-      cache->mds->objecter->mutate(oid, oloc, n, snapc, ceph_clock_now(g_ceph_context), 0, NULL,
-                                  gather.new_sub());
-    }
-    /*
-     * save the original object for last -- it contains the new header,
-     * which will be committed on-disk. If we were to send it off before
-     * the other commits, but die before sending them all, we'd think
-     * that the on-disk state was fully committed even though it wasn't!
-     * However, since the messages are strictly ordered between the MDS and
-     * the OSD, and since messages to a given PG are strictly ordered, if
-     * we simply send the message containing the header off last, we cannot
-     * get our header into an incorrect state.
-     */
-    cache->mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0, NULL,
-                                gather.new_sub());
-    gather.activate();
-  }
+   _omap_commit();
 }
 
 
index 0aa13d70b5099ecf30bc9b9c20b87f8c913591ea..db11c060623b4a7ac1f97f4b72d4911d89a51bb6 100644 (file)
@@ -278,6 +278,9 @@ protected:
 
   friend class CDirDiscover;
   friend class CDirExport;
+  friend class C_Dir_TMAP_Fetched;
+  friend class C_Dir_OMAP_Fetched;
+  friend class C_Dir_Committed;
 
   bloom_filter *bloom;
   /* If you set up the bloom filter, you must keep it accurate!
@@ -483,22 +486,23 @@ private:
   }
   void fetch(Context *c, bool ignore_authpinnability=false);
   void fetch(Context *c, const string& want_dn, bool ignore_authpinnability=false);
-  void _fetched(bufferlist &bl, const string& want_dn);
+protected:
+  void _omap_fetch(const string& want_dn);
+  void _omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
+                    const string& want_dn, int r);
+  void _tmap_fetch(const string& want_dn);
+  void _tmap_fetched(bufferlist &bl, const string& want_dn, int r);
 
   // -- commit --
   map<version_t, list<Context*> > waiting_for_commit;
-
-  void commit_to(version_t want);
-  void commit(version_t want, Context *c, bool ignore_authpinnability=false);
   void _commit(version_t want);
-  map_t::iterator _commit_full(ObjectOperation& m, const set<snapid_t> *snaps,
-                           unsigned max_write_size=-1);
-  map_t::iterator _commit_partial(ObjectOperation& m, const set<snapid_t> *snaps,
-                       unsigned max_write_size=-1,
-                       map_t::iterator last_committed_dn=map_t::iterator());
+  void _omap_commit();
   void _encode_dentry(CDentry *dn, bufferlist& bl, const set<snapid_t> *snaps);
   void _committed(version_t v);
+public:
   void wait_for_commit(Context *c, version_t v=0);
+  void commit_to(version_t want);
+  void commit(version_t want, Context *c, bool ignore_authpinnability=false);
 
   // -- dirtyness --
   version_t get_committing_version() { return committing_version; }
index e405657396e3bec3731853f82ff54f8b2f211b45..b12684364ba762f2395af5ee2ab2181f19cdbc9a 100644 (file)
@@ -11447,6 +11447,7 @@ void MDCache::_fragment_committed(dirfrag_t basedirfrag, list<CDir*>& resultfrag
       // backtrace object
       dout(10) << " truncate orphan dirfrag " << oid << dendl;
       op.truncate(0);
+      op.omap_clear();
     } else {
       dout(10) << " removing orphan dirfrag " << oid << dendl;
       op.remove();
index 05b0c42c20ab3a476d5e0f8be13e0b7f016157e7..20ef7493f98f83b87e90e5f0273f1e68758e6e87 100644 (file)
@@ -29,6 +29,7 @@ CompatSet get_mdsmap_compat_set() {
   feature_incompat.insert(MDS_FEATURE_INCOMPAT_FILELAYOUT);
   feature_incompat.insert(MDS_FEATURE_INCOMPAT_DIRINODE);
   feature_incompat.insert(MDS_FEATURE_INCOMPAT_ENCODING);
+  feature_incompat.insert(MDS_FEATURE_INCOMPAT_OMAPDIRFRAG);
 
   return CompatSet(feature_compat, feature_ro_compat, feature_incompat);
 }
index 5eadf156a956d51f0e2f8fca9dec2425c73ea217..feeee7e1393455a7451c0ca89032263eee6181a6 100644 (file)
@@ -66,6 +66,7 @@ extern CompatSet get_mdsmap_compat_set_base(); // pre v0.20
 #define MDS_FEATURE_INCOMPAT_FILELAYOUT CompatSet::Feature(3, "default file layouts on dirs")
 #define MDS_FEATURE_INCOMPAT_DIRINODE CompatSet::Feature(4, "dir inode in separate object")
 #define MDS_FEATURE_INCOMPAT_ENCODING CompatSet::Feature(5, "mds uses versioned encoding")
+#define MDS_FEATURE_INCOMPAT_OMAPDIRFRAG CompatSet::Feature(6, "dirfrag is stored in omap")
 
 class MDSMap {
 public:
index 2b7c8e0e66aaa16051a0d2d7d1d87d4c34221aaa..b180abfb60a26fdbce70f9c4b1f6b6150572b69c 100644 (file)
@@ -541,43 +541,40 @@ struct dentry_key_t {
   // encode into something that can be decoded as a string.
   // name_ (head) or name_%x (!head)
   void encode(bufferlist& bl) const {
-    __u32 l = strlen(name) + 1;
+    string key;
+    encode(key);
+    ::encode(key, bl);
+  }
+  void encode(string& key) const {
     char b[20];
     if (snapid != CEPH_NOSNAP) {
       uint64_t val(snapid);
       snprintf(b, sizeof(b), "%" PRIx64, val);
-      l += strlen(b);
     } else {
       snprintf(b, sizeof(b), "%s", "head");
-      l += 4;
     }
-    ::encode(l, bl);
-    bl.append(name, strlen(name));
-    bl.append("_", 1);
-    bl.append(b);
+    ostringstream oss;
+    oss << name << "_" << b;
+    key = oss.str();
   }
   static void decode_helper(bufferlist::iterator& bl, string& nm, snapid_t& sn) {
-    string foo;
-    ::decode(foo, bl);
-
-    int i = foo.length()-1;
-    while (foo[i] != '_' && i)
-      i--;
-    assert(i);
-    if (i+5 == (int)foo.length() &&
-       foo[i+1] == 'h' &&
-       foo[i+2] == 'e' &&
-       foo[i+3] == 'a' &&
-       foo[i+4] == 'd') {
+    string key;
+    ::decode(key, bl);
+    decode_helper(key, nm, sn);
+  }
+  static void decode_helper(const string& key, string& nm, snapid_t& sn) {
+    size_t i = key.find_last_of('_');
+    assert(i != string::npos);
+    if (key.compare(i+1, string::npos, "head") == 0) {
       // name_head
       sn = CEPH_NOSNAP;
     } else {
       // name_%x
       long long unsigned x = 0;
-      sscanf(foo.c_str() + i + 1, "%llx", &x);
+      sscanf(key.c_str() + i + 1, "%llx", &x);
       sn = x;
     }  
-    nm = string(foo.c_str(), i);
+    nm = string(key.c_str(), i);
   }
 };