for (map<int,int>::iterator it = lock->get_parent()->replicas_begin();
it != lock->get_parent()->replicas_end();
it++) {
+ if (mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
+ continue;
MLock *m = new MLock(lock, msg, mds->get_nodeid());
mds->send_message_mds(m, it->first, MDS_PORT_LOCKER);
}
for (map<int,int>::iterator it = lock->get_parent()->replicas_begin();
it != lock->get_parent()->replicas_end();
it++) {
+ if (mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
+ continue;
MLock *m = new MLock(lock, msg, mds->get_nodeid());
m->set_data(data);
mds->send_message_mds(m, it->first, MDS_PORT_LOCKER);
assert(!in->is_auth());
in->replica_caps_wanted = wanted;
- mds->send_message_mds(new MInodeFileCaps(in->ino(), mds->get_nodeid(),
- in->replica_caps_wanted),
- auth, MDS_PORT_LOCKER);
+
+ if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN)
+ mds->send_message_mds(new MInodeFileCaps(in->ino(), mds->get_nodeid(),
+ in->replica_caps_wanted),
+ auth, MDS_PORT_LOCKER);
} else {
in->replica_caps_wanted_keep_until.sec_ref() = 0;
}
void Locker::handle_inode_file_caps(MInodeFileCaps *m)
{
// nobody should be talking to us during recovery.
- assert(mds->is_active() || mds->is_stopping());
+ assert(mds->is_rejoin() || mds->is_active() || mds->is_stopping());
// ok
CInode *in = mdcache->get_inode(m->get_ino());
assert(in);
assert(in->is_auth());
+
+ if (mds->is_rejoin() &&
+ in->is_rejoining()) {
+ dout(7) << "handle_inode_file_caps still rejoining " << *in << ", dropping " << *m << endl;
+ delete m;
+ return;
+ }
+
dout(7) << "handle_inode_file_caps replica mds" << m->get_from() << " wants caps " << cap_string(m->get_caps()) << " on " << *in << endl;
void Locker::handle_lock(MLock *m)
{
// nobody should be talking to us during recovery.
- assert(mds->is_active() || mds->is_stopping());
+ assert(mds->is_rejoin() || mds->is_active() || mds->is_stopping());
switch (m->get_otype()) {
case LOCK_OTYPE_DN:
{
int from = m->get_asker();
+ if (mds->is_rejoin()) {
+ if (lock->get_parent()->is_rejoining()) {
+ dout(7) << "handle_simple_lock still rejoining " << *lock->get_parent()
+ << ", dropping " << *m << endl;
+ delete m;
+ return;
+ }
+ }
+
switch (m->get_action()) {
// -- replica --
case LOCK_AC_SYNC:
dout(7) << "handle_simple_lock has reader, waiting before ack on " << *lock
<< " on " << *lock->get_parent() << endl;
lock->set_state(LOCK_GLOCKR);
- lock->add_waiter(SimpleLock::WAIT_NOLOCKS, new C_MDS_RetryMessage(mds, m));
- return;
+ } else {
+ // update lock and reply
+ lock->set_state(LOCK_LOCK);
+ mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
+ from, MDS_PORT_LOCKER);
}
-
- // update lock and reply
- lock->set_state(LOCK_LOCK);
-
- mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
- from, MDS_PORT_LOCKER);
break;
MDRequest *mdr = mdcache->request_get(m->get_reqid());
mdr->xlocks.insert(lock);
mdr->locks.insert(lock);
+ lock->set_state(LOCK_REMOTEXLOCK);
lock->finish_waiters(SimpleLock::WAIT_REMOTEXLOCK);
}
break;
}
+class C_Locker_SimpleEval : public Context {
+ Locker *locker;
+ SimpleLock *lock;
+public:
+ C_Locker_SimpleEval(Locker *l, SimpleLock *lk) : locker(l), lock(lk) {}
+ void finish(int r) {
+ locker->simple_eval(lock);
+ }
+};
+
void Locker::simple_eval(SimpleLock *lock)
{
- // finished gather?
- if (lock->get_parent()->is_auth() &&
- !lock->is_stable() &&
- !lock->is_gathering()) {
+ // unstable and ambiguous auth?
+ if (!lock->is_stable() &&
+ lock->get_parent()->is_ambiguous_auth()) {
+ dout(7) << "simple_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << endl;
+ //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
+ lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_SimpleEval(this, lock));
+ return;
+ }
+
+ // finished remote xlock?
+ if (lock->get_state() == LOCK_REMOTEXLOCK &&
+ !lock->is_xlocked()) {
+ // tell auth
+ assert(!lock->get_parent()->is_auth()); // should be auth_pinned on the auth
+ dout(7) << "simple_eval releasing remote xlock on " << *lock->get_parent() << endl;
+ int auth = lock->get_parent()->authority().first;
+ if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN)
+ mds->send_message_mds(new MLock(lock, LOCK_AC_UNXLOCK, mds->get_nodeid()),
+ auth, MDS_PORT_LOCKER);
+ lock->set_state(LOCK_LOCK);
+ }
+
+ // finished gathering?
+ if (lock->get_state() == LOCK_GLOCKR &&
+ !lock->is_gathering() &&
+ !lock->is_rdlocked()) {
dout(7) << "simple_eval finished gather on " << *lock << " on " << *lock->get_parent() << endl;
- switch (lock->get_state()) {
- case LOCK_GLOCKR:
- lock->set_state(LOCK_LOCK);
- lock->finish_waiters(SimpleLock::WAIT_STABLE);
- break;
-
- default:
- assert(0);
+
+ // replica: tell auth
+ if (!lock->get_parent()->is_auth()) {
+ int auth = lock->get_parent()->authority().first;
+ if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN)
+ mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
+ lock->get_parent()->authority().first, MDS_PORT_LOCKER);
}
- }
- if (!lock->is_stable()) return;
-
- if (lock->get_parent()->is_auth()) {
- // sync?
- if (lock->get_state() != LOCK_SYNC &&
- lock->get_parent()->is_replicated() &&
- !lock->is_waiter_for(SimpleLock::WAIT_WR)) {
- dout(7) << "simple_eval stable, syncing " << *lock
- << " on " << *lock->get_parent() << endl;
- simple_sync(lock);
- }
+ lock->set_state(LOCK_LOCK);
+ lock->finish_waiters(SimpleLock::WAIT_STABLE);
+ }
- } else {
- // replica
+ // stable -> sync?
+ if (lock->get_parent()->is_auth() &&
+ lock->is_stable() &&
+ lock->get_state() != LOCK_SYNC &&
+ !lock->is_waiter_for(SimpleLock::WAIT_WR)) {
+ dout(7) << "simple_eval stable, syncing " << *lock
+ << " on " << *lock->get_parent() << endl;
+ simple_sync(lock);
}
}
if (lock->get_state() == LOCK_GLOCKR)
assert(0); // um... hmm!
assert(lock->get_state() == LOCK_LOCK);
-
- // hard data
- bufferlist data;
- lock->encode_locked_state(data);
-
- // bcast to replicas
- send_lock_message(lock, LOCK_AC_SYNC, data);
+
+ // sync.
+ if (lock->get_parent()->is_replicated()) {
+ // hard data
+ bufferlist data;
+ lock->encode_locked_state(data);
+
+ // bcast to replicas
+ send_lock_message(lock, LOCK_AC_SYNC, data);
+ }
// change lock
lock->set_state(LOCK_SYNC);
dout(7) << "simple_rdlock_finish on " << *lock << " on " << *lock->get_parent() << endl;
- if (lock->get_state() == LOCK_GLOCKR &&
- !lock->is_rdlocked()) {
- lock->set_state(LOCK_SYNC); // return state to sync, in case the unpinner flails
- lock->finish_waiters(SimpleLock::WAIT_NOLOCKS);
- }
+ // last one?
+ if (!lock->is_rdlocked())
+ simple_eval(lock);
}
bool Locker::simple_xlock_start(SimpleLock *lock, MDRequest *mdr)
new C_MDS_RetryRequest(mdcache, mdr));
return false;
}
+ int auth = lock->get_parent()->authority().first;
// wait for sync.
// (???????????)
}
// send lock request
- int auth = lock->get_parent()->authority().first;
MLock *m = new MLock(lock, LOCK_AC_REQXLOCK, mds->get_nodeid());
mds->send_message_mds(m, auth, MDS_PORT_LOCKER);
// drop ref
assert(lock->can_xlock(mdr));
lock->put_xlock();
+ assert(mdr);
mdr->xlocks.erase(lock);
mdr->locks.erase(lock);
dout(7) << "simple_xlock_finish on " << *lock << " on " << *lock->get_parent() << endl;
- // slave?
- if (!lock->get_parent()->is_auth()) {
- mds->send_message_mds(new MLock(lock, LOCK_AC_UNXLOCK, mds->get_nodeid()),
- lock->get_parent()->authority().first, MDS_PORT_LOCKER);
- }
-
// others waiting?
- if (lock->is_waiter_for(SimpleLock::WAIT_WR)) {
- // wake 'em up
- lock->finish_waiters(SimpleLock::WAIT_WR, 0);
- } else {
- // auto-sync if alone.
- if (lock->get_parent()->is_auth() &&
- !lock->get_parent()->is_replicated() &&
- lock->get_state() != LOCK_SYNC)
- lock->set_state(LOCK_SYNC);
-
- simple_eval(lock);
- }
+ lock->finish_waiters(SimpleLock::WAIT_WR, 0);
+
+ // eval
+ simple_eval(lock);
}
scatter_eval(lock);
}
+
+class C_Locker_ScatterEval : public Context {
+ Locker *locker;
+ ScatterLock *lock;
+public:
+ C_Locker_ScatterEval(Locker *l, ScatterLock *lk) : locker(l), lock(lk) {}
+ void finish(int r) {
+ locker->scatter_eval(lock);
+ }
+};
+
void Locker::scatter_eval(ScatterLock *lock)
{
+ // unstable and ambiguous auth?
+ if (!lock->is_stable() &&
+ lock->get_parent()->is_ambiguous_auth()) {
+ dout(7) << "scatter_eval not stable and ambiguous auth, waiting on " << *lock->get_parent() << endl;
+ //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
+ lock->get_parent()->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_Locker_ScatterEval(this, lock));
+ return;
+ }
+
if (!lock->get_parent()->is_auth()) {
// REPLICA
if (lock->get_state() == LOCK_GSYNCS &&
!lock->is_wrlocked()) {
dout(10) << "scatter_eval no wrlocks, acking sync" << endl;
- bufferlist data;
- lock->encode_locked_state(data);
- mds->send_message_mds(new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid(), data),
- lock->get_parent()->authority().first, MDS_PORT_LOCKER);
+ int auth = lock->get_parent()->authority().first;
+ if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
+ bufferlist data;
+ lock->encode_locked_state(data);
+ mds->send_message_mds(new MLock(lock, LOCK_AC_SYNCACK, mds->get_nodeid(), data),
+ auth, MDS_PORT_LOCKER);
+ }
lock->set_state(LOCK_SYNC);
+ lock->finish_waiters(ScatterLock::WAIT_STABLE); // ?
}
} else {
dout(7) << "scatter_eval finished gather/un-wrlock on " << *lock
<< " on " << *lock->get_parent() << endl;
lock->set_state(LOCK_SYNC);
- lock->finish_waiters(SimpleLock::WAIT_STABLE|SimpleLock::WAIT_RD|SimpleLock::WAIT_NOLOCKS);
+ lock->finish_waiters(ScatterLock::WAIT_STABLE|ScatterLock::WAIT_RD);
}
// gscatters -> scatter?
}
lock->set_state(LOCK_SCATTER);
- lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
+ lock->get_wrlock();
+ lock->finish_waiters(ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
+ lock->put_wrlock();
}
// waiting for rd?
if (lock->get_state() == LOCK_SCATTER &&
!lock->is_wrlocked() &&
- lock->is_waiter_for(SimpleLock::WAIT_RD)) {
+ lock->is_waiter_for(ScatterLock::WAIT_RD)) {
dout(10) << "scatter_eval no wrlocks, read waiter, syncing" << endl;
scatter_sync(lock);
}
}
else if (lock->is_wrlocked()) {
lock->set_state(LOCK_GSYNCS);
- } else {
+ }
+ else {
lock->set_state(LOCK_SYNC);
- lock->finish_waiters(SimpleLock::WAIT_RD|SimpleLock::WAIT_STABLE);
+ lock->finish_waiters(ScatterLock::WAIT_RD|ScatterLock::WAIT_STABLE);
}
}
send_lock_message(lock, LOCK_AC_SCATTER, data);
}
lock->set_state(LOCK_SCATTER);
- lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
+ lock->finish_waiters(ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
}
}
{
int from = m->get_asker();
+ if (mds->is_rejoin()) {
+ if (lock->get_parent()->is_rejoining()) {
+ dout(7) << "handle_scatter_lock still rejoining " << *lock->get_parent()
+ << ", dropping " << *m << endl;
+ delete m;
+ return;
+ }
+ }
+
switch (m->get_action()) {
// -- replica --
case LOCK_AC_SYNC:
assert(lock->get_state() == LOCK_SYNC);
lock->decode_locked_state(m->get_data());
lock->set_state(LOCK_SCATTER);
- lock->finish_waiters(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE);
+ lock->finish_waiters(ScatterLock::WAIT_WR|ScatterLock::WAIT_STABLE);
break;
// -- for auth --
dout(7) << "handle_scatter_lock " << *lock << " on " << *lock->get_parent()
<< " from " << from << ", last one"
<< endl;
- simple_eval(lock);
+ scatter_eval(lock);
}
break;
}
dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << endl;
- if (!lock->is_rdlocked()) {
- lock->finish_waiters(SimpleLock::WAIT_NOLOCKS);
+ if (!lock->is_rdlocked())
file_eval(lock);
- }
}
dout(7) << "file_xlock_finish on " << *lock << " on " << *lock->get_parent() << endl;
assert(lock->get_parent()->is_auth()); // or implement remote xlocks
-
- // drop lock?
- if (!lock->is_waiter_for(SimpleLock::WAIT_STABLE))
- file_eval(lock);
+
+ // others waiting?
+ lock->finish_waiters(SimpleLock::WAIT_WR, 0);
+
+ //// drop lock?
+ //if (!lock->is_waiter_for(SimpleLock::WAIT_STABLE))
+ file_eval(lock);
}
* - checks if we're in unstable sfot state and can now move on to next state
* - checks if soft state should change (eg bc last writer closed)
*/
+class C_Locker_FileEval : public Context {
+ Locker *locker;
+ FileLock *lock;
+public:
+ C_Locker_FileEval(Locker *l, FileLock *lk) : locker(l), lock(lk) {}
+ void finish(int r) {
+ locker->file_eval(lock);
+ }
+};
+
void Locker::file_eval(FileLock *lock)
{
CInode *in = (CInode*)lock->get_parent();
+ // unstable and ambiguous auth?
+ if (!lock->is_stable() &&
+ in->is_ambiguous_auth()) {
+ dout(7) << "file_eval not stable and ambiguous auth, waiting on " << *in << endl;
+ //if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
+ in->add_waiter(CInode::WAIT_SINGLEAUTH, new C_Locker_FileEval(this, lock));
+ return;
+ }
+
+
int issued = in->get_caps_issued();
// [auth] finished gather?
CInode *in = (CInode*)lock->get_parent();
int from = m->get_asker();
+ if (mds->is_rejoin()) {
+ if (in->is_rejoining()) {
+ dout(7) << "handle_file_lock still rejoining " << *in
+ << ", dropping " << *m << endl;
+ delete m;
+ return;
+ }
+ }
+
+
dout(7) << "handle_file_lock a=" << m->get_action() << " from " << from << " "
<< *in << " filelock=" << *lock << endl;
}
if (lock->is_rdlocked()) {
dout(7) << "handle_file_lock rdlocked, waiting before ack on " << *in << endl;
- in->add_waiter(SimpleLock::WAIT_NOLOCKS, new C_MDS_RetryMessage(mds, m));
lock->set_state(LOCK_GLOCKR);
- assert(0);// i am broken.. why retry message when state captures all the info i need?
- return;
+ break;
}
if (issued & CAP_FILE_RD) {
+ dout(7) << "handle_file_lock RD cap issued, waiting before ack on " << *in << endl;
lock->set_state(LOCK_GLOCKR);
break;
}
-
+
// nothing to wait for, lock and ack.
{
lock->set_state(LOCK_LOCK);