if (rootdir)
adjust_subtree_auth(rootdir, CDIR_AUTH_UNKNOWN);
}
+
+ for (map<int, map<metareqid_t, MDSlaveUpdate*> >::iterator p = uncommitted_slave_updates.begin();
+ p != uncommitted_slave_updates.end(); p++)
+ need_resolve_ack.insert(p->first);
}
void MDCache::send_resolves()
got_resolve.clear();
other_ambiguous_imports.clear();
+ if (!need_resolve_ack.empty()) {
+ for (set<int>::iterator p = need_resolve_ack.begin(); p != need_resolve_ack.end(); ++p)
+ send_slave_resolve(*p);
+ return;
+ }
+ if (!need_resolve_rollback.empty()) {
+ dout(10) << "send_resolves still waiting for rollback to commit on ("
+ << need_resolve_rollback << ")" << dendl;
+ return;
+ }
+ assert(uncommitted_slave_updates.empty());
for (set<int>::iterator p = recovery_set.begin(); p != recovery_set.end(); ++p) {
int who = *p;
if (who == mds->whoami)
}
};
+void MDCache::send_slave_resolve(int who)
+{
+ dout(10) << "send_slave_resolve to mds." << who << dendl;
+ MMDSResolve *m = new MMDSResolve;
+
+ // list prepare requests lacking a commit
+ // [active survivor]
+ for (hash_map<metareqid_t, MDRequest*>::iterator p = active_requests.begin();
+ p != active_requests.end();
+ ++p) {
+ if (p->second->is_slave() && p->second->slave_to_mds == who) {
+ dout(10) << " including uncommitted " << *p->second << dendl;
+ m->add_slave_request(p->first);
+ }
+ }
+ // [resolving]
+ if (uncommitted_slave_updates.count(who) &&
+ !uncommitted_slave_updates[who].empty()) {
+ for (map<metareqid_t, MDSlaveUpdate*>::iterator p = uncommitted_slave_updates[who].begin();
+ p != uncommitted_slave_updates[who].end();
+ ++p) {
+ dout(10) << " including uncommitted " << p->first << dendl;
+ m->add_slave_request(p->first);
+ }
+ }
+
+ assert(!m->slave_requests.empty());
+ dout(10) << " will need resolve ack from mds." << who << dendl;
+ mds->send_message_mds(m, who);
+}
+
void MDCache::send_resolve_now(int who)
{
dout(10) << "send_resolve_now to mds." << who << dendl;
m->add_ambiguous_import(p->first, p->second);
dout(10) << " ambig " << p->first << " " << p->second << dendl;
}
-
-
- // list prepare requests lacking a commit
- // [active survivor]
- for (hash_map<metareqid_t, MDRequest*>::iterator p = active_requests.begin();
- p != active_requests.end();
- ++p) {
- if (p->second->is_slave() && p->second->slave_to_mds == who) {
- dout(10) << " including uncommitted " << *p->second << dendl;
- m->add_slave_request(p->first);
- }
- }
- // [resolving]
- if (uncommitted_slave_updates.count(who) &&
- !uncommitted_slave_updates[who].empty()) {
- for (map<metareqid_t, MDSlaveUpdate*>::iterator p = uncommitted_slave_updates[who].begin();
- p != uncommitted_slave_updates[who].end();
- ++p) {
- dout(10) << " including uncommitted " << p->first << dendl;
- m->add_slave_request(p->first);
- }
- dout(10) << " will need resolve ack from mds." << who << dendl;
- need_resolve_ack.insert(who);
- }
// send
mds->send_message_mds(m, who);
// adjust my recovery lists
wants_resolve.erase(who); // MDS will ask again
got_resolve.erase(who); // i'll get another.
+ discard_delayed_resolve(who);
rejoin_sent.erase(who); // i need to send another
rejoin_ack_gather.erase(who); // i'll need/get another.
// slave to the failed node?
if (p->second->slave_to_mds == who) {
if (p->second->slave_did_prepare()) {
+ need_resolve_ack.insert(who);
dout(10) << " slave request " << *p->second << " uncommitted, will resolve shortly" << dendl;
} else {
dout(10) << " slave request " << *p->second << " has no prepare, finishing up" << dendl;
}
}
mds->send_message(ack, m->get_connection());
+ m->put();
+ return;
+ }
+
+ if (!need_resolve_ack.empty() || !need_resolve_rollback.empty()) {
+ dout(10) << "delay processing subtree resolve" << dendl;
+ discard_delayed_resolve(from);
+ delayed_resolve[from] = m;
+ return;
}
// am i a surviving ambiguous importer?
m->put();
}
+void MDCache::process_delayed_resolve()
+{
+ dout(10) << "process_delayed_resolve" << dendl;
+ for (map<int, MMDSResolve *>::iterator p = delayed_resolve.begin();
+ p != delayed_resolve.end(); p++)
+ handle_resolve(p->second);
+ delayed_resolve.clear();
+}
+
+void MDCache::discard_delayed_resolve(int who)
+{
+ if (delayed_resolve.count(who)) {
+ delayed_resolve[who]->put();
+ delayed_resolve.erase(who);
+ }
+}
+
void MDCache::maybe_resolve_finish()
{
+ assert(need_resolve_ack.empty());
+ assert(need_resolve_rollback.empty());
+
if (got_resolve != recovery_set) {
dout(10) << "maybe_resolve_finish still waiting for more resolves, got ("
<< got_resolve << "), need (" << recovery_set << ")" << dendl;
- }
- else if (!need_resolve_ack.empty()) {
- dout(10) << "maybe_resolve_finish still waiting for resolve_ack from ("
- << need_resolve_ack << ")" << dendl;
- }
- else if (!need_resolve_rollback.empty()) {
- dout(10) << "maybe_resolve_finish still waiting for rollback to commit on ("
- << need_resolve_rollback << ")" << dendl;
- }
- else {
+ return;
+ } else {
dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl;
disambiguate_imports();
if (mds->is_resolve()) {
trim_non_auth();
mds->resolve_done();
}
- }
+ }
}
/* This functions puts the passed message before returning */
dout(10) << "handle_resolve_ack " << *ack << " from " << ack->get_source() << dendl;
int from = ack->get_source().num();
+ if (!need_resolve_ack.count(from)) {
+ ack->put();
+ return;
+ }
+
for (vector<metareqid_t>::iterator p = ack->commit.begin();
p != ack->commit.end();
++p) {
}
}
- need_resolve_ack.erase(from);
+ if (!mds->is_resolve()) {
+ for (hash_map<metareqid_t, MDRequest*>::iterator p = active_requests.begin();
+ p != active_requests.end(); ++p)
+ assert(p->second->slave_to_mds != from);
+ }
- if (mds->is_resolve())
- maybe_resolve_finish();
+ need_resolve_ack.erase(from);
+ if (need_resolve_ack.empty() && need_resolve_rollback.empty()) {
+ send_resolves();
+ process_delayed_resolve();
+ }
ack->put();
}
if (mds->is_resolve())
finish_uncommitted_slave_update(reqid, need_resolve_rollback[reqid]);
need_resolve_rollback.erase(reqid);
- if (need_resolve_rollback.empty())
- maybe_resolve_finish();
+ if (need_resolve_ack.empty() && need_resolve_rollback.empty()) {
+ send_resolves();
+ process_delayed_resolve();
+ }
}
void MDCache::disambiguate_imports()
assert(g_conf->mds_kill_link_at != 9);
- Mutation *mut = mdr;
- if (!mut) {
- assert(mds->is_resolve());
- mds->mdcache->add_rollback(rollback.reqid, master); // need to finish this update before resolve finishes
- mut = new Mutation(rollback.reqid);
- mut->ls = mds->mdlog->get_current_segment();
- }
+ mds->mdcache->add_rollback(rollback.reqid, master); // need to finish this update before resolve finishes
+ assert(mdr || mds->is_resolve());
+
+ Mutation *mut = new Mutation(rollback.reqid);
+ mut->ls = mds->mdlog->get_current_segment();
CInode *in = mds->mdcache->get_inode(rollback.ino);
assert(in);
mut->apply();
if (mdr)
mds->mdcache->request_finish(mdr);
- else
- mds->mdcache->finish_rollback(mut->reqid);
+
+ mds->mdcache->finish_rollback(mut->reqid);
+
mut->cleanup();
delete mut;
}
::decode(rollback, p);
dout(10) << "do_rmdir_rollback on " << rollback.reqid << dendl;
- if (!mdr) {
- assert(mds->is_resolve());
- mds->mdcache->add_rollback(rollback.reqid, master); // need to finish this update before resolve finishes
- }
+ mds->mdcache->add_rollback(rollback.reqid, master); // need to finish this update before resolve finishes
+ assert(mdr || mds->is_resolve());
CDir *dir = mds->mdcache->get_dirfrag(rollback.src_dir);
CDentry *dn = dir->lookup(rollback.src_dname);
if (mdr)
mds->mdcache->request_finish(mdr);
- else
- mds->mdcache->finish_rollback(reqid);
+
+ mds->mdcache->finish_rollback(reqid);
}
::decode(rollback, p);
dout(10) << "do_rename_rollback on " << rollback.reqid << dendl;
- if (!mdr) {
- assert(mds->is_resolve());
- mds->mdcache->add_rollback(rollback.reqid, master); // need to finish this update before resolve finishes
- }
+ // need to finish this update before sending resolve to claim the subtree
+ mds->mdcache->add_rollback(rollback.reqid, master);
+ assert(mdr || mds->is_resolve());
Mutation *mut = new Mutation(rollback.reqid);
mut->ls = mds->mdlog->get_current_segment();
if (mdr)
mds->mdcache->request_finish(mdr);
- else {
- mds->mdcache->finish_rollback(mut->reqid);
- }
+
+ mds->mdcache->finish_rollback(mut->reqid);
mut->cleanup();
delete mut;