}
};
-
-void CDir::_commit_full(ObjectOperation& m, const set<snapid_t> *snaps)
+/**
+ * Try and write out the full directory to disk.
+ *
+ * If the bufferlist we're using exceeds max_write_size, bail out
+ * and switch to _commit_partial -- it can safely break itself into
+ * multiple non-atomic writes.
+ */
+CDir::map_t::iterator CDir::_commit_full(ObjectOperation& m, const set<snapid_t> *snaps,
+ unsigned max_write_size)
{
dout(10) << "_commit_full" << dendl;
// encode
bufferlist bl;
__u32 n = 0;
+
+ bufferlist header;
+ ::encode(fnode, header);
+ max_write_size -= header.length();
+
map_t::iterator p = items.begin();
- while (p != items.end()) {
+ while (p != items.end() && bl.length() < max_write_size) {
CDentry *dn = p->second;
p++;
_encode_dentry(dn, bl, snaps);
}
+ if (p != items.end()) {
+ assert(bl.length() > max_write_size);
+ return _commit_partial(m, snaps, max_write_size);
+ }
+
// encode final trivialmap
- bufferlist header;
- ::encode(fnode, header);
bufferlist finalbl;
::encode(header, finalbl);
assert(num_head_items + num_head_null + num_snap_items + num_snap_null == items.size());
// write out the full blob
m.tmap_put(finalbl);
+ return p;
}
-void CDir::_commit_partial(ObjectOperation& m, const set<snapid_t> *snaps)
+/**
+ * Flush out the modified dentries in this dir. Keep the bufferlist
+ * below max_write_size; if we exceed that size then return the last
+ * dentry that got committed into the bufferlist. (Note that the
+ * bufferlist might be larger than requested by the size of that
+ * last dentry as encoded.)
+ *
+ * If we're passed a last_committed_dn, skip to the next dentry after that.
+ */
+CDir::map_t::iterator CDir::_commit_partial(ObjectOperation& m,
+ const set<snapid_t> *snaps,
+ unsigned max_write_size,
+ map_t::iterator last_committed_dn)
{
dout(10) << "_commit_partial" << dendl;
bufferlist finalbl;
// updated dentries
map_t::iterator p = items.begin();
- while (p != items.end()) {
+ if(last_committed_dn != map_t::iterator())
+ p = last_committed_dn;
+
+ while (p != items.end() && finalbl.length() < max_write_size) {
CDentry *dn = p->second;
- p++;
+ ++p;
if (snaps && dn->last != CEPH_NOSNAP &&
try_trim_snap_dentry(dn, *snaps))
// update the trivialmap at the osd
m.tmap_update(finalbl);
+ return p;
}
void CDir::_encode_dentry(CDentry *dn, bufferlist& bl,
}
ObjectOperation m;
- if (is_complete() &&
- (num_dirty > (num_head_items*g_conf.mds_dir_commit_ratio))) {
- fnode.snap_purged_thru = realm->get_last_destroyed();
- _commit_full(m, snaps);
- } else {
- _commit_partial(m, snaps);
- }
+ map_t::iterator committed_dn;
+ unsigned max_write_size = -1;
// update parent pointer while we're here.
// NOTE: the pointer is ONLY required to be valid for the first frag. we put the xattr
// on other frags too because it can't hurt, but it won't necessarily be up to date
// in that case!!
- inode->encode_parent_mutation(m);
+ max_write_size -= inode->encode_parent_mutation(m);
+
+ if (is_complete() &&
+ (num_dirty > (num_head_items*g_conf.mds_dir_commit_ratio))) {
+ fnode.snap_purged_thru = realm->get_last_destroyed();
+ committed_dn = _commit_full(m, snaps, max_write_size);
+ } else {
+ committed_dn = _commit_partial(m, snaps, max_write_size);
+ }
SnapContext snapc;
object_t oid = get_ondisk_object();
m.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
- cache->mds->objecter->mutate(oid, oloc, m, snapc, g_clock.now(), 0,
- NULL, new C_Dir_Committed(this, get_version(), inode->inode.last_renamed_version) );
+ if (committed_dn == items.end())
+ cache->mds->objecter->mutate(oid, oloc, m, snapc, g_clock.now(), 0, NULL,
+ new C_Dir_Committed(this, get_version(),
+ inode->inode.last_renamed_version));
+ else { // send in a different Context
+ C_Gather *gather = new C_Gather(new C_Dir_Committed(this, get_version(),
+ inode->inode.last_renamed_version));
+ cache->mds->objecter->mutate(oid, oloc, m, snapc, g_clock.now(), 0, NULL,
+ gather->new_sub());
+ while (committed_dn != items.end()) {
+ m = ObjectOperation();
+ committed_dn = _commit_partial(m, snaps, max_write_size, committed_dn);
+ cache->mds->objecter->mutate(oid, oloc, m, snapc, g_clock.now(), 0, NULL,
+ gather->new_sub());
+ }
+ }
}
void commit_to(version_t want);
void commit(version_t want, Context *c, bool ignore_authpinnability=false);
void _commit(version_t want);
- void _commit_full(ObjectOperation& m, const set<snapid_t> *snaps);
- void _commit_partial(ObjectOperation& m, const set<snapid_t> *snaps);
+ map_t::iterator _commit_full(ObjectOperation& m, const set<snapid_t> *snaps,
+ unsigned max_write_size=-1);
+ map_t::iterator _commit_partial(ObjectOperation& m, const set<snapid_t> *snaps,
+ unsigned max_write_size=-1,
+ map_t::iterator last_committed_dn=map_t::iterator());
void _encode_dentry(CDentry *dn, bufferlist& bl, const set<snapid_t> *snaps);
void _committed(version_t v, version_t last_renamed_version);
void wait_for_commit(Context *c, version_t v=0);