// -----------------------
// FETCH
-
-class C_Dir_Fetch : public Context {
- protected:
- CDir *dir;
- string want_dn;
- public:
- bufferlist bl;
-
- C_Dir_Fetch(CDir *d, const string& w) : dir(d), want_dn(w) { }
- void finish(int result) {
- dir->_fetched(bl, want_dn);
- }
-};
-
void CDir::fetch(Context *c, bool ignore_authpinnability)
{
string want;
if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_f);
+ _omap_fetch(want_dn);
+}
+
+// Completion callback for a legacy TMAP read of a dirfrag object.
+// On I/O completion it hands the raw tmap buffer (bl) back to
+// CDir::_tmap_fetched() for decoding, along with the read result code.
+class C_Dir_TMAP_Fetched : public Context {
+ protected:
+ CDir *dir;
+ string want_dn;
+ public:
+ bufferlist bl;
+
+ C_Dir_TMAP_Fetched(CDir *d, const string& w) : dir(d), want_dn(w) { }
+ void finish(int r) {
+ dir->_tmap_fetched(bl, want_dn, r);
+ }
+};
+
+// Fall-back read path for dirfrags still stored in the legacy TMAP
+// format; the object is fetched and decoded in _tmap_fetched().
+void CDir::_tmap_fetch(const string& want_dn)
+{
  // start by reading the first hunk of it
- C_Dir_Fetch *fin = new C_Dir_Fetch(this, want_dn);
+ C_Dir_TMAP_Fetched *fin = new C_Dir_TMAP_Fetched(this, want_dn);
  object_t oid = get_ondisk_object();
  object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
  ObjectOperation rd;
+ // NOTE(review): none of the visible lines adds a read op to 'rd' or
+ // wires up fin->bl -- presumably elided diff context does; confirm
+ // against the full file before merging.
  cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0, fin);
 }
-void CDir::_fetched(bufferlist &bl, const string& want_dn)
+// Decode a legacy TMAP dirfrag buffer into (header, omap) form and feed
+// it to _omap_fetched(), so both on-disk formats share one load path.
+void CDir::_tmap_fetched(bufferlist& bl, const string& want_dn, int r)
 {
  LogClient &clog = cache->mds->clog;
- dout(10) << "_fetched " << bl.length()
- << " bytes for " << *this
- << " want_dn=" << want_dn
- << dendl;
-
+ dout(10) << "_tmap_fetched " << bl.length() << " bytes for " << *this
+ << " want_dn=" << want_dn << dendl;
+
+ // a missing object (-ENOENT) is tolerated here; any other error is a bug
+ assert(r == 0 || r == -ENOENT);
  assert(is_auth());
  assert(!is_frozen());
- // empty?!?
+ bufferlist header;
+ map<string, bufferlist> omap;
+
  if (bl.length() == 0) {
+ // no tmap data either: pass -ENODATA so _omap_fetched() knows the
+ // tmap fall-back has already been tried and won't retry it
+ r = -ENODATA;
+ } else {
+ // TMAP wire format: an encoded header bufferlist followed by an
+ // encoded map<string,bufferlist> of dentry keys/values
+ bufferlist::iterator p = bl.begin();
+ ::decode(header, p);
+ ::decode(omap, p);
+
+ if (!p.end()) {
+ clog.warn() << "tmap buffer of dir " << dirfrag() << " has "
+ << bl.length() - p.get_off() << " extra bytes\n";
+ }
+ bl.clear();
+ }
+
+ _omap_fetched(header, omap, want_dn, r);
+}
+
+// Completion callback for an omap read of a dirfrag object: the fnode
+// header and all dentry key/value pairs are fetched in one compound
+// ObjectOperation, with per-op return codes in ret1/ret2.
+class C_Dir_OMAP_Fetched : public Context {
+ protected:
+ CDir *dir;
+ string want_dn;
+ public:
+ bufferlist hdrbl;
+ map<string, bufferlist> omap;
+ int ret1, ret2; // results of omap_get_header / omap_get_vals
+
+ // ret1/ret2 are explicitly zeroed: finish() reads them whenever the
+ // overall op succeeds, and the objecter only fills them in for ops it
+ // actually dispatched -- never propagate indeterminate values.
+ C_Dir_OMAP_Fetched(CDir *d, const string& w)
+ : dir(d), want_dn(w), ret1(0), ret2(0) { }
+ void finish(int r) {
+ // surface the first per-op error, if any
+ if (r >= 0) r = ret1;
+ if (r >= 0) r = ret2;
+ dir->_omap_fetched(hdrbl, omap, want_dn, r);
+ }
+};
+
+// Read a dirfrag stored in omap format: fetch the fnode header and all
+// dentry keys/values in a single compound ObjectOperation, completing
+// via C_Dir_OMAP_Fetched -> _omap_fetched().
+void CDir::_omap_fetch(const string& want_dn)
+{
+ C_Dir_OMAP_Fetched *fin = new C_Dir_OMAP_Fetched(this, want_dn);
+ object_t oid = get_ondisk_object();
+ object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
+ ObjectOperation rd;
+ rd.omap_get_header(&fin->hdrbl, &fin->ret1);
+ // empty start-after/filter-prefix and max of (uint64_t)-1: fetch
+ // every key in one round trip
+ rd.omap_get_vals("", "", (uint64_t)-1, &fin->omap, &fin->ret2);
+ cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0, fin);
+}
+
+void CDir::_omap_fetched(bufferlist& hdrbl, map<string, bufferlist>& omap,
+ const string& want_dn, int r)
+{
+ LogClient &clog = cache->mds->clog;
+ dout(10) << "_fetched header " << hdrbl.length() << " bytes "
+ << omap.size() << " keys for " << *this
+ << " want_dn=" << want_dn << dendl;
+
+ assert(r == 0 || r == -ENOENT || r == -ENODATA);
+ assert(is_auth());
+ assert(!is_frozen());
+
+ if (hdrbl.length() == 0) {
+ if (r != -ENODATA) { // called by _tmap_fetched() ?
+ dout(10) << "_fetched 0 byte from omap, retry tmap" << dendl;
+ _tmap_fetch(want_dn);
+ return;
+ }
+
dout(0) << "_fetched missing object for " << *this << dendl;
- clog.error() << "dir " << ino() << "." << dirfrag()
- << " object missing on disk; some files may be lost\n";
+ clog.error() << "dir " << dirfrag() << " object missing on disk; some files may be lost\n";
log_mark_dirty();
return;
}
- // decode trivialmap.
- int len = bl.length();
- bufferlist::iterator p = bl.begin();
-
- bufferlist header;
- ::decode(header, p);
- bufferlist::iterator hp = header.begin();
fnode_t got_fnode;
- ::decode(got_fnode, hp);
-
- __u32 n;
- ::decode(n, p);
+ {
+ bufferlist::iterator p = hdrbl.begin();
+ ::decode(got_fnode, p);
+ if (!p.end()) {
+ clog.warn() << "header buffer of dir " << dirfrag() << " has "
+ << hdrbl.length() - p.get_off() << " extra bytes\n";
+ }
+ }
- dout(10) << "_fetched version " << got_fnode.version
- << ", " << len << " bytes, " << n << " keys"
- << dendl;
+ dout(10) << "_fetched version " << got_fnode.version << dendl;
-
// take the loaded fnode?
// only if we are a fresh CDir* with no prior state.
if (get_version() == 0) {
}
// purge stale snaps?
- // * only if we have past_parents open!
- const set<snapid_t> *snaps = 0;
+ // only if we have past_parents open!
+ bool purged_any = false;
+ const set<snapid_t> *snaps = NULL;
SnapRealm *realm = inode->find_snaprealm();
if (!realm->have_past_parents_open()) {
dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
<< ", snap purge based on " << *snaps << dendl;
fnode.snap_purged_thru = realm->get_last_destroyed();
}
- bool purged_any = false;
bool stray = inode->is_stray();
- //int num_new_inodes_loaded = 0;
- loff_t baseoff = p.get_off();
- for (unsigned i=0; i<n; i++) {
- loff_t dn_offset = p.get_off() - baseoff;
-
+ unsigned pos = 0;
+ for (map<string, bufferlist>::iterator p = omap.begin();
+ p != omap.end();
+ ++p, ++pos) {
// dname
string dname;
snapid_t first, last;
- dentry_key_t::decode_helper(p, dname, last);
+ dentry_key_t::decode_helper(p->first, dname, last);
- bufferlist dndata;
- ::decode(dndata, p);
- bufferlist::iterator q = dndata.begin();
+ bufferlist::iterator q = p->second.begin();
::decode(first, q);
// marker
char type;
::decode(type, q);
- dout(24) << "_fetched pos " << dn_offset << " marker '" << type << "' dname '" << dname
+ dout(20) << "_fetched pos " << pos << " marker '" << type << "' dname '" << dname
<< " [" << first << "," << last << "]"
<< dendl;
* doesn't exist) but for which no explicit negative dentry is in
* the cache.
*/
- CDentry *dn = 0;
+ CDentry *dn = NULL;
if (!stale)
dn = lookup(dname, last);
}
}
} else {
- dout(1) << "corrupt directory, i got tag char '" << type << "' val " << (int)(type)
- << " at offset " << p.get_off() << dendl;
+ dout(1) << "corrupt directory, i got tag char '" << type << "' pos " << pos << dendl;
assert(0);
}
}
}
}
- if (!p.end()) {
- clog.warn() << "dir " << dirfrag() << " has "
- << bl.length() - p.get_off() << " extra bytes\n";
- }
//cache->mds->logger->inc("newin", num_new_inodes_loaded);
//hack_num_accessed = 0;
_commit(want);
}
-
-class C_Dir_RetryCommit : public Context {
- CDir *dir;
- version_t want;
-public:
- C_Dir_RetryCommit(CDir *d, version_t v) :
- dir(d), want(v) { }
- void finish(int r) {
- dir->_commit(want);
- }
-};
-
class C_Dir_Committed : public Context {
CDir *dir;
version_t version;
};
/**
- * Try and write out the full directory to disk.
- *
- * If the bufferlist we're using exceeds max_write_size, bail out
- * and switch to _commit_partial -- it can safely break itself into
- * multiple non-atomic writes.
+ * Flush out the modified dentries in this dir. Keep the bufferlist
+ * below max_write_size;
*/
-CDir::map_t::iterator CDir::_commit_full(ObjectOperation& m, const set<snapid_t> *snaps,
- unsigned max_write_size)
+void CDir::_omap_commit()
{
- dout(10) << "_commit_full" << dendl;
-
- // encode
- bufferlist bl;
- __u32 n = 0;
-
- bufferlist header;
- ::encode(fnode, header);
- max_write_size -= header.length();
-
- map_t::iterator p = items.begin();
- while (p != items.end() && bl.length() < max_write_size) {
- CDentry *dn = p->second;
- ++p;
-
- if (dn->linkage.is_null())
- continue; // skip negative entries
+ dout(10) << "_omap_commit" << dendl;
- if (snaps && dn->last != CEPH_NOSNAP &&
- try_trim_snap_dentry(dn, *snaps))
- continue;
-
- n++;
-
- _encode_dentry(dn, bl, snaps);
- }
+ unsigned max_write_size = cache->max_dir_commit_size;
+ unsigned write_size = 0;
- if (p != items.end()) {
- assert(bl.length() > max_write_size);
- return _commit_partial(m, snaps, max_write_size);
+ // snap purge?
+ const set<snapid_t> *snaps = NULL;
+ SnapRealm *realm = inode->find_snaprealm();
+ if (!realm->have_past_parents_open()) {
+ dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
+ } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
+ snaps = &realm->get_snaps();
+ dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
+ << " < " << realm->get_last_destroyed()
+ << ", snap purge based on " << *snaps << dendl;
}
- // encode final trivialmap
- bufferlist finalbl;
- ::encode(header, finalbl);
- assert(num_head_items + num_head_null + num_snap_items + num_snap_null == items.size());
- assert(n == (num_head_items + num_snap_items));
- ::encode(n, finalbl);
- finalbl.claim_append(bl);
+ set<string> to_remove;
+ map<string, bufferlist> to_set;
- // write out the full blob
- m.tmap_put(finalbl);
- return p;
-}
+ C_GatherBuilder gather(g_ceph_context, new C_Dir_Committed(this, get_version()));
-/**
- * Flush out the modified dentries in this dir. Keep the bufferlist
- * below max_write_size; if we exceed that size then return the last
- * dentry that got committed into the bufferlist. (Note that the
- * bufferlist might be larger than requested by the size of that
- * last dentry as encoded.)
- *
- * If we're passed a last_committed_dn, skip to the next dentry after that.
- * Also, don't encode the header again -- we don't want to update it
- * on-disk until all the updates have made it through, so keep the header
- * in only the first changeset -- our caller is responsible for making sure
- * that changeset doesn't go through until after all the others do, if it's
- * necessary.
- */
-CDir::map_t::iterator CDir::_commit_partial(ObjectOperation& m,
- const set<snapid_t> *snaps,
- unsigned max_write_size,
- map_t::iterator last_committed_dn)
-{
- dout(10) << "_commit_partial" << dendl;
- bufferlist finalbl;
-
- // header
- if (last_committed_dn == map_t::iterator()) {
- bufferlist header;
- ::encode(fnode, header);
- finalbl.append(CEPH_OSD_TMAP_HDR);
- ::encode(header, finalbl);
- }
-
- // updated dentries
- map_t::iterator p = items.begin();
- if(last_committed_dn != map_t::iterator())
- p = last_committed_dn;
+ SnapContext snapc;
+ object_t oid = get_ondisk_object();
+ object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
- while (p != items.end() && finalbl.length() < max_write_size) {
+ for (map_t::iterator p = items.begin();
+ p != items.end(); ) {
CDentry *dn = p->second;
++p;
-
- if (snaps && dn->last != CEPH_NOSNAP &&
- try_trim_snap_dentry(dn, *snaps))
+
+ string key;
+ dn->key().encode(key);
+
+ if (dn->last != CEPH_NOSNAP &&
+ snaps && try_trim_snap_dentry(dn, *snaps)) {
+ dout(10) << " rm " << dn->name << " " << *dn << dendl;
+ write_size += key.length();
+ to_remove.insert(key);
continue;
+ }
if (!dn->is_dirty() &&
(!dn->state_test(CDentry::STATE_FRAGMENTING) || dn->get_linkage()->is_null()))
if (dn->get_linkage()->is_null()) {
dout(10) << " rm " << dn->name << " " << *dn << dendl;
- finalbl.append(CEPH_OSD_TMAP_RMSLOPPY);
- dn->key().encode(finalbl);
+ write_size += key.length();
+ to_remove.insert(key);
} else {
dout(10) << " set " << dn->name << " " << *dn << dendl;
- finalbl.append(CEPH_OSD_TMAP_SET);
- _encode_dentry(dn, finalbl, snaps);
+ bufferlist dnbl;
+ _encode_dentry(dn, dnbl, snaps);
+ write_size += key.length() + dnbl.length();
+ to_set[key].swap(dnbl);
+ }
+
+ if (write_size >= max_write_size) {
+ ObjectOperation op;
+ op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
+ op.tmap_to_omap(true); // convert tmap to omap
+
+ if (!to_set.empty())
+ op.omap_set(to_set);
+ if (!to_remove.empty())
+ op.omap_rm_keys(to_remove);
+
+ cache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ 0, NULL, gather.new_sub());
+
+ write_size = 0;
+ to_set.clear();
+ to_remove.clear();
}
}
- // update the trivialmap at the osd
- m.tmap_update(finalbl);
- return p;
+ ObjectOperation op;
+ op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
+ op.tmap_to_omap(true); // convert tmap to omap
+
+ /*
+ * save the header at the last moment.. If we were to send it off before other
+ * updates, but die before sending them all, we'd think that the on-disk state
+ * was fully committed even though it wasn't! However, since the messages are
+ * strictly ordered between the MDS and the OSD, and since messages to a given
+ * PG are strictly ordered, if we simply send the message containing the header
+ * off last, we cannot get our header into an incorrect state.
+ */
+ bufferlist header;
+ ::encode(fnode, header);
+ op.omap_set_header(header);
+
+ if (!to_set.empty())
+ op.omap_set(to_set);
+ if (!to_remove.empty())
+ op.omap_rm_keys(to_remove);
+
+ cache->mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context),
+ 0, NULL, gather.new_sub());
+
+ gather.activate();
}
void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
// clear dentry NEW flag, if any. we can no longer silently drop it.
dn->clear_new();
- dn->key().encode(bl);
-
- ceph_le32 plen = init_le32(0);
- unsigned plen_off = bl.length();
- ::encode(plen, bl);
-
::encode(dn->first, bl);
// primary or remote?
in->purge_stale_snap_data(*snaps);
::encode(in->old_inodes, bl);
}
-
- plen = bl.length() - plen_off - sizeof(__u32);
-
- ceph_le32 eplen;
- eplen = plen;
- bl.copy_in(plen_off, sizeof(eplen), (char*)&eplen);
}
-
void CDir::_commit(version_t want)
{
dout(10) << "_commit want " << want << " on " << *this << dendl;
return;
}
- // complete first? (only if we're not using TMAPUP osd op)
- if (!g_conf->mds_use_tmap && !is_complete()) {
- dout(7) << "commit not complete, fetching first" << dendl;
- if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_ffc);
- fetch(new C_Dir_RetryCommit(this, want));
- return;
- }
-
// commit.
committing_version = get_version();
if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_c);
- // snap purge?
- SnapRealm *realm = inode->find_snaprealm();
- const set<snapid_t> *snaps = 0;
- if (!realm->have_past_parents_open()) {
- dout(10) << " no snap purge, one or more past parents NOT open" << dendl;
- } else if (fnode.snap_purged_thru < realm->get_last_destroyed()) {
- snaps = &realm->get_snaps();
- dout(10) << " snap_purged_thru " << fnode.snap_purged_thru
- << " < " << realm->get_last_destroyed()
- << ", snap purge based on " << *snaps << dendl;
- }
-
- ObjectOperation m;
- map_t::iterator committed_dn;
- unsigned max_write_size = cache->max_dir_commit_size;
-
- if (is_complete() &&
- ((num_dirty > (num_head_items*g_conf->mds_dir_commit_ratio)) ||
- state_test(CDir::STATE_FRAGMENTING))) {
- fnode.snap_purged_thru = realm->get_last_destroyed();
- committed_dn = _commit_full(m, snaps, max_write_size);
- } else {
- committed_dn = _commit_partial(m, snaps, max_write_size);
- }
-
- SnapContext snapc;
- object_t oid = get_ondisk_object();
- object_locator_t oloc(cache->mds->mdsmap->get_metadata_pool());
-
- m.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
-
- if (committed_dn == items.end())
- cache->mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0, NULL,
- new C_Dir_Committed(this, get_version()));
- else { // send in a different Context
- C_GatherBuilder gather(g_ceph_context, new C_Dir_Committed(this, get_version()));
- while (committed_dn != items.end()) {
- ObjectOperation n = ObjectOperation();
- committed_dn = _commit_partial(n, snaps, max_write_size, committed_dn);
- cache->mds->objecter->mutate(oid, oloc, n, snapc, ceph_clock_now(g_ceph_context), 0, NULL,
- gather.new_sub());
- }
- /*
- * save the original object for last -- it contains the new header,
- * which will be committed on-disk. If we were to send it off before
- * the other commits, but die before sending them all, we'd think
- * that the on-disk state was fully committed even though it wasn't!
- * However, since the messages are strictly ordered between the MDS and
- * the OSD, and since messages to a given PG are strictly ordered, if
- * we simply send the message containing the header off last, we cannot
- * get our header into an incorrect state.
- */
- cache->mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0, NULL,
- gather.new_sub());
- gather.activate();
- }
+ _omap_commit();
}