- then, mtimes:
/ - avoid migration race concern (on auth).
- - writeback and dirty on gather.
- - cleaned up pv/pi makes writebehind play nice with concurrent updates.
- - should pin lock in LOCK state?
- - scatterlock 'updated' flag.
+/ - writeback and dirty on gather.
+/ - cleaned up pv/pi makes writebehind play nice with concurrent updates.
+/ - should pin lock in LOCK state? (actually, gather)
+/ - scatterlock 'updated' flag.
- on replica, clear only on sync | rescatter.
- make sure "dirty" scatterlock prevents journal expire.
- EMetaBlob map<inodeno_t,utime_t> dirty_scatter;
static const int STATE_OPENINGDIR = (1<<9);
// -- waiters --
- static const int WAIT_SLAVEAGREE = (1<<0);
+ //static const int WAIT_SLAVEAGREE = (1<<0);
static const int WAIT_DIR = (1<<1);
static const int WAIT_ANCHORED = (1<<2);
static const int WAIT_UNANCHORED = (1<<3);
}
void get_wrlock() {
assert(can_wrlock());
+ if (num_wrlock == 0) parent->get(MDSCacheObject::PIN_LOCK);
++num_wrlock;
}
void put_wrlock() {
--num_wrlock;
+ if (num_wrlock == 0) parent->put(MDSCacheObject::PIN_LOCK);
}
bool is_wrlocked() { return num_wrlock > 0; }
int get_num_wrlocks() { return num_wrlock; }
// glockc -> lock?
else if (lock->get_state() == LOCK_GLOCKC &&
!lock->is_gathering() &&
- !lock->is_wrlocked()) {
+ !lock->is_wrlocked() &&
+ !lock->is_updated()) {
dout(7) << "scatter_eval finished lock gather/un-wrlock on " << *lock
<< " on " << *lock->get_parent() << endl;
-
- if (lock->is_updated()) {
- // updated flag is set: we got new data during the gather.
- // write-behind journal.
- // version_t v
-
- }
-
lock->set_state(LOCK_LOCK);
//lock->get_parent()->put(CInode::PIN_SCATTERED);
lock->finish_waiters(ScatterLock::WAIT_XLOCK|ScatterLock::WAIT_STABLE);
else if ((lock->get_state() == LOCK_GTEMPSYNCC ||
lock->get_state() == LOCK_GTEMPSYNCL) &&
!lock->is_gathering() &&
- !lock->is_wrlocked()) {
+ !lock->is_wrlocked() &&
+ !lock->is_updated()) {
dout(7) << "scatter_eval finished tempsync gather/un-wrlock on " << *lock
<< " on " << *lock->get_parent() << endl;
lock->set_state(LOCK_TEMPSYNC);
lock->set_state(LOCK_SYNC);
lock->decode_locked_state(m->get_data());
+ lock->clear_updated();
lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
break;
case LOCK_AC_SCATTER:
assert(lock->get_state() == LOCK_LOCK);
lock->decode_locked_state(m->get_data());
+ lock->clear_updated();
lock->set_state(LOCK_SCATTER);
//lock->get_parent()->get(CInode::PIN_SCATTERED);
lock->finish_waiters(ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
dout(7) << "handle_scatter_lock " << *lock << " on " << *lock->get_parent()
<< " from " << from << ", last one"
<< endl;
- scatter_eval_gather(lock);
+
+ if (lock->is_updated()) {
+ // journal write-behind. WARNING: this is non-optimal, but simplest: the gather stays blocked until the writeback is flushed to the journal.
+ CInode *in = (CInode*)lock->get_parent();
+ inode_t *pi = in->project_inode();
+ pi->version = in->pre_dirty();
+
+ EUpdate *le = new EUpdate("dir.mtime writebehind");
+ le->metablob.add_dir_context(in->get_parent_dn()->get_dir());
+ le->metablob.add_primary_dentry(in->get_parent_dn(), true, 0, pi);
+
+ mds->mdlog->submit_entry(le);
+ mds->mdlog->wait_for_sync(new C_Locker_GatherWB(this, lock));
+ }
+ else {
+ // updated flag is not set, so there is nothing to write back;
+ // evaluate the gather right away.
+ scatter_eval_gather(lock);
+ }
}
break;
delete m;
}
+void Locker::scatter_gather_writebehind(ScatterLock *lock) // invoked from C_Locker_GatherWB::finish, which is queued on mdlog->wait_for_sync
+{
+ CInode *in = (CInode*)lock->get_parent();
+ dout(10) << "scatter_gather_writebehind on " << *lock << " on " << *in << endl;
+ in->pop_and_dirty_projected_inode(); // apply the inode that was projected when the write-behind EUpdate was submitted
+ lock->clear_updated(); // clear the updated flag before re-evaluating
+ scatter_eval_gather(lock); // run the gather evaluation that was held back while the journal entry was in flight
+}
+
+
+
// ==========================================================================
// local lock
void try_scatter_eval(ScatterLock *lock);
void scatter_eval(ScatterLock *lock); // public for MDCache::adjust_subtree_auth()
void scatter_eval_gather(ScatterLock *lock);
+
protected:
void handle_scatter_lock(ScatterLock *lock, MLock *m);
void scatter_sync(ScatterLock *lock);
bool scatter_wrlock_start(ScatterLock *lock, MDRequest *mdr);
void scatter_wrlock_finish(ScatterLock *lock, MDRequest *mdr);
+ class C_Locker_GatherWB : public Context { // completion context that calls Locker::scatter_gather_writebehind for one lock
+ Locker *locker;
+ ScatterLock *lock;
+ public:
+ C_Locker_GatherWB(Locker *l, ScatterLock *sl) : locker(l), lock(sl) {}
+ void finish(int r) { // r is not examined
+ locker->scatter_gather_writebehind(lock);
+ }
+ };
+ void scatter_gather_writebehind(ScatterLock *lock);
+
// local
protected:
bool local_wrlock_start(LocalLock *lock, MDRequest *mdr);
}
}
- void set_updated() { updated = true; }
- void clear_updated() { updated = false; }
+ void set_updated() { // set the updated flag; the parent is pinned only on the false->true transition
+ if (!updated) {
+ parent->get(MDSCacheObject::PIN_DIRTYSCATTERED);
+ updated = true;
+ }
+ }
+ void clear_updated() { // clear the updated flag and release the matching pin; does nothing if the flag is not set
+ if (updated) {
+ parent->put(MDSCacheObject::PIN_DIRTYSCATTERED);
+ updated = false;
+ }
+ }
bool is_updated() { return updated; }
void replicate_relax() {
}
void get_wrlock() {
assert(can_wrlock());
+ if (num_wrlock == 0) parent->get(MDSCacheObject::PIN_LOCK);
++num_wrlock;
}
void put_wrlock() {
--num_wrlock;
+ if (num_wrlock == 0) parent->put(MDSCacheObject::PIN_LOCK);
}
bool is_wrlocked() { return num_wrlock > 0; }
int get_num_wrlocks() { return num_wrlock; }
version_t dirpv = 0;
CInode *diri = dn->dir->inode;
- if (diri->is_auth() &&
- !diri->is_root() &&
- mdr->is_master()) {
- assert(mdr->wrlocks.count(&diri->dirlock));// || // either we wrlocked,
- //mdr->is_slave()); // or the master did.
+ if (diri->is_root()) return 0;
+ if (diri->is_auth()) {
+ assert(mdr->wrlocks.count(&diri->dirlock));
+
dirpv = diri->pre_dirty();
+ dout(10) << "predirty_dn_diri ctime/mtime " << mdr->now << " pv " << dirpv << " on " << *diri << endl;
+
+ // predirty+journal
inode_t *pi = diri->project_inode();
- pi->version = dirpv;
+ if (dirpv) pi->version = dirpv;
pi->ctime = pi->mtime = mdr->now;
blob->add_primary_dentry(diri->get_parent_dn(), true, 0, pi);
- dout(10) << "predirty_dn_diri ctime/mtime " << mdr->now << " pv " << dirpv << " on " << *diri << endl;
+ } else {
+ // journal the mtime change anyway.
+ inode_t *ji = blob->add_primary_dentry(diri->get_parent_dn(), true);
+ ji->ctime = ji->mtime = mdr->now;
+
+ blob->add_dirtied_inode_mtime(diri->ino(), mdr->now);
}
return dirpv;
{
CInode *diri = dn->dir->inode;
- // make the udpate
- diri->inode.ctime = diri->inode.mtime = mtime;
+ if (diri->is_root()) return;
if (dirpv) {
+ // we journaled and predirtied.
assert(diri->is_auth() && !diri->is_root());
-
- // we were before, too.
diri->pop_and_dirty_projected_inode();
- //diri->mark_dirty(dirpv);
dout(10) << "dirty_dn_diri ctime/mtime " << mtime << " v " << diri->inode.version << " on " << *diri << endl;
} else {
- /*assert(!dn->is_auth() || // slave
- !diri->is_auth() ||
- diri->is_root() ||
- diri->is_frozen()); // then not auth, or still importing.
- */
+ dout(10) << "dirty_dn_diri ctime/mtime " << mtime << " (non-dirty) on " << *diri << endl;
// dirlock scatterlock will propagate the update.
+ diri->inode.ctime = diri->inode.mtime = mtime;
+ diri->dirlock.set_updated();
}
-
- /* any writebehind should be handled by the lock gather probably?
- } else {
- // write-behind.
- if (!diri->is_dirty())
- dirty_diri_mtime_writebehind(diri, mtime);
- // otherwise, if it's dirty, we know the mtime is journaled by another local update.
- // (something after the import, or the import itself)
- }
- */
-}
-
-
-class C_MDS_DirtyDiriMtimeWB : public Context {
- Server *server;
- CInode *diri;
- version_t dirpv;
-public:
- C_MDS_DirtyDiriMtimeWB(Server *s, CInode *i, version_t v) :
- server(s), diri(i), dirpv(v) {}
- void finish(int r) {
- diri->mark_dirty(dirpv);
- diri->auth_unpin();
- }
-};
-
-void Server::dirty_diri_mtime_writebehind(CInode *diri, utime_t mtime)
-{
- if (!diri->can_auth_pin())
- return; // oh well! hrm.
-
- diri->auth_pin();
-
- // we're newly auth. write-behind.
- EUpdate *le = new EUpdate("dir.mtime writebehind");
- le->metablob.add_dir_context(diri->get_parent_dn()->get_dir());
- inode_t *pi = diri->project_inode();
- pi->version = diri->pre_dirty();
- le->metablob.add_primary_dentry(diri->get_parent_dn(), true, 0, pi);
-
- mds->mdlog->submit_entry(le);
- mds->mdlog->wait_for_sync(new C_MDS_DirtyDiriMtimeWB(this, diri, pi->version));
}
bool linkmerge = (srcdn->inode == destdn->inode &&
(srcdn->is_primary() || destdn->is_primary()));
- mdr->pvmap[destdn->dir->inode] = predirty_dn_diri(mdr, destdn, metablob);
- if (destdn->dir != srcdn->dir)
- mdr->pvmap[srcdn->dir->inode] = predirty_dn_diri(mdr, srcdn, metablob);
+ if (mdr->is_master()) {
+ mdr->pvmap[destdn->dir->inode] = predirty_dn_diri(mdr, destdn, metablob);
+ if (destdn->dir != srcdn->dir)
+ mdr->pvmap[srcdn->dir->inode] = predirty_dn_diri(mdr, srcdn, metablob);
+ }
inode_t *ji = 0; // journaled inode getting nlink--
version_t ipv; // it's version
(srcdn->is_primary() || destdn->is_primary()));
// dir mtimes
- dirty_dn_diri(destdn, mdr->pvmap[destdn->dir->inode], mdr->now);
- if (destdn->dir != srcdn->dir)
- dirty_dn_diri(srcdn, mdr->pvmap[srcdn->dir->inode], mdr->now);
-
+ if (mdr->is_master()) {
+ dirty_dn_diri(destdn, mdr->pvmap[destdn->dir->inode], mdr->now);
+ if (destdn->dir != srcdn->dir)
+ dirty_dn_diri(srcdn, mdr->pvmap[srcdn->dir->inode], mdr->now);
+ }
+
if (linkmerge) {
if (destdn->is_primary()) {
dout(10) << "merging remote onto primary link" << endl;
version_t predirty_dn_diri(MDRequest *mdr, CDentry *dn, class EMetaBlob *blob);
void dirty_dn_diri(CDentry *dn, version_t dirpv, utime_t mtime);
- void dirty_diri_mtime_writebehind(CInode *diri, utime_t mtime);
// requests on existing inodes.
// ref counting
bool is_rdlocked() { return num_rdlock > 0; }
- int get_rdlock() { return ++num_rdlock; }
+ int get_rdlock() {
+ if (!num_rdlock) parent->get(MDSCacheObject::PIN_LOCK);
+ return ++num_rdlock;
+ }
int put_rdlock() {
assert(num_rdlock>0);
- return --num_rdlock;
+ --num_rdlock;
+ if (num_rdlock == 0) parent->put(MDSCacheObject::PIN_LOCK);
+ return num_rdlock;
}
int get_num_rdlocks() { return num_rdlock; }
void get_xlock(MDRequest *who) {
assert(xlock_by == 0);
+ parent->get(MDSCacheObject::PIN_LOCK);
xlock_by = who;
}
void put_xlock() {
assert(xlock_by);
+ parent->put(MDSCacheObject::PIN_LOCK);
xlock_by = 0;
}
bool is_xlocked() { return xlock_by ? true:false; }
// anchor transactions included in this update.
list<version_t> atids;
+ // inode dirlocks (scatterlocks) i've touched.
+ map<inodeno_t, utime_t> dirty_inode_mtimes;
+
// ino's i've allocated
list<inodeno_t> allocated_inos;
version_t alloc_tablev;
atids.push_back(atid);
}
+ void add_dirtied_inode_mtime(inodeno_t ino, utime_t ctime) { // remember ino in dirty_inode_mtimes together with the given time
+ dirty_inode_mtimes[ino] = ctime; // a later call for the same ino replaces the earlier time
+ }
+
void add_allocated_ino(inodeno_t ino, version_t tablev) {
allocated_inos.push_back(ino);
alloc_tablev = tablev;
lump_map[*i]._encode(bl);
}
::_encode(atids, bl);
+ ::_encode(dirty_inode_mtimes, bl);
::_encode(allocated_inos, bl);
if (!allocated_inos.empty())
::_encode(alloc_tablev, bl);
lump_map[dirfrag]._decode(bl, off);
}
::_decode(atids, bl, off);
+ ::_decode(dirty_inode_mtimes, bl, off);
::_decode(allocated_inos, bl, off);
if (!allocated_inos.empty())
::_decode(alloc_tablev, bl, off);
return false;
}
}
+
+ if (!dirty_inode_mtimes.empty())
+ for (map<inodeno_t,utime_t>::iterator p = dirty_inode_mtimes.begin();
+ p != dirty_inode_mtimes.end();
+ ++p) {
+ CInode *in = mds->mdcache->get_inode(p->first);
+ if (in) {
+ if (in->inode.ctime == p->second &&
+ in->dirlock.is_updated()) {
+ dout(10) << "EMetaBlob.has_expired dirty mtime dirlock hasn't flushed on " << *in << endl;
+ return false;
+ }
+ }
+ }
// allocated_ios
if (!allocated_inos.empty()) {
}
}
+ // dirtied inode mtimes
+ if (!dirty_inode_mtimes.empty())
+ for (map<inodeno_t,utime_t>::iterator p = dirty_inode_mtimes.begin();
+ p != dirty_inode_mtimes.end();
+ ++p) {
+ CInode *in = mds->mdcache->get_inode(p->first);
+ if (in) {
+ if (in->inode.ctime == p->second &&
+ in->dirlock.is_updated()) {
+ dout(10) << "EMetaBlob.expire dirty mtime dirlock hasn't flushed, waiting on "
+ << *in << endl;
+ in->dirlock.add_waiter(SimpleLock::WAIT_STABLE, gather->new_sub());
+ }
+ }
+ }
+
// allocated_inos
if (!allocated_inos.empty()) {
version_t cv = mds->idalloc->get_committed_version();
mds->anchorclient->got_journaled_agree(*p);
}
+ // dirtied inode mtimes: re-mark the dirlock of every inode recorded in
+ // dirty_inode_mtimes as updated.
+ if (!dirty_inode_mtimes.empty())
+ for (map<inodeno_t,utime_t>::iterator p = dirty_inode_mtimes.begin();
+ p != dirty_inode_mtimes.end();
+ ++p) {
+ CInode *in = mds->mdcache->get_inode(p->first);
+ // get_inode() can return null (the other loops over dirty_inode_mtimes
+ // check for it). Here the inode is dereferenced unconditionally, so make
+ // that requirement explicit instead of dereferencing a null pointer.
+ // NOTE(review): presumably the inode was replayed into the cache earlier
+ // in this blob -- confirm.
+ assert(in);
+ dout(10) << "EMetaBlob.replay setting dirlock updated flag on " << *in << endl;
+ in->dirlock.set_updated();
+ }
+
// allocated_inos
if (!allocated_inos.empty()) {
if (mds->idalloc->get_version() >= alloc_tablev) {
// -- pins --
const static int PIN_REPLICATED = 1000;
const static int PIN_DIRTY = 1001;
- const static int PIN_RDLOCK = -1002;
- const static int PIN_XLOCK = 1003;
- const static int PIN_REQUEST = -1004;
- const static int PIN_WAITER = 1005;
+ const static int PIN_LOCK = -1002;
+ const static int PIN_REQUEST = -1003;
+ const static int PIN_WAITER = 1004;
+ const static int PIN_DIRTYSCATTERED = 1005;
const char *generic_pin_name(int p) {
switch (p) {
case PIN_REPLICATED: return "replicated";
case PIN_DIRTY: return "dirty";
- case PIN_RDLOCK: return "rdlock";
- case PIN_XLOCK: return "xlock";
+ case PIN_LOCK: return "lock";
case PIN_REQUEST: return "request";
case PIN_WAITER: return "waiter";
+ case PIN_DIRTYSCATTERED: return "dirtyscattered";
default: assert(0);
}
}