}
+void CDentry::adjust_nested_anchors(int by)
+{
+ nested_anchors += by;
+ dout(20) << "adjust_nested_anchors by " << by << " -> " << nested_anchors << dendl;
+ assert(nested_anchors >= 0);
+ dir->adjust_nested_anchors(by);
+}
+
// ----------------------------
// locking
off_t dir_offset;
int auth_pins, nested_auth_pins;
+ int nested_anchors;
friend class Migrator;
friend class Locker;
version(0), projected_version(0),
xlist_dirty(this),
dir_offset(0),
- auth_pins(0), nested_auth_pins(0),
+ auth_pins(0), nested_auth_pins(0), nested_anchors(0),
lock(this, CEPH_LOCK_DN, WAIT_LOCK_OFFSET) { }
CDentry(const string& n, CInode *in) :
name(n),
version(0), projected_version(0),
xlist_dirty(this),
dir_offset(0),
- auth_pins(0), nested_auth_pins(0),
+ auth_pins(0), nested_auth_pins(0), nested_anchors(0),
lock(this, CEPH_LOCK_DN, WAIT_LOCK_OFFSET) { }
CDentry(const string& n, inodeno_t ino, unsigned char dt, CInode *in=0) :
name(n),
version(0), projected_version(0),
xlist_dirty(this),
dir_offset(0),
- auth_pins(0), nested_auth_pins(0),
+ auth_pins(0), nested_auth_pins(0), nested_anchors(0),
lock(this, CEPH_LOCK_DN, WAIT_LOCK_OFFSET) { }
CInode *get_inode() const { return inode; }
void adjust_nested_auth_pins(int by);
bool is_frozen();
+ void adjust_nested_anchors(int by);
// dentry type is primary || remote || null
// inode ptr is required for primary, optional for remote, undefined for null
if (dir.get_cum_auth_pins())
out << " ap=" << dir.get_auth_pins() << "+" << dir.get_nested_auth_pins();
+ if (dir.get_nested_anchors())
+ out << " na=" << dir.get_nested_anchors();
out << " state=" << dir.get_state();
if (dir.state_test(CDir::STATE_COMPLETE)) out << "|complete";
nested_auth_pins = 0;
request_pins = 0;
+ nested_anchors = 0;
+
//hack_num_accessed = -1;
dir_rep = REP_NONE;
// adjust auth pin count
if (in->auth_pins + in->nested_auth_pins)
dn->adjust_nested_auth_pins(in->auth_pins + in->nested_auth_pins);
+
+ if (in->inode.anchored + in->nested_anchors)
+ dn->adjust_nested_anchors(in->nested_anchors + in->inode.anchored);
}
void CDir::unlink_inode( CDentry *dn )
if (in->auth_pins + in->nested_auth_pins)
dn->adjust_nested_auth_pins(0 - (in->auth_pins + in->nested_auth_pins));
+ if (in->inode.anchored + in->nested_anchors)
+ dn->adjust_nested_anchors(0 - (in->nested_anchors + in->inode.anchored));
+
// detach inode
in->remove_primary_parent(dn);
dn->inode = 0;
}
nested_auth_pins += dn->auth_pins + dn->nested_auth_pins;
+ nested_anchors += dn->nested_anchors;
if (dn->is_dirty())
num_dirty++;
inode->adjust_nested_auth_pins(inc);
}
+void CDir::adjust_nested_anchors(int by)
+{
+ nested_anchors += by;
+ dout(20) << "adjust_nested_anchors by " << by << " -> " << nested_anchors << dendl;
+ assert(nested_anchors >= 0);
+ inode->adjust_nested_anchors(by);
+}
/*****************************************************************************
int nested_auth_pins;
int request_pins;
+ int nested_anchors;
+
// cache control (defined for authority; hints for replicas)
int dir_rep;
set<int> dir_rep_by; // if dir_rep == REP_LIST
void auth_unpin();
void adjust_nested_auth_pins(int inc);
+ int get_nested_anchors() { return nested_anchors; }
+ void adjust_nested_anchors(int by);
+
// -- freezing --
bool freeze_tree();
void _freeze_tree();
::decode(tm, p);
if (inode.ctime < tm) inode.ctime = tm;
::decode(inode.nlink, p);
- ::decode(inode.anchored, p);
+ {
+ bool was_anchored = inode.anchored;
+ ::decode(inode.anchored, p);
+ if (was_anchored != inode.anchored)
+ parent->adjust_nested_anchors((int)inode.anchored - (int)was_anchored);
+ }
break;
case CEPH_LOCK_IDFT:
parent->adjust_nested_auth_pins(a);
}
-
+void CInode::adjust_nested_anchors(int by)
+{
+ nested_anchors += by;
+ dout(20) << "adjust_nested_anchors by " << by << " -> " << nested_anchors << dendl;
+ assert(nested_anchors >= 0);
+ if (parent)
+ parent->adjust_nested_anchors(by);
+}
// authority
LogSegment *ls)
{
utime_t old_mtime = inode.mtime;
+ bool was_anchored = inode.anchored;
::decode(inode, p);
+ if (was_anchored != inode.anchored)
+ parent->adjust_nested_anchors((int)inode.anchored - (int)was_anchored);
if (old_mtime > inode.mtime) {
assert(dirlock.is_updated());
inode.mtime = old_mtime; // preserve our mtime, if it is larger
static const int WAIT_DIRLOCK_OFFSET = 4 + 3*SimpleLock::WAIT_BITS; // same
static const int WAIT_VERSIONLOCK_OFFSET = 4 + 4*SimpleLock::WAIT_BITS;
static const int WAIT_XATTRLOCK_OFFSET = 4 + 5*SimpleLock::WAIT_BITS;
- static const int WAIT_NESTEDLOCK_OFFSET = 4 + 6*SimpleLock::WAIT_BITS;
static const int WAIT_ANY_MASK = (0xffffffff);
public:
int auth_pin_freeze_allowance;
+private:
+ int nested_anchors; // _NOT_ including me!
+
public:
inode_load_vec_t pop;
xlist_dirty_dirfrag_dir(this),
xlist_purging_inode(this),
auth_pins(0), nested_auth_pins(0),
+ nested_anchors(0),
versionlock(this, CEPH_LOCK_IVERSION, WAIT_VERSIONLOCK_OFFSET),
authlock(this, CEPH_LOCK_IAUTH, WAIT_AUTHLOCK_OFFSET),
linklock(this, CEPH_LOCK_ILINK, WAIT_LINKLOCK_OFFSET),
void auth_pin();
void auth_unpin();
+ void adjust_nested_anchors(int by);
// -- freeze --
bool is_freezing_inode() { return state_test(STATE_FREEZING); }
int get_replica_nonce() { return replica_nonce; }
void update_inode(CInode *in) {
+ if (in->parent && inode.anchored != in->inode.anchored)
+ in->parent->adjust_nested_anchors((int)inode.anchored - (int)in->inode.anchored);
in->inode = inode;
in->symlink = symlink;
in->dirfragtree = dirfragtree;
return;
}
+ // rdlock path
+ set<SimpleLock*> rdlocks = mdr->rdlocks;
+ set<SimpleLock*> wrlocks = mdr->wrlocks;
+ set<SimpleLock*> xlocks = mdr->xlocks;
+
+ CDentry *dn = in->get_parent_dn();
+ while (dn) {
+ rdlocks.insert(&dn->lock);
+ dn = dn->get_dir()->get_inode()->get_parent_dn();
+ }
+ if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) {
+ dout(7) << "anchor_create waiting for locks " << *in << dendl;
+ return;
+ }
+
// wait
in->add_waiter(CInode::WAIT_ANCHORED, onfinish);
void MDCache::_anchor_prepared(CInode *in, version_t atid, bool add)
{
- dout(10) << "_anchor_create_prepared " << *in << " atid " << atid << dendl;
- assert(in->inode.anchored == false);
+ dout(10) << "_anchor_prepared " << *in << " atid " << atid
+ << " " << (add ? "create":"destroy") << dendl;
+ assert(in->inode.anchored == !add);
// update the logged inode copy
inode_t *pi = in->project_inode();
if (add) {
pi->anchored = true;
pi->dirstat.ranchors++;
+ in->parent->adjust_nested_anchors(1);
} else {
pi->anchored = false;
pi->dirstat.ranchors--;
+ in->parent->adjust_nested_anchors(-1);
}
pi->version = in->pre_dirty();