OPTION(mds_op_log_threshold, OPT_INT, 5) // how many op log messages to show in one go
OPTION(mds_snap_min_uid, OPT_U32, 0) // The minimum UID required to create a snapshot
OPTION(mds_snap_max_uid, OPT_U32, 65536) // The maximum UID allowed to create a snapshot
+OPTION(mds_verify_backtrace, OPT_U32, 1)
// If true, compact leveldb store on mount
OPTION(osd_compact_leveldb_on_mount, OPT_BOOL, false)
public:
bufferlist hdrbl;
map<string, bufferlist> omap;
- int ret1, ret2;
+ bufferlist btbl;
+ int ret1, ret2, ret3;
C_IO_Dir_OMAP_Fetched(CDir *d, const string& w) : CDirIOContext(d), want_dn(w) { }
void finish(int r) {
+ // check the correctness of backtrace
+ if (r >= 0 && ret3 != -ECANCELED)
+ dir->inode->verify_diri_backtrace(btbl, ret3);
if (r >= 0) r = ret1;
if (r >= 0) r = ret2;
dir->_omap_fetched(hdrbl, omap, want_dn, r);
ObjectOperation rd;
rd.omap_get_header(&fin->hdrbl, &fin->ret1);
rd.omap_get_vals("", "", (uint64_t)-1, &fin->omap, &fin->ret2);
+ // check the correctness of backtrace
+ if (g_conf->mds_verify_backtrace > 0 && frag == frag_t()) {
+ rd.getxattr("parent", &fin->btbl, &fin->ret3);
+ rd.set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
+ } else {
+ fin->ret3 = -ECANCELED;
+ }
+
cache->mds->objecter->read(oid, oloc, rd, CEPH_NOSNAP, NULL, 0,
new C_OnFinisher(fin, &cache->mds->finisher));
}
}
}
+void CInode::verify_diri_backtrace(bufferlist &bl, int err)
+{
+ if (is_base() || is_dirty_parent())
+ return;
+
+ dout(10) << "verify_diri_backtrace" << dendl;
+
+ if (err == 0) {
+ inode_backtrace_t backtrace;
+ ::decode(backtrace, bl);
+ CDentry *pdn = get_parent_dn();
+ if (backtrace.ancestors.empty() ||
+ backtrace.ancestors[0].dname != pdn->name ||
+ backtrace.ancestors[0].dirino != pdn->get_dir()->ino())
+ err = -EINVAL;
+ }
+
+ if (err) {
+ MDS *mds = mdcache->mds;
+ mds->clog->error() << "bad backtrace on dir ino " << ino() << "\n";
+ assert(!"bad backtrace" == (g_conf->mds_verify_backtrace > 1));
+
+ _mark_dirty_parent(mds->mdlog->get_current_segment(), false);
+ mds->mdlog->flush();
+ }
+}
+
// ------------------
// parent dir
void fetch_backtrace(Context *fin, bufferlist *backtrace);
void _mark_dirty_parent(LogSegment *ls, bool dirty_pool=false);
void clear_dirty_parent();
+ void verify_diri_backtrace(bufferlist &bl, int err);
bool is_dirty_parent() { return state_test(STATE_DIRTYPARENT); }
bool is_dirty_pool() { return state_test(STATE_DIRTYPOOL); }