*/
#include <string_view>
+#include <algorithm>
#include "include/types.h"
stale = true;
}
}
-
+
/*
* look for existing dentry for _last_ snap, because unlink +
* create may leave a "hole" (epochs during which the dentry
} else {
// (remote) link
dn = add_remote_dentry(dname, ino, d_type, first, last);
-
+
// link to inode?
CInode *in = mdcache->get_inode(ino); // we may or may not have it.
if (in) {
dout(12) << "_fetched got remote link " << ino << " (don't have it)" << dendl;
}
}
- }
+ }
else if (type == 'I') {
// inode
-
+
// Load inode data before looking up or constructing CInode
InodeStore inode_data;
inode_data.decode_bare(q);
-
+
if (stale) {
if (!dn) {
stale_items.insert(mempool::mds_co::string(key));
dir->_committed(r, version);
}
void print(ostream& out) const override {
- out << "dirfrag_commit(" << dir->dirfrag() << ")";
+ out << "dirfrag_committed(" << dir->dirfrag() << ")";
+ }
+};
+
+// Completion context that carries everything needed to write out a dirfrag
+// commit and hands it to CDir::_omap_commit_ops() when finished.
+class C_IO_Dir_Commit_Ops : public Context {
+public:
+ // d     - directory fragment being committed
+ // pr    - op priority forwarded to _omap_commit_ops()
+ // bl    - buffer forwarded as the 'dfts' argument of _omap_commit_ops()
+ // r     - dentry keys to remove
+ // s     - dentry items to set
+ // stale - stale item keys, forwarded as the 'stales' argument
+ //
+ // The dir's version and is_new() state are sampled here, at construction
+ // time, rather than when finish() runs. The contents of the rvalue
+ // arguments are taken over by swapping them into the (empty) members.
+ C_IO_Dir_Commit_Ops(CDir *d, int pr, bufferlist &&bl,
+ vector<dentry_key_t> &&r, vector<CDir::dentry_commit_item> &&s,
+ mempool::mds_co::compact_set<mempool::mds_co::string> &&stale) :
+ dir(d), op_prio(pr) {
+ version = dir->get_version();
+ is_new = dir->is_new();
+ dfts.swap(bl);
+ to_remove.swap(r);
+ to_set.swap(s);
+ stale_items.swap(stale);
+ }
+
+ // Forward the result code and all captured state to the directory.
+ void finish(int r) override {
+ dir->_omap_commit_ops(r, op_prio, version, is_new, dfts, to_remove, to_set,
+ stale_items);
}
+
+private:
+ CDir *dir;         // directory fragment being committed (not owned here)
+ version_t version; // dir->get_version() as of construction
+ int op_prio;       // priority forwarded to _omap_commit_ops()
+ bool is_new;       // dir->is_new() as of construction
+ bufferlist dfts;   // passed as 'dfts' to _omap_commit_ops()
+ vector<dentry_key_t> to_remove;           // keys to remove
+ vector<CDir::dentry_commit_item> to_set;  // items to set
+ mempool::mds_co::compact_set<mempool::mds_co::string> stale_items; // stale keys to remove
};
+// This is not locked by mds_lock
+//
+// Turn a prepared dirfrag commit into omap mutations on the dirfrag's
+// on-disk object and submit them through the objecter.
+//
+// r         - result code handed to the calling context; a negative value is
+//             passed to handle_write_error_with_lock() and nothing is written
+// op_prio   - priority assigned to every ObjectOperation sent
+// version   - version handed to the C_IO_Dir_Committed completion
+// _new      - when false, each op is prefixed with a stat so that a missing
+//             dirfrag object is not created blindly
+// dfts      - buffer from which each primary item's item.dft_len bytes are
+//             spliced off the front, in to_set order
+// to_remove - dentry keys to encode and remove
+// to_set    - dentry items to encode and set
+// stales    - already-encoded keys to remove
+//
+// Keys and values are accumulated into a pending op; whenever the
+// accumulated size reaches mdcache->max_dir_commit_size the pending op is
+// sent and a fresh one is started.  The final op, which also carries the
+// omap header, is always sent.
+void CDir::_omap_commit_ops(int r, int op_prio, version_t version, bool _new, bufferlist &dfts,
+			    vector<dentry_key_t>& to_remove, vector<dentry_commit_item> &to_set,
+			    mempool::mds_co::compact_set<mempool::mds_co::string> &stales)
+{
+  dout(10) << __func__ << dendl;
+
+  if (r < 0) {
+    mdcache->mds->handle_write_error_with_lock(r);
+    return;
+  }
+
+  // C_IO_Dir_Committed is completed via the finisher once every sub-op
+  // created through gather.new_sub() has finished.
+  C_GatherBuilder gather(g_ceph_context,
+			 new C_OnFinisher(new C_IO_Dir_Committed(this, version),
+					  mdcache->mds->finisher));
+
+  SnapContext snapc;
+  object_t oid = get_ondisk_object();
+  object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
+
+  // pending omap updates for the op currently being built
+  map<string, bufferlist> _set;
+  set<string> _rm;
+
+  unsigned max_write_size = mdcache->max_dir_commit_size;
+  unsigned write_size = 0;
+
+  // Send the pending _set/_rm as one op and reset the accumulators.
+  auto commit_one = [&](bool with_header=false) {
+    ObjectOperation op;
+
+    // don't create new dirfrag blindly
+    if (!_new)
+      op.stat(nullptr, nullptr, nullptr);
+
+    /*
+     * save the header at the last moment.. If we were to send it off before
+     * other updates, but die before sending them all, we'd think that the
+     * on-disk state was fully committed even though it wasn't! However, since
+     * the messages are strictly ordered between the MDS and the OSD, and
+     * since messages to a given PG are strictly ordered, if we simply send
+     * the message containing the header off last, we cannot get our header
+     * into an incorrect state.
+     */
+    if (with_header) {
+      bufferlist header;
+      encode(*fnode, header);
+      op.omap_set_header(header);
+    }
+
+    op.priority = op_prio;
+    if (!_set.empty())
+      op.omap_set(_set);
+    if (!_rm.empty())
+      op.omap_rm_keys(_rm);
+    mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
+				   ceph::real_clock::now(),
+				   0, gather.new_sub());
+    write_size = 0;
+    _set.clear();
+    _rm.clear();
+  };
+
+  // stale keys are already in their encoded string form
+  for (auto &key : stales) {
+    write_size += key.length();
+    _rm.emplace(key);
+
+    if (write_size >= max_write_size)
+      commit_one();
+  }
+
+  for (auto &k : to_remove) {
+    string key;
+    k.encode(key);
+    write_size += key.length();
+    _rm.emplace(std::move(key));
+
+    if (write_size >= max_write_size)
+      commit_one();
+  }
+
+  // bl is reused for every item: it is swapped into _set below, which
+  // leaves it empty again for the next iteration.
+  bufferlist bl;
+  using ceph::encode;
+  for (auto &item : to_set) {
+    string key;
+    item.key.encode(key);
+
+    encode(item.first, bl);
+    if (item.is_remote) {
+      bl.append('L'); // remote link
+      encode(item.ino, bl);
+      encode(item.d_type, bl);
+    } else {
+      bl.append('I'); // inode
+
+      encode(*item.inode, bl, item.features);
+
+      if (!item.symlink.empty())
+	encode(item.symlink, bl);
+
+      // dirfragtree: take this item's dft_len bytes off the front of dfts
+      dfts.splice(0, item.dft_len, &bl);
+
+      if (item.xattrs)
+	encode(*item.xattrs, bl);
+      else
+	encode((__u32)0, bl);
+
+      if (item.snaprealm) {
+	bufferlist snapr_bl;
+	encode(item.srnode, snapr_bl);
+	encode(snapr_bl, bl);
+      } else {
+	encode(bufferlist(), bl);
+      }
+
+      if (item.old_inodes)
+	encode(*item.old_inodes, bl, item.features);
+      else
+	encode((__u32)0, bl);
+
+      encode(item.oldest_snap, bl);
+      encode(item.damage_flags, bl);
+    }
+
+    write_size += key.length() + bl.length();
+    _set[std::move(key)].swap(bl);
+    if (write_size >= max_write_size)
+      commit_one();
+  }
+
+  // the last op carries the header (see the comment in commit_one)
+  commit_one(true);
+  gather.activate();
+}
+
+
/**
* Flush out the modified dentries in this dir. Keep the bufferlist
* below max_write_size;
{
dout(10) << __func__ << dendl;
- unsigned max_write_size = mdcache->max_dir_commit_size;
- unsigned write_size = 0;
-
if (op_prio < 0)
op_prio = CEPH_MSG_PRIO_DEFAULT;
// fnode.snap_purged_thru = realm->get_last_destroyed();
}
- set<string> to_remove;
- map<string, bufferlist> to_set;
+  // Estimate how many dentries this commit may touch; the value is used
+  // below to reserve() space in the to_remove/to_set vectors.
+  size_t count = 0;
+  if (state_test(CDir::STATE_FRAGMENTING) && is_new()) {
+    // Sum the head and snap item counts.  Using '&&' here would collapse
+    // the estimate to 0 or 1 and defeat the reservation.
+    count = get_num_head_items() + get_num_snap_items();
+  } else {
+    for (elist<CDentry*>::iterator it = dirty_dentries.begin(); !it.end(); ++it)
+      ++count;
+  }
- C_GatherBuilder gather(g_ceph_context,
- new C_OnFinisher(new C_IO_Dir_Committed(this,
- get_version()),
- mdcache->mds->finisher));
+ vector<dentry_key_t> to_remove;
+ // reserve enough memory, which may be more than is actually needed
+ to_remove.reserve(count);
- SnapContext snapc;
- object_t oid = get_ondisk_object();
- object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
+ vector<dentry_commit_item> to_set;
+ // reserve enough memory, which may be more than is actually needed
+ to_set.reserve(count);
- if (!stale_items.empty()) {
- for (const auto &p : stale_items) {
- to_remove.insert(std::string(p));
- write_size += p.length();
- }
- stale_items.clear();
- }
+ bufferlist dfts(CEPH_PAGE_SIZE);
auto write_one = [&](CDentry *dn) {
- string key;
- dn->key().encode(key);
+ auto key = dn->key();
if (dn->last != CEPH_NOSNAP &&
snaps && try_trim_snap_dentry(dn, *snaps)) {
dout(10) << " rm " << key << dendl;
- write_size += key.length();
- to_remove.insert(key);
+ to_remove.push_back(key);
return;
}
if (dn->get_linkage()->is_null()) {
dout(10) << " rm " << dn->get_name() << " " << *dn << dendl;
- write_size += key.length();
- to_remove.insert(key);
+ to_remove.push_back(key);
} else {
dout(10) << " set " << dn->get_name() << " " << *dn << dendl;
- bufferlist dnbl;
- _encode_dentry(dn, dnbl, snaps);
- write_size += key.length() + dnbl.length();
- to_set[key].swap(dnbl);
- }
-
- if (write_size >= max_write_size) {
- ObjectOperation op;
- op.priority = op_prio;
-
- // don't create new dirfrag blindly
- if (!is_new())
- op.stat(nullptr, nullptr, nullptr);
-
- if (!to_set.empty())
- op.omap_set(to_set);
- if (!to_remove.empty())
- op.omap_rm_keys(to_remove);
- mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
- ceph::real_clock::now(),
- 0, gather.new_sub());
+ uint64_t off = dfts.length();
+ // reserve more space when less than 1/8 of a
+ // page is left in the current page
+ uint64_t left = CEPH_PAGE_SIZE - off % CEPH_PAGE_SIZE;
+ if (left < CEPH_PAGE_SIZE / 8)
+ dfts.reserve(left + CEPH_PAGE_SIZE);
- write_size = 0;
- to_set.clear();
- to_remove.clear();
+ auto& item = to_set.emplace_back();
+ item.key = key;
+ _parse_dentry(dn, item, snaps, dfts);
+ item.dft_len = dfts.length() - off;
}
};
}
}
- ObjectOperation op;
- op.priority = op_prio;
-
- // don't create new dirfrag blindly
- if (!is_new())
- op.stat(nullptr, nullptr, nullptr);
-
- /*
- * save the header at the last moment.. If we were to send it off before other
- * updates, but die before sending them all, we'd think that the on-disk state
- * was fully committed even though it wasn't! However, since the messages are
- * strictly ordered between the MDS and the OSD, and since messages to a given
- * PG are strictly ordered, if we simply send the message containing the header
- * off last, we cannot get our header into an incorrect state.
- */
- bufferlist header;
- encode(*fnode, header);
- op.omap_set_header(header);
-
- if (!to_set.empty())
- op.omap_set(to_set);
- if (!to_remove.empty())
- op.omap_rm_keys(to_remove);
-
- mdcache->mds->objecter->mutate(oid, oloc, op, snapc,
- ceph::real_clock::now(),
- 0, gather.new_sub());
-
- gather.activate();
+ auto c = new C_IO_Dir_Commit_Ops(this, op_prio, std::move(dfts),
+ std::move(to_remove), std::move(to_set),
+ std::move(stale_items));
+ stale_items.clear();
+ mdcache->mds->finisher->queue(c);
}
-void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
- const set<snapid_t> *snaps)
+void CDir::_parse_dentry(CDentry *dn, dentry_commit_item &item,
+ const set<snapid_t> *snaps, bufferlist &bl)
{
// clear dentry NEW flag, if any. we can no longer silently drop it.
dn->clear_new();
- encode(dn->first, bl);
+ item.first = dn->first;
// primary or remote?
if (dn->linkage.is_remote()) {
- inodeno_t ino = dn->linkage.get_remote_ino();
- unsigned char d_type = dn->linkage.get_remote_d_type();
- dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' remote ino " << ino << dendl;
-
- // marker, name, ino
- bl.append('L'); // remote link
- encode(ino, bl);
- encode(d_type, bl);
+ item.is_remote = true;
+ item.ino = dn->linkage.get_remote_ino();
+ item.d_type = dn->linkage.get_remote_d_type();
+ dout(14) << " dn '" << dn->get_name() << "' remote ino " << item.ino << dendl;
} else if (dn->linkage.is_primary()) {
// primary link
CInode *in = dn->linkage.get_inode();
ceph_assert(in);
-
- dout(14) << " pos " << bl.length() << " dn '" << dn->get_name() << "' inode " << *in << dendl;
-
+
+ dout(14) << " dn '" << dn->get_name() << "' inode " << *in << dendl;
// marker, name, inode, [symlink string]
- bl.append('I'); // inode
if (in->is_multiversion()) {
if (!in->snaprealm) {
}
}
- bufferlist snap_blob;
- in->encode_snap_blob(snap_blob);
- in->encode_bare(bl, mdcache->mds->mdsmap->get_up_features(), &snap_blob);
+ if (in->snaprealm) {
+ item.snaprealm = true;
+ item.srnode = in->snaprealm->srnode;
+ }
+ item.features = mdcache->mds->mdsmap->get_up_features();
+ item.inode = in->inode;
+ if (in->inode->is_symlink())
+ item.symlink = in->symlink;
+ using ceph::encode;
+ encode(in->dirfragtree, bl);
+ item.xattrs = in->xattrs;
+ item.old_inodes = in->old_inodes;
+ item.oldest_snap = in->oldest_snap;
+ item.damage_flags = in->damage_flags;
} else {
ceph_assert(!dn->linkage.is_null());
}