bool Locker::acquire_locks(MDRequestRef& mdr,
MutationImpl::LockOpVec& lov,
CInode *auth_pin_freeze,
- bool auth_pin_nonblock)
+ bool auth_pin_nonblocking)
{
if (mdr->done_locking &&
!mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
client_t client = mdr->get_client();
set<MDSCacheObject*> mustpin; // items to authpin
+ if (auth_pin_freeze)
+ mustpin.insert(auth_pin_freeze);
// xlocks
- for (int i = 0, size = lov.size(); i < size; ++i) {
+ for (size_t i = 0; i < lov.size(); ++i) {
auto& p = lov[i];
SimpleLock *lock = p.lock;
MDSCacheObject *object = lock->get_parent();
// get processed in proper order.
bool wait = false;
if (object->is_auth()) {
- if (!mdr->locks.count(lock)) {
+ if (!mdr->is_xlocked(lock)) {
set<mds_rank_t> ls;
object->list_replicas(ls);
for (auto m : ls) {
} else {
// if the lock is the latest locked one, it's possible that slave mds got the lock
// while there are recovering mds.
- if (!mdr->locks.count(lock) || lock == mdr->locks.rbegin()->lock)
+ if (!mdr->is_xlocked(lock) || mdr->is_last_locked(lock))
wait = true;
}
if (wait) {
continue;
if (mdr->is_master()) {
// master. wrlock versionlock so we can pipeline dentry updates to journal.
- lov.add_wrlock(&dn->versionlock);
+ lov.add_wrlock(&dn->versionlock, i + 1);
} else {
// slave. exclusively lock the dentry version (i.e. block other journal updates).
// this makes rollback safe.
- lov.add_xlock(&dn->versionlock);
+ lov.add_xlock(&dn->versionlock, i + 1);
}
}
if (lock->get_type() > CEPH_LOCK_IVERSION) {
continue;
if (mdr->is_master()) {
// master. wrlock versionlock so we can pipeline inode updates to journal.
- lov.add_wrlock(&in->versionlock);
+ lov.add_wrlock(&in->versionlock, i + 1);
} else {
// slave. exclusively lock the inode version (i.e. block other journal updates).
// this makes rollback safe.
- lov.add_xlock(&in->versionlock);
+ lov.add_xlock(&in->versionlock, i + 1);
}
}
} else if (p.is_wrlock()) {
}
if (!object->is_auth()) {
- if (!mdr->locks.empty())
- drop_locks(mdr.get());
if (object->is_ambiguous_auth()) {
// wait
- marker.message = "waiting for single auth, object is being migrated";
dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
- object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
+ drop_locks(mdr.get());
mdr->drop_local_auth_pins();
+ marker.message = "waiting for single auth, object is being migrated";
+ object->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_MDS_RetryRequest(mdcache, mdr));
return false;
}
mustpin_remote[object->authority().first].insert(object);
// wait
drop_locks(mdr.get());
mdr->drop_local_auth_pins();
- if (auth_pin_nonblock) {
+ if (auth_pin_nonblocking) {
dout(10) << " can't auth_pin (freezing?) " << *object << ", nonblocking" << dendl;
mdr->aborted = true;
return false;
for (const auto& p : mdr->object_states) {
if (p.second.remote_auth_pinned == MDS_RANK_NONE)
continue;
- if (mustpin.count(p.first)) {
- ceph_assert(p.second.remote_auth_pinned == p.first->authority().first);
- auto q = mustpin_remote.find(p.second.remote_auth_pinned);
- if (q != mustpin_remote.end())
- q->second.insert(p.first);
- }
+ ceph_assert(p.second.remote_auth_pinned == p.first->authority().first);
+ auto q = mustpin_remote.find(p.second.remote_auth_pinned);
+ if (q != mustpin_remote.end())
+ q->second.insert(p.first);
}
- for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
- p != mustpin_remote.end();
- ++p) {
- dout(10) << "requesting remote auth_pins from mds." << p->first << dendl;
+
+ for (auto& p : mustpin_remote) {
+ dout(10) << "requesting remote auth_pins from mds." << p.first << dendl;
// wait for active auth
if (mds->is_cluster_degraded() &&
- !mds->mdsmap->is_clientreplay_or_active_or_stopping(p->first)) {
- dout(10) << " mds." << p->first << " is not active" << dendl;
+ !mds->mdsmap->is_clientreplay_or_active_or_stopping(p.first)) {
+ dout(10) << " mds." << p.first << " is not active" << dendl;
if (mdr->more()->waiting_on_slave.empty())
- mds->wait_for_active_peer(p->first, new C_MDS_RetryRequest(mdcache, mdr));
+ mds->wait_for_active_peer(p.first, new C_MDS_RetryRequest(mdcache, mdr));
return false;
}
auto req = make_message<MMDSSlaveRequest>(mdr->reqid, mdr->attempt,
MMDSSlaveRequest::OP_AUTHPIN);
- for (set<MDSCacheObject*>::iterator q = p->second.begin();
- q != p->second.end();
- ++q) {
- dout(10) << " req remote auth_pin of " << **q << dendl;
+ for (auto& o : p.second) {
+ dout(10) << " req remote auth_pin of " << *o << dendl;
MDSCacheObjectInfo info;
- (*q)->set_object_info(info);
+ o->set_object_info(info);
req->get_authpins().push_back(info);
- if (*q == auth_pin_freeze)
- (*q)->set_object_info(req->get_authpin_freeze());
- mdr->pin(*q);
+ if (o == auth_pin_freeze)
+ o->set_object_info(req->get_authpin_freeze());
+ mdr->pin(o);
}
- if (auth_pin_nonblock)
- req->mark_nonblock();
- mds->send_message_mds(req, p->first);
+ if (auth_pin_nonblocking)
+ req->mark_nonblocking();
+ else if (!mdr->locks.empty())
+ req->mark_notify_blocking();
+
+ mds->send_message_mds(req, p.first);
// put in waiting list
- ceph_assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
- mdr->more()->waiting_on_slave.insert(p->first);
+ auto ret = mdr->more()->waiting_on_slave.insert(p.first);
+ ceph_assert(ret.second);
}
return false;
}
// acquire locks.
// make sure they match currently acquired locks.
- auto existing = mdr->locks.begin();
for (const auto& p : lov) {
auto lock = p.lock;
-
- bool need_wrlock = p.is_wrlock();
- bool need_remote_wrlock = p.is_remote_wrlock();
-
- // already locked?
- if (existing != mdr->locks.end() && existing->lock == lock) {
- // right kind?
- auto it = existing++;
- auto have = *it; // don't reference
-
- if (have.is_xlock() && p.is_xlock()) {
+ if (p.is_xlock()) {
+ if (mdr->is_xlocked(lock)) {
dout(10) << " already xlocked " << *lock << " " << *lock->get_parent() << dendl;
continue;
}
-
- if (have.is_remote_wrlock() &&
- (!need_remote_wrlock || have.wrlock_target != p.wrlock_target)) {
- dout(10) << " unlocking remote_wrlock on wrong mds." << have.wrlock_target
- << " " << *lock << " " << *lock->get_parent() << dendl;
- remote_wrlock_finish(it, mdr.get());
- have.clear_remote_wrlock();
- }
-
- if (need_wrlock || need_remote_wrlock) {
- if (need_wrlock == have.is_wrlock() &&
- need_remote_wrlock == have.is_remote_wrlock()) {
- if (need_wrlock)
- dout(10) << " already wrlocked " << *lock << " " << *lock->get_parent() << dendl;
- if (need_remote_wrlock)
- dout(10) << " already remote_wrlocked " << *lock << " " << *lock->get_parent() << dendl;
- continue;
- }
-
- if (have.is_wrlock()) {
- if (!need_wrlock)
- dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
- else if (need_remote_wrlock) // acquire remote_wrlock first
- dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
- bool need_issue = false;
- wrlock_finish(it, mdr.get(), &need_issue);
- if (need_issue)
- issue_set.insert(static_cast<CInode*>(lock->get_parent()));
- }
- } else if (have.is_rdlock() && p.is_rdlock()) {
- dout(10) << " already rdlocked " << *lock << " " << *lock->get_parent() << dendl;
- continue;
- }
- }
-
- // hose any stray locks
- while (existing != mdr->locks.end()) {
- auto it = existing++;
- auto stray = *it; // don't reference
- dout(10) << " unlocking out-of-order " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
- bool need_issue = false;
- if (stray.is_xlock()) {
- xlock_finish(it, mdr.get(), &need_issue);
- } else if (stray.is_rdlock()) {
- rdlock_finish(it, mdr.get(), &need_issue);
- } else {
- // may have acquired both wrlock and remore wrlock
- if (stray.is_wrlock())
- wrlock_finish(it, mdr.get(), &need_issue);
- if (stray.is_remote_wrlock())
- remote_wrlock_finish(it, mdr.get());
- }
- if (need_issue)
- issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
- }
-
- // lock
- if (mdr->locking && lock != mdr->locking) {
- cancel_locking(mdr.get(), &issue_set);
- }
- if (p.is_xlock()) {
+ if (mdr->locking && lock != mdr->locking)
+ cancel_locking(mdr.get(), &issue_set);
if (!xlock_start(lock, mdr)) {
marker.message = "failed to xlock, waiting";
goto out;
}
dout(10) << " got xlock on " << *lock << " " << *lock->get_parent() << dendl;
- } else if (need_wrlock || need_remote_wrlock) {
- if (need_remote_wrlock && !mdr->is_remote_wrlocked(lock)) {
- marker.message = "waiting for remote wrlocks";
- remote_wrlock_start(lock, p.wrlock_target, mdr);
- goto out;
+ } else if (p.is_wrlock() || p.is_remote_wrlock()) {
+ auto it = mdr->locks.find(lock);
+ if (p.is_remote_wrlock()) {
+ if (it != mdr->locks.end() && it->is_remote_wrlock()) {
+ dout(10) << " already remote_wrlocked " << *lock << " " << *lock->get_parent() << dendl;
+ } else {
+ if (mdr->locking && lock != mdr->locking)
+ cancel_locking(mdr.get(), &issue_set);
+ marker.message = "waiting for remote wrlocks";
+ remote_wrlock_start(lock, p.wrlock_target, mdr);
+ goto out;
+ }
}
- if (need_wrlock) {
+ if (p.is_wrlock()) {
+ if (it != mdr->locks.end() && it->is_wrlock()) {
+ dout(10) << " already wrlocked " << *lock << " " << *lock->get_parent() << dendl;
+ continue;
+ }
client_t _client = p.is_state_pin() ? lock->get_excl_client() : client;
- if (need_remote_wrlock && !lock->can_wrlock(_client)) {
+ if (p.is_remote_wrlock() && !lock->can_wrlock(_client)) {
marker.message = "failed to wrlock, dropping remote wrlock and waiting";
// can't take the wrlock because the scatter lock is gathering. need to
// release the remote wrlock, so that the gathering process can finish.
- auto it = mdr->locks.end();
- ++it;
+ ceph_assert(it != mdr->locks.end());
remote_wrlock_finish(it, mdr.get());
remote_wrlock_start(lock, p.wrlock_target, mdr);
goto out;
}
// nowait if we have already gotten remote wrlock
if (!wrlock_start(lock, mdr)) {
- ceph_assert(!need_remote_wrlock);
+ ceph_assert(!p.is_remote_wrlock());
marker.message = "failed to wrlock, waiting";
goto out;
}
dout(10) << " got wrlock on " << *lock << " " << *lock->get_parent() << dendl;
}
} else {
+ if (mdr->is_rdlocked(lock)) {
+ dout(10) << " already rdlocked " << *lock << " " << *lock->get_parent() << dendl;
+ continue;
+ }
+
ceph_assert(mdr->is_master());
if (lock->needs_recover()) {
if (mds->is_cluster_degraded()) {
dout(10) << " got rdlock on " << *lock << " " << *lock->get_parent() << dendl;
}
}
-
- // any extra unneeded locks?
- while (existing != mdr->locks.end()) {
- auto it = existing++;
- auto stray = *it;
- dout(10) << " unlocking extra " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
- bool need_issue = false;
- if (stray.is_xlock()) {
- xlock_finish(it, mdr.get(), &need_issue);
- } else if (stray.is_rdlock()) {
- rdlock_finish(it, mdr.get(), &need_issue);
- } else {
- // may have acquired both wrlock and remore wrlock
- if (stray.is_wrlock())
- wrlock_finish(it, mdr.get(), &need_issue);
- if (stray.is_remote_wrlock())
- remote_wrlock_finish(it, mdr.get());
- }
- if (need_issue)
- issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
- }
mdr->done_locking = true;
mdr->set_mds_stamp(ceph_clock_now());
// can read? grab ref.
if (lock->can_rdlock(client)) {
lock->get_rdlock();
- mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
+ mut->emplace_lock(lock, MutationImpl::LockOp::RDLOCK);
return true;
}
for (const auto& p : lov) {
ceph_assert(p.is_rdlock());
p.lock->get_rdlock();
- mut->locks.emplace(p.lock, MutationImpl::LockOp::RDLOCK);
+ mut->emplace_lock(p.lock, MutationImpl::LockOp::RDLOCK);
}
}
dout(7) << "wrlock_force on " << *lock
<< " on " << *lock->get_parent() << dendl;
lock->get_wrlock(true);
- mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
+ mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
}
bool Locker::wrlock_try(SimpleLock *lock, MutationRef& mut)
while (1) {
if (lock->can_wrlock(mut->get_client())) {
lock->get_wrlock();
- mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
+ mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
return true;
}
if (!lock->is_stable())
if (lock->can_wrlock(client) &&
(!want_scatter || lock->get_state() == LOCK_MIX)) {
lock->get_wrlock();
- auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+ auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
return true;
}
in && in->issued_caps_need_gather(lock))) { // xlocker does not hold shared cap
lock->set_state(LOCK_XLOCK);
lock->get_xlock(mut, client);
- mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
+ mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
mut->finish_locking(lock);
return true;
}
// forcefully take a wrlock
lock->get_wrlock(true);
- mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
+ mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
in->pre_cow_old_inode(); // avoid cow mayhem
ceph_assert(lock->can_wrlock());
lock->get_wrlock(mut->get_client());
- auto ret = mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
- ceph_assert(ret.second);
+ auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
+ ceph_assert(it->is_wrlock());
}
bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
ceph_assert(lock->get_parent()->is_auth());
if (lock->can_wrlock()) {
lock->get_wrlock(mut->get_client());
- auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+ auto it = mut->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
ceph_assert(it->is_wrlock());
return true;
} else {
}
lock->get_xlock(mut, mut->get_client());
- mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
+ mut->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
return true;
}
// Declaration of the lock-acquisition entry point for a request.
// This change only renames the fourth parameter (auth_pin_nonblock ->
// auth_pin_nonblocking); parameter types and default values are unchanged.
//   mdr                   request the locks are taken for
//   lov                   vector of lock operations to acquire
//   auth_pin_freeze       optional inode; when non-null the definition adds it
//                         to the set of objects that must be auth-pinned
//   auth_pin_nonblocking  when true, the definition marks the request aborted
//                         and returns false instead of waiting for an auth pin
// NOTE(review): the visible parts of the definition return false whenever the
// request has to wait or was aborted; the success path is not visible here.
bool acquire_locks(MDRequestRef& mdr,
MutationImpl::LockOpVec& lov,
CInode *auth_pin_freeze=NULL,
- bool auth_pin_nonblock=false);
+ bool auth_pin_nonblocking=false);
void notify_freeze_waiter(MDSCacheObject *o);
void cancel_locking(MutationImpl *mut, std::set<CInode*> *pneed_issue);
if (!mdr->is_xlocked(&dn->versionlock)) {
ceph_assert(dn->versionlock.can_xlock_local());
dn->versionlock.get_xlock(mdr, mdr->get_client());
- mdr->locks.emplace(&dn->versionlock, MutationImpl::LockOp::XLOCK);
+ mdr->emplace_lock(&dn->versionlock, MutationImpl::LockOp::XLOCK);
}
if (dn->lock.is_stable())
dn->auth_pin(&dn->lock);
dn->lock.set_state(LOCK_XLOCK);
dn->lock.get_xlock(mdr, mdr->get_client());
- mdr->locks.emplace(&dn->lock, MutationImpl::LockOp::XLOCK);
+ mdr->emplace_lock(&dn->lock, MutationImpl::LockOp::XLOCK);
}
}
if (!mdr->is_xlocked(&in->versionlock)) {
ceph_assert(in->versionlock.can_xlock_local());
in->versionlock.get_xlock(mdr, mdr->get_client());
- mdr->locks.emplace(&in->versionlock, MutationImpl::LockOp::XLOCK);
+ mdr->emplace_lock(&in->versionlock, MutationImpl::LockOp::XLOCK);
}
if (lock->is_stable())
in->auth_pin(lock);
if (lock == &in->filelock)
in->loner_cap = -1;
lock->get_xlock(mdr, mdr->get_client());
- mdr->locks.emplace(lock, MutationImpl::LockOp::XLOCK);
+ mdr->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
}
}
}
if (lock == &in->filelock)
in->loner_cap = -1;
lock->get_wrlock(true);
- mdr->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
+ mdr->emplace_lock(lock, MutationImpl::LockOp::WRLOCK);
}
}
}
dout(10) << "dispatch_fragment_dir " << basedirfrag << " bits " << info.bits
<< " on " << *diri << dendl;
+
+ if (mdr->more()->slave_error)
+ mdr->aborted = true;
+
if (!mdr->aborted) {
MutationImpl::LockOpVec lov;
lov.add_wrlock(&diri->dirfragtreelock);
// prevent a racing gather on any other scatterlocks too
lov.lock_scatter_gather(&diri->nestlock);
lov.lock_scatter_gather(&diri->filelock);
- if (!mds->locker->acquire_locks(mdr, lov, NULL, true))
+ if (!mds->locker->acquire_locks(mdr, lov, NULL, true)) {
if (!mdr->aborted)
return;
+ }
}
if (mdr->aborted) {
}
ceph_assert(it->second.state == EXPORT_LOCKING);
- mds_rank_t dest = it->second.peer;
+ if (mdr->more()->slave_error || dir->is_frozen() || dir->is_freezing()) {
+ dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
+ export_try_cancel(dir);
+ return;
+ }
+ mds_rank_t dest = it->second.peer;
if (!mds->is_export_target(dest)) {
dout(7) << "dest is not yet an export target" << dendl;
if (count > 3) {
return;
}
- if (mdr->aborted || dir->is_frozen() || dir->is_freezing()) {
- dout(7) << "wouldblock|freezing|frozen, canceling export" << dendl;
- export_try_cancel(dir);
- return;
- }
-
// locks?
MutationImpl::LockOpVec lov;
get_export_lock_set(dir, lov);
}
void MutationImpl::LockOpVec::sort_and_merge()
{
- std::sort(begin(), end());
+ // sort locks on the same object
+ auto cmp = [](const LockOp &l, const LockOp &r) {
+ ceph_assert(l.lock->get_parent() == r.lock->get_parent());
+ return l.lock->type->type < r.lock->type->type;
+ };
+ for (auto i = begin(), j = i; ; ++i) {
+ if (i == end()) {
+ std::sort(j, i, cmp);
+ break;
+ }
+ if (j->lock->get_parent() != i->lock->get_parent()) {
+ std::sort(j, i, cmp);
+ j = i;
+ }
+ }
// merge ops on the same lock
for (auto i = end() - 1; i > begin(); ) {
auto j = i;
if (j->is_xlock()) {
// xlock overwrites other types
ceph_assert(!j->is_remote_wrlock());
- j->flags = MutationImpl::LockOp::XLOCK;
+ j->flags = LockOp::XLOCK;
}
erase(j + 1, i + 1);
i = j - 1;
wrlock_target = MDS_RANK_NONE;
}
bool is_state_pin() const { return !!(flags & STATE_PIN); }
-
// Ordering used when LockOp is the key of an ordered container.
// Removed (-) version: ops whose lock type is <= CEPH_LOCK_DN sorted before
// ops whose type is > CEPH_LOCK_DN; within the same class, ops were ordered by
// parent object (is_lt) and, for the same parent, by lock type.
// Added (+) version: orders solely by the value of the `lock` pointer.
// NOTE(review): the built-in `<` on pointers to unrelated objects is not
// guaranteed by the C++ standard to be a strict total order, whereas std::less
// is. Confirm this is acceptable for the targeted toolchains.
bool operator<(const LockOp& r) const {
- if ((lock->type->type <= CEPH_LOCK_DN) && (r.lock->type->type > CEPH_LOCK_DN))
- return true;
- if ((lock->type->type > CEPH_LOCK_DN) == (r.lock->type->type > CEPH_LOCK_DN)) {
- auto lp = lock->get_parent();
- auto rp = r.lock->get_parent();
- // then sort by object
- if (lp == rp)
- return (lock->type->type < r.lock->type->type);
- return lp->is_lt(rp);
- }
- return false;
+ return lock < r.lock;
}
};
emplace_back(lock, LockOp::RDLOCK);
}
void erase_rdlock(SimpleLock *lock);
// Queue an exclusive-lock op (LockOp::XLOCK) for `lock`.
// Removed (-) version: always appended to the end.
// Added (+) version: when idx >= 0 the op is inserted in front of the element
// currently at position idx (cbegin() + idx); with the default idx of -1 it is
// appended as before.
// NOTE(review): idx is not range-checked here; presumably callers guarantee
// idx <= size() -- verify. Call sites visible in this change pass a size_t
// expression (i + 1), which is converted to int.
- void add_xlock(SimpleLock *lock) {
- emplace_back(lock, LockOp::XLOCK);
+ void add_xlock(SimpleLock *lock, int idx=-1) {
+ if (idx >= 0)
+ emplace(cbegin() + idx, lock, LockOp::XLOCK);
+ else
+ emplace_back(lock, LockOp::XLOCK);
}
// Queue a write-lock op (LockOp::WRLOCK) for `lock`.
// Removed (-) version: always appended to the end.
// Added (+) version: when idx >= 0 the op is inserted in front of the element
// currently at position idx (cbegin() + idx); with the default idx of -1 it is
// appended as before.
// NOTE(review): idx is not range-checked here; presumably callers guarantee
// idx <= size() -- verify. Call sites visible in this change pass a size_t
// expression (i + 1), which is converted to int.
- void add_wrlock(SimpleLock *lock) {
- emplace_back(lock, LockOp::WRLOCK);
+ void add_wrlock(SimpleLock *lock, int idx=-1) {
+ if (idx >= 0)
+ emplace(cbegin() + idx, lock, LockOp::WRLOCK);
+ else
+ emplace_back(lock, LockOp::WRLOCK);
}
void add_remote_wrlock(SimpleLock *lock, mds_rank_t rank) {
ceph_assert(rank != MDS_RANK_NONE);
reserve(32);
}
};
- typedef set<LockOp> lock_set;
- typedef lock_set::iterator lock_iterator;
+ using lock_set = set<LockOp>;
+ using lock_iterator = lock_set::iterator;
lock_set locks; // full ordering
// Added by this change: single entry point for recording a held lock.
// Remembers `l` in last_locked, then emplaces LockOp(l, f, t) into `locks`
// and returns the resulting iterator.
// `locks` is a set, so if an element comparing equal already exists, nothing
// is inserted and the iterator refers to that existing element; f and t are
// not applied to it. last_locked is updated in either case.
+ lock_iterator emplace_lock(SimpleLock *l, unsigned f=0, mds_rank_t t=MDS_RANK_NONE) {
+ last_locked = l;
+ return locks.emplace(l, f, t).first;
+ }
+
bool is_rdlocked(SimpleLock *lock) const {
auto it = locks.find(lock);
return it != locks.end() && it->is_rdlock();
auto it = locks.find(lock);
return it != locks.end() && it->is_remote_wrlock();
}
// Added by this change: returns true when `lock` equals last_locked, the
// pointer that emplace_lock() stores on every call.
+ bool is_last_locked(SimpleLock *lock) const {
+ return lock == last_locked;
+ }
+ SimpleLock *last_locked = nullptr;
// lock we are currently trying to acquire. if we give up for some reason,
// be sure to eval() this.
SimpleLock *locking = nullptr;
dout(7) << "dispatch_client_request " << *req << dendl;
- if (req->may_write()) {
- if (mdcache->is_readonly()) {
- dout(10) << " read-only FS" << dendl;
- respond_to_request(mdr, -EROFS);
- return;
- }
- if (mdr->has_more() && mdr->more()->slave_error) {
- dout(10) << " got error from slaves" << dendl;
- respond_to_request(mdr, mdr->more()->slave_error);
- return;
- }
+ if (mdcache->is_readonly() ||
+ (mdr->has_more() && mdr->more()->slave_error == -EROFS)) {
+ dout(10) << " read-only FS" << dendl;
+ respond_to_request(mdr, -EROFS);
+ return;
}
if (is_full) {
m->straybl.clear();
}
+ if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
+ dout(3) << "not clientreplay|active yet, waiting" << dendl;
+ mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
+ return;
+ }
+
// am i a new slave?
MDRequestRef mdr;
if (mdcache->have_request(m->get_reqid())) {
return;
}
-
if (mdr->attempt < m->get_attempt()) {
// mine is old, close it out
dout(10) << "local request " << *mdr << " attempt " << mdr->attempt << " < " << m->get_attempt()
return;
}
- if (m->get_op() == MMDSSlaveRequest::OP_FINISH && m->is_abort()) {
- mdr->aborted = true;
- if (mdr->slave_request) {
- // only abort on-going xlock, wrlock and auth pin
- ceph_assert(!mdr->slave_did_prepare());
+ // may get these requests while mdr->slave_request is non-null
+ if (m->get_op() == MMDSSlaveRequest::OP_DROPLOCKS) {
+ mds->locker->drop_locks(mdr.get());
+ return;
+ }
+ if (m->get_op() == MMDSSlaveRequest::OP_FINISH) {
+ if (m->is_abort()) {
+ mdr->aborted = true;
+ if (mdr->slave_request) {
+ // only abort on-going xlock, wrlock and auth pin
+ ceph_assert(!mdr->slave_did_prepare());
+ } else {
+ mdcache->request_finish(mdr);
+ }
} else {
+ if (m->inode_export.length() > 0)
+ mdr->more()->inode_import = m->inode_export;
+ // finish off request.
mdcache->request_finish(mdr);
}
return;
mdr->straydn = straydn;
}
- if (!mds->is_clientreplay() && !mds->is_active() && !mds->is_stopping()) {
- dout(3) << "not clientreplay|active yet, waiting" << dendl;
- mds->wait_for_replay(new C_MDS_RetryMessage(mds, m));
- return;
- } else if (mds->is_clientreplay() && !mds->mdsmap->is_clientreplay(from) &&
- mdr->locks.empty()) {
+ if (mds->is_clientreplay() && !mds->mdsmap->is_clientreplay(from) &&
+ mdr->locks.empty()) {
dout(3) << "not active yet, waiting" << dendl;
mds->wait_for_active(new C_MDS_RetryMessage(mds, m));
return;
mdr->more()->slaves.insert(from);
lock->decode_locked_state(m->get_lock_data());
dout(10) << "got remote xlock on " << *lock << " on " << *lock->get_parent() << dendl;
- mdr->locks.emplace_hint(mdr->locks.end(), lock, MutationImpl::LockOp::XLOCK);
+ mdr->emplace_lock(lock, MutationImpl::LockOp::XLOCK);
mdr->finish_locking(lock);
lock->get_xlock(mdr, mdr->get_client());
m->get_object_info());
mdr->more()->slaves.insert(from);
dout(10) << "got remote wrlock on " << *lock << " on " << *lock->get_parent() << dendl;
- auto it = mdr->locks.emplace_hint(mdr->locks.end(),
- lock, MutationImpl::LockOp::REMOTE_WRLOCK, from);
+ auto it = mdr->emplace_lock(lock, MutationImpl::LockOp::REMOTE_WRLOCK, from);
ceph_assert(it->is_remote_wrlock());
ceph_assert(it->wrlock_target == from);
}
break;
- case MMDSSlaveRequest::OP_DROPLOCKS:
- mds->locker->drop_locks(mdr.get());
- mdr->reset_slave_request();
- break;
-
case MMDSSlaveRequest::OP_AUTHPIN:
handle_slave_auth_pin(mdr);
break;
handle_slave_rename_prep(mdr);
break;
- case MMDSSlaveRequest::OP_FINISH:
- // information about rename imported caps
- if (mdr->slave_request->inode_export.length() > 0)
- mdr->more()->inode_import = mdr->slave_request->inode_export;
- // finish off request.
- mdcache->request_finish(mdr);
- break;
-
default:
ceph_abort();
}
// build list of objects
list<MDSCacheObject*> objects;
CInode *auth_pin_freeze = NULL;
+ bool nonblocking = mdr->slave_request->is_nonblocking();
bool fail = false, wouldblock = false, readonly = false;
+ ref_t<MMDSSlaveRequest> reply;
if (mdcache->is_readonly()) {
dout(10) << " read-only FS" << dendl;
if (mdr->is_auth_pinned(obj))
continue;
if (!mdr->can_auth_pin(obj)) {
- if (mdr->slave_request->is_nonblock()) {
+ if (nonblocking) {
dout(10) << " can't auth_pin (freezing?) " << *obj << " nonblocking" << dendl;
fail = true;
wouldblock = true;
mdr->drop_local_auth_pins();
mds->locker->notify_freeze_waiter(obj);
- return;
+ goto blocked;
}
}
}
- // auth pin!
- if (fail) {
- mdr->drop_local_auth_pins(); // just in case
- } else {
+ if (!fail) {
/* freeze authpin wrong inode */
if (mdr->has_more() && mdr->more()->is_freeze_authpin &&
mdr->more()->rename_inode != auth_pin_freeze)
if (!mdr->freeze_auth_pin(auth_pin_freeze)) {
auth_pin_freeze->add_waiter(CInode::WAIT_FROZEN, new C_MDS_RetryRequest(mdcache, mdr));
mds->mdlog->flush();
- return;
+ goto blocked;
}
}
+ }
+
+ reply = make_message<MMDSSlaveRequest>(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPINACK);
+
+ if (fail) {
+ mdr->drop_local_auth_pins(); // just in case
+ if (readonly)
+ reply->mark_error_rofs();
+ if (wouldblock)
+ reply->mark_error_wouldblock();
+ } else {
+ // auth pin!
for (const auto& obj : objects) {
dout(10) << "auth_pinning " << *obj << dendl;
mdr->auth_pin(obj);
}
+ // return list of my auth_pins (if any)
+ for (const auto &p : mdr->object_states) {
+ if (!p.second.auth_pinned)
+ continue;
+ MDSCacheObjectInfo info;
+ p.first->set_object_info(info);
+ reply->get_authpins().push_back(info);
+ if (p.first == (MDSCacheObject*)auth_pin_freeze)
+ auth_pin_freeze->set_object_info(reply->get_authpin_freeze());
+ }
}
- // ack!
- auto reply = make_message<MMDSSlaveRequest>(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPINACK);
-
- // return list of my auth_pins (if any)
- for (const auto &p : mdr->object_states) {
- if (!p.second.auth_pinned)
- continue;
- MDSCacheObjectInfo info;
- p.first->set_object_info(info);
- reply->get_authpins().push_back(info);
- if (p.first == (MDSCacheObject*)auth_pin_freeze)
- auth_pin_freeze->set_object_info(reply->get_authpin_freeze());
- }
-
- if (wouldblock)
- reply->mark_error_wouldblock();
- if (readonly)
- reply->mark_error_rofs();
-
mds->send_message_mds(reply, mdr->slave_to_mds);
// clean up this request
mdr->reset_slave_request();
return;
+
+blocked:
+ if (mdr->slave_request->should_notify_blocking()) {
+ reply = make_message<MMDSSlaveRequest>(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPINACK);
+ reply->mark_req_blocked();
+ mds->send_message_mds(reply, mdr->slave_to_mds);
+ mdr->slave_request->clear_notify_blocking();
+ }
+ return;
}
void Server::handle_slave_auth_pin_ack(MDRequestRef& mdr, const cref_t<MMDSSlaveRequest> &ack)
dout(10) << "handle_slave_auth_pin_ack on " << *mdr << " " << *ack << dendl;
mds_rank_t from = mds_rank_t(ack->get_source().num());
+ if (ack->is_req_blocked()) {
+ // slave auth pin is blocked, drop locks to avoid deadlock
+ mds->locker->drop_locks(mdr.get(), nullptr);
+ return;
+ }
+
// added auth pins?
set<MDSCacheObject*> pinned;
for (const auto &oi : ack->get_authpins()) {
}
}
+ // note slave
+ mdr->more()->slaves.insert(from);
+
+ // clear from waiting list
+ auto ret = mdr->more()->waiting_on_slave.erase(from);
+ ceph_assert(ret);
+
if (ack->is_error_rofs()) {
mdr->more()->slave_error = -EROFS;
- mdr->aborted = true;
} else if (ack->is_error_wouldblock()) {
mdr->more()->slave_error = -EWOULDBLOCK;
- mdr->aborted = true;
}
-
- // note slave
- mdr->more()->slaves.insert(from);
-
- // clear from waiting list
- ceph_assert(mdr->more()->waiting_on_slave.count(from));
- mdr->more()->waiting_on_slave.erase(from);
// go again?
if (mdr->more()->waiting_on_slave.empty())
lov.add_rdlock(&oldin->filelock); // to verify it's empty
}
- CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : NULL;
+ CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : nullptr;
if (!mds->locker->acquire_locks(mdr, lov, auth_pin_freeze))
return;
__s16 op;
mutable __u16 flags; /* XXX HACK for mark_interrupted */
// Bit values stored in `flags`. This change renames FLAG_NONBLOCK to
// FLAG_NONBLOCKING (same bit, 1<<0), re-aligns the existing constants without
// changing their values, and adds FLAG_NOTIFYBLOCKING (1<<6) and
// FLAG_REQBLOCKED (1<<7).
- static constexpr unsigned FLAG_NONBLOCK = 1<<0;
- static constexpr unsigned FLAG_WOULDBLOCK = 1<<1;
- static constexpr unsigned FLAG_NOTJOURNALED = 1<<2;
- static constexpr unsigned FLAG_EROFS = 1<<3;
- static constexpr unsigned FLAG_ABORT = 1<<4;
- static constexpr unsigned FLAG_INTERRUPTED = 1<<5;
+ static constexpr unsigned FLAG_NONBLOCKING = 1<<0;
+ static constexpr unsigned FLAG_WOULDBLOCK = 1<<1;
+ static constexpr unsigned FLAG_NOTJOURNALED = 1<<2;
+ static constexpr unsigned FLAG_EROFS = 1<<3;
+ static constexpr unsigned FLAG_ABORT = 1<<4;
+ static constexpr unsigned FLAG_INTERRUPTED = 1<<5;
+ static constexpr unsigned FLAG_NOTIFYBLOCKING = 1<<6;
+ static constexpr unsigned FLAG_REQBLOCKED = 1<<7;
// for locking
__u16 lock_type; // lock object type
const vector<MDSCacheObjectInfo>& get_authpins() const { return authpins; }
vector<MDSCacheObjectInfo>& get_authpins() { return authpins; }
// Setter/getter for the nonblocking bit. This change renames them from
// mark_nonblock()/is_nonblock() to match FLAG_NONBLOCKING; the bit tested and
// set is otherwise the same (1<<0).
- void mark_nonblock() { flags |= FLAG_NONBLOCK; }
- bool is_nonblock() const { return (flags & FLAG_NONBLOCK); }
+ void mark_nonblocking() { flags |= FLAG_NONBLOCKING; }
+ bool is_nonblocking() const { return (flags & FLAG_NONBLOCKING); }
void mark_error_wouldblock() { flags |= FLAG_WOULDBLOCK; }
bool is_error_wouldblock() const { return (flags & FLAG_WOULDBLOCK); }
void mark_not_journaled() { flags |= FLAG_NOTJOURNALED; }
void mark_abort() { flags |= FLAG_ABORT; }
bool is_interrupted() const { return (flags & FLAG_INTERRUPTED); }
void mark_interrupted() const { flags |= FLAG_INTERRUPTED; }
// Added by this change: accessors for the two new flag bits.
// FLAG_NOTIFYBLOCKING: test / set / clear. clear_notify_blocking() is declared
// const yet modifies `flags`; that compiles because `flags` is a mutable member.
// FLAG_REQBLOCKED: test / set.
+ bool should_notify_blocking() const { return (flags & FLAG_NOTIFYBLOCKING); }
+ void mark_notify_blocking() { flags |= FLAG_NOTIFYBLOCKING; }
+ void clear_notify_blocking() const { flags &= ~FLAG_NOTIFYBLOCKING; }
+ bool is_req_blocked() const { return (flags & FLAG_REQBLOCKED); }
+ void mark_req_blocked() { flags |= FLAG_REQBLOCKED; }
void set_lock_type(int t) { lock_type = t; }
const bufferlist& get_lock_data() const { return inode_export; }