map_t::iterator committed_dn;
unsigned max_write_size = cache->max_dir_commit_size;
- // update parent pointer while we're here.
- // NOTE: the pointer is ONLY required to be valid for the first frag. we put the xattr
- // on other frags too because it can't hurt, but it won't necessarily be up to date
- // in that case!!
- max_write_size -= inode->encode_parent_mutation(m);
-
if (is_complete() &&
(num_dirty > (num_head_items*g_conf->mds_dir_commit_ratio))) {
fnode.snap_purged_thru = realm->get_last_destroyed();
bool stray = inode->is_stray();
- // did we update the parent pointer too?
- if (get_frag() == frag_t() && // only counts on first frag
- inode->state_test(CInode::STATE_DIRTYPARENT) &&
- lrv == inode->inode.last_renamed_version) {
- inode->item_renamed_file.remove_myself();
- inode->state_clear(CInode::STATE_DIRTYPARENT);
- inode->put(CInode::PIN_DIRTYPARENT);
- dout(10) << "_committed stored parent pointer, removed from renamed_files list " << *inode << dendl;
- }
-
// take note.
assert(v > committed_version);
assert(v <= committing_version);
if (in.state_test(CInode::STATE_AMBIGUOUSAUTH)) out << " AMBIGAUTH";
if (in.state_test(CInode::STATE_NEEDSRECOVER)) out << " needsrecover";
if (in.state_test(CInode::STATE_RECOVERING)) out << " recovering";
- if (in.state_test(CInode::STATE_DIRTYPARENT)) out << " dirtyparent";
if (in.is_freezing_inode()) out << " FREEZING=" << in.auth_pin_freeze_allowance;
if (in.is_frozen_inode()) out << " FROZEN";
if (in.is_frozen_auth_pin()) out << " FROZEN_AUTHPIN";
delete fin;
}
-
-
-// ------------------
-// parent dir
-
-void CInode::build_backtrace(inode_backtrace_t& bt)
-{
- bt.ino = inode.ino;
- bt.ancestors.clear();
-
- CInode *in = this;
- CDentry *pdn = get_parent_dn();
- while (pdn) {
- CInode *diri = pdn->get_dir()->get_inode();
- bt.ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version));
- in = diri;
- pdn = in->get_parent_dn();
- }
-}
-
-unsigned CInode::encode_parent_mutation(ObjectOperation& m)
-{
- string path;
- make_path_string(path);
- m.setxattr("path", path);
-
- inode_backtrace_t bt;
- build_backtrace(bt);
-
- bufferlist parent;
- ::encode(bt, parent);
- m.setxattr("parent", parent);
- return path.length() + parent.length();
-}
-
-struct C_Inode_StoredParent : public Context {
+class C_CInode_FetchedBacktrace : public Context {
CInode *in;
- version_t version;
+ inode_backtrace_t *backtrace;
Context *fin;
- C_Inode_StoredParent(CInode *i, version_t v, Context *f) : in(i), version(v), fin(f) {}
+public:
+ bufferlist bl;
+ C_CInode_FetchedBacktrace(CInode *i, inode_backtrace_t *bt, Context *f) :
+ in(i), backtrace(bt), fin(f) {}
+
void finish(int r) {
- in->_stored_parent(version, fin);
+    if (r == 0) {
+      // Read succeeded: decode the xattr and let _fetched_backtrace
+      // complete the waiter.
+      in->_fetched_backtrace(&bl, backtrace, fin);
+    } else if (fin) {
+      // Read failed: pass the error code to the waiter and release it.
+      // fin may be NULL (_fetched_backtrace tolerates that too), and
+      // calling finish() alone would leave the completion allocated.
+      fin->finish(r);
+      delete fin;
+    }
}
};
-void CInode::store_parent(Context *fin)
+void CInode::fetch_backtrace(inode_backtrace_t *bt, Context *fin)
{
- dout(10) << "store_parent" << dendl;
-
- ObjectOperation m;
- encode_parent_mutation(m);
-
- // write it.
- SnapContext snapc;
-
object_t oid = get_object_name(ino(), frag_t(), "");
- object_locator_t oloc(mdcache->mds->mdsmap->get_metadata_pool());
-
- mdcache->mds->objecter->mutate(oid, oloc, m, snapc, ceph_clock_now(g_ceph_context), 0,
- NULL, new C_Inode_StoredParent(this, inode.last_renamed_version, fin) );
+  // Read the "parent" xattr from the pool named by the inode's current
+  // layout. No snap context is needed for a read, so none is built here.
+  object_locator_t oloc(inode.layout.fl_pg_pool);
+  // The completion carries the bufferlist that receives the xattr value and
+  // forwards the result to fin once the read returns.
+  C_CInode_FetchedBacktrace *c = new C_CInode_FetchedBacktrace(this, bt, fin);
+  mdcache->mds->objecter->getxattr(oid, oloc, "parent", CEPH_NOSNAP, &c->bl, 0, c);
}
-void CInode::_stored_parent(version_t v, Context *fin)
+void CInode::_fetched_backtrace(bufferlist *bl, inode_backtrace_t *bt, Context *fin)
{
- if (state_test(STATE_DIRTYPARENT)) {
- if (v == inode.last_renamed_version) {
- dout(10) << "stored_parent committed v" << v << ", removing from list" << dendl;
- item_renamed_file.remove_myself();
- state_clear(STATE_DIRTYPARENT);
- put(PIN_DIRTYPARENT);
- } else {
- dout(10) << "stored_parent committed v" << v << " < " << inode.last_renamed_version
- << ", renamed again, not removing from list" << dendl;
- }
- } else {
- dout(10) << "stored_parent committed v" << v << ", tho i wasn't on the renamed_files list" << dendl;
- }
+ // Decode the fetched xattr payload into the caller's backtrace, then
+ // complete the waiter (if any) with success.
+ ::decode(*bt, *bl);
if (fin) {
fin->finish(0);
+ // The completion is freed after it fires (finish followed by delete, as
+ // done for other completions in this code); dropping the delete leaks it.
delete fin;
}
}
+// Fill *bt with this inode's backtrace.
+//
+//  location - pool id the backtrace is being built for; it is left out of
+//             bt->old_pools
+//  bt       - output: ino is set, ancestors is rebuilt from the chain of
+//             parent dentries, and the inode's old pool ids are added to
+//             old_pools
+void CInode::build_backtrace(int64_t location, inode_backtrace_t* bt)
+{
+  bt->ino = inode.ino;
+  bt->ancestors.clear();
+
+  // Walk up through the parent dentries, recording for each step the
+  // containing directory's ino, the dentry name and the child's version.
+  CInode *in = this;
+  CDentry *pdn = get_parent_dn();
+  while (pdn) {
+    CInode *diri = pdn->get_dir()->get_inode();
+    bt->ancestors.push_back(inode_backpointer_t(diri->ino(), pdn->name, in->inode.version));
+    in = diri;
+    pdn = in->get_parent_dn();
+  }
+
+  // inode_t keeps its old pools as plain pool ids (vector<int64_t> old_pools).
+  // A for loop is used so that skipping an entry still advances the iterator.
+  for (vector<int64_t>::iterator i = inode.old_pools.begin();
+       i != inode.old_pools.end();
+       ++i) {
+    // don't add our own pool id to old_pools to avoid looping (e.g. setlayout 0, 1, 0)
+    if (*i == location)
+      continue;
+    bt->old_pools.insert(*i);
+  }
+}
+
+// ------------------
+// parent dir
+
void CInode::encode_store(bufferlist& bl)
{
ENCODE_START(4, 4, bl);
static const int STATE_NEEDSRECOVER = (1<<11);
static const int STATE_RECOVERING = (1<<12);
static const int STATE_PURGING = (1<<13);
- static const int STATE_DIRTYPARENT = (1<<14);
static const int STATE_DIRTYRSTAT = (1<<15);
static const int STATE_STRAYPINNED = (1<<16);
static const int STATE_FROZENAUTHPIN = (1<<17);
elist<CInode*>::item item_dirty;
elist<CInode*>::item item_caps;
elist<CInode*>::item item_open_file;
- elist<CInode*>::item item_renamed_file;
elist<CInode*>::item item_dirty_dirfrag_dir;
elist<CInode*>::item item_dirty_dirfrag_nest;
elist<CInode*>::item item_dirty_dirfrag_dirfragtree;
parent(0),
inode_auth(CDIR_AUTH_DEFAULT),
replica_caps_wanted(0),
- item_dirty(this), item_caps(this), item_open_file(this), item_renamed_file(this),
+ item_dirty(this), item_caps(this), item_open_file(this),
item_dirty_dirfrag_dir(this),
item_dirty_dirfrag_nest(this),
item_dirty_dirfrag_dirfragtree(this),
void fetch(Context *fin);
void _fetched(bufferlist& bl, bufferlist& bl2, Context *fin);
- void store_parent(Context *fin);
- void _stored_parent(version_t v, Context *fin);
+ void fetch_backtrace(inode_backtrace_t *bt, Context *fin);
+ void _fetched_backtrace(bufferlist *bl, inode_backtrace_t *bt, Context *fin);
- void build_backtrace(inode_backtrace_t& bt);
- unsigned encode_parent_mutation(ObjectOperation& m);
+ void build_backtrace(int64_t location, inode_backtrace_t* bt);
void encode_store(bufferlist& bl);
void decode_store(bufferlist::iterator& bl);
class MDS;
class MDSlaveUpdate;
+// The backtrace info struct here is used to maintain the backtrace in
+// a queue that we will eventually want to write out (on journal segment
+// expiry).
+class BacktraceInfo {
+public:
+ // pool the backtrace xattr is written to (used as the object locator)
+ int64_t location;
+ // pool id recorded inside the backtrace itself; the constructor replaces
+ // a value of -1 with location
+ int64_t pool;
+ // backtrace captured when this object was constructed
+ struct inode_backtrace_t bt;
+ // link used to sit on LogSegment::update_backtraces
+ elist<BacktraceInfo*>::item item_logseg;
+ // l: write location, i: inode to capture, ls: segment to queue on,
+ // p: pool id to record (-1 means "same as l")
+ BacktraceInfo(int64_t l, CInode *i, LogSegment *ls, int64_t p = -1);
+ // unlinks item_logseg from the list it is on
+ ~BacktraceInfo();
+};
+
class LogSegment {
public:
uint64_t offset, end;
elist<CDentry*> dirty_dentries;
elist<CInode*> open_files;
- elist<CInode*> renamed_files;
elist<CInode*> dirty_dirfrag_dir;
elist<CInode*> dirty_dirfrag_nest;
elist<CInode*> dirty_dirfrag_dirfragtree;
+ elist<BacktraceInfo*> update_backtraces;
+
elist<MDSlaveUpdate*> slave_updates;
set<CInode*> truncating_inodes;
dirty_inodes(member_offset(CInode, item_dirty)),
dirty_dentries(member_offset(CDentry, item_dirty)),
open_files(member_offset(CInode, item_open_file)),
- renamed_files(member_offset(CInode, item_renamed_file)),
dirty_dirfrag_dir(member_offset(CInode, item_dirty_dirfrag_dir)),
dirty_dirfrag_nest(member_offset(CInode, item_dirty_dirfrag_nest)),
dirty_dirfrag_dirfragtree(member_offset(CInode, item_dirty_dirfrag_dirfragtree)),
+ update_backtraces(member_offset(BacktraceInfo, item_logseg)),
slave_updates(0), // passed to begin() manually
inotablev(0), sessionmapv(0)
{ }
-};
+
+ // backtrace handling
+ // allocate a BacktraceInfo for the inode; its constructor links it onto
+ // update_backtraces
+ void queue_backtrace_update(CInode *in, int64_t location, int64_t pool = -1);
+ // delete queued entries for inode ino whose write location is pool
+ void remove_pending_backtraces(inodeno_t ino, int64_t pool);
+ // write info's backtrace as the "parent" xattr in pool info->location;
+ // _stored_backtrace(info, fin) runs when the write completes
+ void store_backtrace_update(MDS *mds, BacktraceInfo *info, Context *fin);
+ // delete info, then finish and delete fin (if non-NULL)
+ void _stored_backtrace(BacktraceInfo *info, Context *fin);
+ // add a setxattr("parent") of the encoded backtrace to m; returns the
+ // encoded length
+ unsigned encode_parent_mutation(ObjectOperation& m, BacktraceInfo *info);
+ };
#endif
seg->dirty_inodes.clear_list();
seg->dirty_dentries.clear_list();
seg->open_files.clear_list();
- seg->renamed_files.clear_list();
seg->dirty_dirfrag_dir.clear_list();
seg->dirty_dirfrag_nest.clear_list();
seg->dirty_dirfrag_dirfragtree.clear_list();
if (destdn->is_auth()) {
in->pop_and_dirty_projected_inode(mdr->ls);
- if (in->is_dir()) {
- mdr->ls->renamed_files.push_back(&in->item_renamed_file);
- if (!in->state_test(CInode::STATE_DIRTYPARENT)) {
- in->state_set(CInode::STATE_DIRTYPARENT);
- in->get(CInode::PIN_DIRTYPARENT);
- dout(10) << "added dir to logsegment renamed_files list " << *in << dendl;
- } else {
- dout(10) << "re-added dir to logsegment renamed_files list " << *in << dendl;
- }
- }
} else {
// FIXME: fix up snaprealm!
}
void inode_backtrace_t::encode(bufferlist& bl) const
{
- ENCODE_START(4, 4, bl);
+ ENCODE_START(5, 4, bl);
::encode(ino, bl);
::encode(ancestors, bl);
+ ::encode(pool, bl);
+ ::encode(old_pools, bl);
ENCODE_FINISH(bl);
}
void inode_backtrace_t::decode(bufferlist::iterator& bl)
{
- DECODE_START_LEGACY_COMPAT_LEN(4, 4, 4, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(5, 4, 4, bl);
if (struct_v < 3)
return; // sorry, the old data was crap
::decode(ino, bl);
ancestors.back().decode_old(bl);
}
}
+ if (struct_v >= 5) {
+ ::decode(pool, bl);
+ ::decode(old_pools, bl);
+ }
DECODE_FINISH(bl);
}
f->close_section();
}
f->close_section();
+ f->dump_int("pool", pool);
+ f->open_array_section("old_pools");
+ for (set<int64_t>::iterator p = old_pools.begin(); p != old_pools.end(); ++p) {
+ f->dump_int("old_pool", *p);
+ }
+ f->close_section();
}
void inode_backtrace_t::generate_test_instances(list<inode_backtrace_t*>& ls)
ls.back()->ancestors.back().dirino = 123;
ls.back()->ancestors.back().dname = "bar";
ls.back()->ancestors.back().version = 456;
+ ls.back()->pool = 0;
+ ls.back()->old_pools.insert(10);
+ ls.back()->old_pools.insert(7);
}
struct inode_backtrace_t {
inodeno_t ino; // my ino
vector<inode_backpointer_t> ancestors;
+ int64_t pool;
+ // we use a set for old_pools to avoid duplicate entries, e.g. setlayout 0, 1, 0
+ set<int64_t> old_pools;
+
+ inode_backtrace_t() : pool(-1) {}
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator &bl);
WRITE_CLASS_ENCODER(inode_backtrace_t)
inline ostream& operator<<(ostream& out, const inode_backtrace_t& it) {
- return out << it.ino << ":" << it.ancestors;
+ return out << "(" << it.pool << ")" << it.ino << ":" << it.ancestors << "//" << it.old_pools;
}
}
}
- // parent pointers on renamed dirs
- for (elist<CInode*>::iterator p = renamed_files.begin(); !p.end(); ++p) {
- CInode *in = *p;
- dout(10) << "try_to_expire waiting for dir parent pointer update on " << *in << dendl;
- assert(in->state_test(CInode::STATE_DIRTYPARENT));
- in->store_parent(gather_bld.new_sub());
+ // backtraces to be stored/updated
+ for (elist<BacktraceInfo*>::iterator p = update_backtraces.begin(); !p.end(); ++p) {
+ BacktraceInfo *btinfo = *p;
+ store_backtrace_update(mds, btinfo, gather_bld.new_sub());
}
// slave updates
}
}
+// ----------------------------
+// backtrace handling
+
+// BacktraceInfo is used for keeping the
+// current state of the backtrace to be stored later on
+// logsegment expire. Constructing a BacktraceInfo
+// automatically puts it on the LogSegment list that is passed in,
+// after building the backtrace based on the current state of the inode. We
+// construct the backtrace here to avoid keeping a ref to the inode.
+//
+//  l  - pool the backtrace will be written to
+//  i  - inode whose backtrace is captured
+//  ls - log segment whose update_backtraces list this entry joins
+//  p  - pool id to record in the backtrace; -1 means "same as l"
+BacktraceInfo::BacktraceInfo(
+    int64_t l, CInode *i, LogSegment *ls, int64_t p) :
+    location(l), pool(p) {
+
+  // on setlayout cases, forward pointers mean
+  // pool != location, but for all others it does
+  if (pool == -1) pool = location;
+
+  bt.pool = pool;
+  // build_backtrace() takes its output argument by pointer
+  i->build_backtrace(l, &bt);
+  ls->update_backtraces.push_back(&item_logseg);
+}
+
+// When the BacktraceInfo is destroyed, it just needs to remove itself
+// from the LogSegment list it was linked onto by its constructor
+BacktraceInfo::~BacktraceInfo() {
+ item_logseg.remove_myself();
+}
+
+// Capture the inode's backtrace now and queue it on this segment so it can
+// be written out later.
+void LogSegment::queue_backtrace_update(CInode *inode, int64_t location, int64_t pool) {
+  // The new object is intentionally not kept by the caller: the
+  // BacktraceInfo constructor links it onto this segment's
+  // update_backtraces list, and that list is how it is found again.
+  BacktraceInfo *queued = new BacktraceInfo(location, inode, this, pool);
+  (void)queued;
+}
+
+// Drop every queued backtrace update for inode `ino` whose write location
+// is `pool`. Matching entries are deleted; their destructor unlinks them
+// from update_backtraces.
+void LogSegment::remove_pending_backtraces(inodeno_t ino, int64_t pool) {
+  elist<BacktraceInfo*>::iterator i = update_backtraces.begin();
+  while (!i.end()) {
+    // Read the current entry first (so the head of the list is examined and
+    // the end position is never dereferenced), then step past it before any
+    // delete so the iterator is not left on an unlinked, freed element.
+    BacktraceInfo *info = *i;
+    ++i;
+    if (info->bt.ino == ino && info->location == pool) {
+      delete info;
+    }
+  }
+}
+
+// Encode info's backtrace and attach it to m as the "parent" xattr.
+// Returns the length of the encoded backtrace.
+unsigned LogSegment::encode_parent_mutation(ObjectOperation& m, BacktraceInfo *info)
+{
+  bufferlist encoded_bt;
+  ::encode(info->bt, encoded_bt);
+
+  m.setxattr("parent", encoded_bt);
+
+  const unsigned encoded_len = encoded_bt.length();
+  return encoded_len;
+}
+
+// Completion for a backtrace write: hands the queued entry and the
+// waiter back to the owning segment.
+struct C_LogSegment_StoredBacktrace : public Context {
+  LogSegment *ls;        // segment that issued the write
+  BacktraceInfo *info;   // queued entry that was written
+  Context *fin;          // waiter passed on to _stored_backtrace
+
+  C_LogSegment_StoredBacktrace(LogSegment *segment, BacktraceInfo *queued,
+                               Context *onfinish)
+    : ls(segment), info(queued), fin(onfinish) {}
+
+  // NOTE(review): r is not inspected here, so a failed write is handled
+  // exactly like a successful one - confirm that is intended.
+  void finish(int r) {
+    ls->_stored_backtrace(info, fin);
+  }
+};
+
+// Issue the write for one queued backtrace: set the "parent" xattr on the
+// inode's object in pool info->location. When the write completes,
+// _stored_backtrace(info, fin) is invoked through the completion below.
+void LogSegment::store_backtrace_update(MDS *mds, BacktraceInfo *info, Context *fin)
+{
+  // the mutation carries the encoded backtrace as the "parent" xattr
+  ObjectOperation op;
+  encode_parent_mutation(op, info);
+
+  // the target object is named from the inode number and the default frag
+  object_t oid = CInode::get_object_name(info->bt.ino, frag_t(), "");
+
+  dout(10) << "store_parent for oid " << oid << " location " << info->location << " pool " << info->pool << dendl;
+
+  // write into the pool this entry was queued for, with a
+  // default-constructed snap context
+  object_locator_t oloc(info->location);
+  SnapContext snapc;
+  Context *on_stored = new C_LogSegment_StoredBacktrace(this, info, fin);
+
+  mds->objecter->mutate(oid, oloc, op, snapc, ceph_clock_now(g_ceph_context), 0,
+                        NULL, on_stored);
+}
+
+// Runs after a backtrace write has completed: retire the queued entry and
+// complete the waiter, if there is one, with result 0.
+void LogSegment::_stored_backtrace(BacktraceInfo *info, Context *fin)
+{
+  // destroying the entry also unlinks it from its list
+  delete info;
+
+  if (!fin)
+    return;
+
+  fin->finish(0);
+  delete fin;
+}
#undef DOUT_COND
#define DOUT_COND(cct, l) (l<=cct->_conf->debug_mds || l <= cct->_conf->debug_mds_log)
*/
void inode_t::encode(bufferlist &bl) const
{
- ENCODE_START(6, 6, bl);
+ ENCODE_START(7, 6, bl);
::encode(ino, bl);
::encode(rdev, bl);
::encode(file_data_version, bl);
::encode(xattr_version, bl);
::encode(last_renamed_version, bl);
+ ::encode(old_pools, bl);
ENCODE_FINISH(bl);
}
void inode_t::decode(bufferlist::iterator &p)
{
- DECODE_START_LEGACY_COMPAT_LEN(6, 6, 6, p);
+ DECODE_START_LEGACY_COMPAT_LEN(7, 6, 6, p);
::decode(ino, p);
::decode(rdev, p);
::decode(xattr_version, p);
if (struct_v >= 2)
::decode(last_renamed_version, p);
+ if (struct_v >= 7)
+ ::decode(old_pools, p);
DECODE_FINISH(p);
}
::dump(layout, f);
f->close_section();
+  // Emit one integer entry per recorded old pool id. old_pools holds plain
+  // pool ids (vector<int64_t>), so each is written with dump_int, using the
+  // same "old_pool" key as the backtrace dump. The for loop guarantees the
+  // iterator advances on every pass.
+  f->open_array_section("old_pools");
+  for (vector<int64_t>::const_iterator i = old_pools.begin();
+       i != old_pools.end();
+       ++i) {
+    f->dump_int("old_pool", *i);
+  }
+  f->close_section();
+
f->dump_unsigned("size", size);
f->dump_unsigned("truncate_seq", truncate_seq);
f->dump_unsigned("truncate_size", truncate_size);
// file (data access)
ceph_dir_layout dir_layout; // [dir only]
ceph_file_layout layout;
+ vector <int64_t> old_pools;
uint64_t size; // on directory, # dentries
uint32_t truncate_seq;
uint64_t truncate_size, truncate_from;
}
}
+  // Record pool id l at the end of the old_pools list.
+  void add_old_pool(int64_t l) {
+    old_pools.insert(old_pools.end(), l);
+  }
+
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator& bl);
void dump(Formatter *f) const;