elist<CInode*> dirty_dirfrag_nest;
elist<CInode*> dirty_dirfrag_dirfragtree;
- elist<MDSlaveUpdate*> slave_updates{0}; // passed to begin() manually
-
set<CInode*> truncating_inodes;
map<int, ceph::unordered_set<version_t> > pending_commit_tids; // mdstable
set<metareqid_t> uncommitted_masters;
+ set<metareqid_t> uncommitted_slaves;
set<dirfrag_t> uncommitted_fragments;
// client request ids
map<mds_rank_t, MMDSResolve::ref> resolves;
if (mds->is_resolve()) {
- for (map<mds_rank_t, map<metareqid_t, MDSlaveUpdate*> >::iterator p = uncommitted_slave_updates.begin();
- p != uncommitted_slave_updates.end();
+ for (map<metareqid_t, uslave>::iterator p = uncommitted_slaves.begin();
+ p != uncommitted_slaves.end();
++p) {
- resolves[p->first] = MMDSResolve::create();
- for (map<metareqid_t, MDSlaveUpdate*>::iterator q = p->second.begin();
- q != p->second.end();
- ++q) {
- dout(10) << " including uncommitted " << q->first << dendl;
- resolves[p->first]->add_slave_request(q->first, false);
- }
+ mds_rank_t master = p->second.master;
+ auto &m = resolves[master];
+ if (!m) m = MMDSResolve::create();
+ m->add_slave_request(p->first, false);
}
} else {
set<mds_rank_t> resolve_set;
if (mds->is_resolve()) {
// replay
- MDSlaveUpdate *su = get_uncommitted_slave_update(p.first, from);
+ MDSlaveUpdate *su = get_uncommitted_slave(p.first, from);
ceph_assert(su);
// log commit
new C_MDC_SlaveCommit(this, from, p.first));
mds->mdlog->flush();
- finish_uncommitted_slave_update(p.first, from);
+ finish_uncommitted_slave(p.first);
} else {
MDRequestRef mdr = request_get(p.first);
// information about master imported caps
dout(10) << " abort on slave " << metareq << dendl;
if (mds->is_resolve()) {
- MDSlaveUpdate *su = get_uncommitted_slave_update(metareq, from);
+ MDSlaveUpdate *su = get_uncommitted_slave(metareq, from);
ceph_assert(su);
// perform rollback (and journal a rollback entry)
}
}
-void MDCache::add_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master, MDSlaveUpdate *su)
+void MDCache::add_uncommitted_slave(metareqid_t reqid, LogSegment *ls, mds_rank_t master, MDSlaveUpdate *su)
{
- ceph_assert(uncommitted_slave_updates[master].count(reqid) == 0);
- uncommitted_slave_updates[master][reqid] = su;
+ auto const &ret = uncommitted_slaves.emplace(std::piecewise_construct, // default-construct the uslave entry in place, keyed by reqid
+ std::forward_as_tuple(reqid),
+ std::forward_as_tuple());
+ ceph_assert(ret.second); // reqid must not already be tracked
+ ls->uncommitted_slaves.insert(reqid); // also record the reqid on the log segment; ls is dereferenced unconditionally
+ uslave &u = ret.first->second;
+ u.master = master;
+ u.ls = ls;
+ u.su = su;
+ if (su == nullptr) { // no MDSlaveUpdate given: skip the olddirs/unlinked accounting below
+ return;
+ }
for(set<CInode*>::iterator p = su->olddirs.begin(); p != su->olddirs.end(); ++p)
uncommitted_slave_rename_olddir[*p]++;
for(set<CInode*>::iterator p = su->unlinked.begin(); p != su->unlinked.end(); ++p)
uncommitted_slave_unlink[*p]++;
}
-void MDCache::finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master)
+void MDCache::finish_uncommitted_slave(metareqid_t reqid, bool assert_exist)
{
- ceph_assert(uncommitted_slave_updates[master].count(reqid));
- MDSlaveUpdate* su = uncommitted_slave_updates[master][reqid];
+ auto it = uncommitted_slaves.find(reqid);
+ if (it == uncommitted_slaves.end()) {
+ ceph_assert(!assert_exist);
+ return;
+ }
+ uslave &u = it->second;
+ MDSlaveUpdate* su = u.su;
- uncommitted_slave_updates[master].erase(reqid);
- if (uncommitted_slave_updates[master].empty())
- uncommitted_slave_updates.erase(master);
+ if (!u.waiters.empty()) {
+ mds->queue_waiters(u.waiters);
+ }
+ u.ls->uncommitted_slaves.erase(reqid);
+ uncommitted_slaves.erase(it);
+
+ if (su == nullptr) {
+ return;
+ }
// discard the non-auth subtree we renamed out of
for(set<CInode*>::iterator p = su->olddirs.begin(); p != su->olddirs.end(); ++p) {
CInode *diri = *p;
delete su;
}
-MDSlaveUpdate* MDCache::get_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master)
+MDSlaveUpdate* MDCache::get_uncommitted_slave(metareqid_t reqid, mds_rank_t master)
{
- MDSlaveUpdate* su = NULL;
- if (uncommitted_slave_updates.count(master) &&
- uncommitted_slave_updates[master].count(reqid)) {
- su = uncommitted_slave_updates[master][reqid];
- ceph_assert(su);
+ MDSlaveUpdate* su = nullptr; // returned unchanged when no matching entry exists
+ auto it = uncommitted_slaves.find(reqid);
+ if (it != uncommitted_slaves.end() &&
+ it->second.master == master) { // the entry must have been registered for this master rank
+ su = it->second.su; // may itself be null: entries can be added without an MDSlaveUpdate
}
return su;
}
-void MDCache::finish_rollback(metareqid_t reqid) {
- auto p = resolve_need_rollback.find(reqid);
+// Finish the rollback of slave request `reqid`: finish its uncommitted-slave
+// record, remove it from resolve_need_rollback (it must be present there) and
+// call maybe_finish_slave_resolve(). `mdr` may be a null ref; it is only
+// dereferenced after the explicit check below.
+void MDCache::finish_rollback(metareqid_t reqid, MDRequestRef& mdr) {
+ // Look up by the `reqid` argument: `mdr` can be null here, so `mdr->reqid` must not be used.
+ auto p = resolve_need_rollback.find(reqid);
ceph_assert(p != resolve_need_rollback.end());
- if (mds->is_resolve())
- finish_uncommitted_slave_update(reqid, p->second);
+ if (mds->is_resolve()) {
+ // During resolve the record is allowed to be absent (assert_exist=false).
+ finish_uncommitted_slave(reqid, false);
+ } else if (mdr) {
+ // Otherwise the record is required to exist only if the slave update was journaled.
+ finish_uncommitted_slave(mdr->reqid, mdr->more()->slave_update_journaled);
+ }
resolve_need_rollback.erase(p);
maybe_finish_slave_resolve();
}
void committed_master_slave(metareqid_t r, mds_rank_t from);
void finish_committed_masters();
+ void add_uncommitted_slave(metareqid_t reqid, LogSegment*, mds_rank_t, MDSlaveUpdate *su=nullptr);
+ void wait_for_uncommitted_slave(metareqid_t reqid, MDSContext *c) { // queue c on the entry's waiters; they are handed to mds->queue_waiters() by finish_uncommitted_slave()
+ uncommitted_slaves.at(reqid).waiters.push_back(c); // map::at() throws std::out_of_range if reqid is not tracked
+ }
+ void finish_uncommitted_slave(metareqid_t reqid, bool assert_exist=true);
+ MDSlaveUpdate* get_uncommitted_slave(metareqid_t reqid, mds_rank_t master);
void _logged_slave_commit(mds_rank_t from, metareqid_t reqid);
// -- recovery --
// from MMDSResolves
map<mds_rank_t, map<dirfrag_t, vector<dirfrag_t> > > other_ambiguous_imports;
- map<mds_rank_t, map<metareqid_t, MDSlaveUpdate*> > uncommitted_slave_updates; // slave: for replay.
map<CInode*, int> uncommitted_slave_rename_olddir; // slave: preserve the non-auth dir until seeing commit.
map<CInode*, int> uncommitted_slave_unlink; // slave: preserve the unlinked inode until seeing commit.
};
map<metareqid_t, umaster> uncommitted_masters; // master: req -> slave set
+ struct uslave { // per-request record stored in uncommitted_slaves, keyed by metareqid_t
+ uslave() {}
+ mds_rank_t master; // no default initializer; add_uncommitted_slave() assigns it right after insertion
+ LogSegment *ls = nullptr; // segment whose uncommitted_slaves set also holds this reqid
+ MDSlaveUpdate *su = nullptr; // optional; stays null when the entry is added without an MDSlaveUpdate
+ MDSContext::vec waiters; // contexts queued via wait_for_uncommitted_slave()
+ };
+ map<metareqid_t, uslave> uncommitted_slaves; // slave: preserve the slave req until seeing commit.
+
set<metareqid_t> pending_masters;
map<int, set<metareqid_t> > ambiguous_slave_updates;
void disambiguate_my_imports();
void disambiguate_other_imports();
void trim_unlinked_inodes();
- void add_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master, MDSlaveUpdate*);
- void finish_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master);
- MDSlaveUpdate* get_uncommitted_slave_update(metareqid_t reqid, mds_rank_t master);
void send_slave_resolves();
void send_subtree_resolves();
void add_rollback(metareqid_t reqid, mds_rank_t master) {
resolve_need_rollback[reqid] = master;
}
- void finish_rollback(metareqid_t reqid);
+ void finish_rollback(metareqid_t reqid, MDRequestRef& mdr);
// ambiguous imports
void add_ambiguous_import(dirfrag_t base, const vector<dirfrag_t>& bounds);
struct MDSlaveUpdate {
int origop;
bufferlist rollback;
- elist<MDSlaveUpdate*>::item item;
Context *waiter;
set<CInode*> olddirs;
set<CInode*> unlinked;
- MDSlaveUpdate(int oo, bufferlist &rbl, elist<MDSlaveUpdate*> &list) :
- origop(oo),
- item(this),
- waiter(0) {
+ // Stores the original op code and takes the rollback payload from `rbl`
+ // via rollback.claim().
+ MDSlaveUpdate(int oo, bufferlist &rbl) :
+ origop(oo),
+ // Keep waiter null-initialized: the destructor tests it before calling complete(),
+ // and the member declaration carries no default initializer.
+ waiter(nullptr) {
rollback.claim(rbl);
- list.push_back(&item);
}
~MDSlaveUpdate() {
- item.remove_myself();
if (waiter)
waiter->complete(0);
}
// commit case
mdcache->predirty_journal_parents(mdr, &le->commit, dnl->get_inode(), 0, PREDIRTY_SHALLOW|PREDIRTY_PRIMARY);
mdcache->journal_dirty_inode(mdr.get(), &le->commit, targeti);
+ mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
// set up commit waiter
mdr->more()->slave_commit = new C_MDS_SlaveLinkCommit(this, mdr, targeti);
ceph_assert(g_conf()->mds_kill_link_at != 8);
+ bool assert_exist = mdr->more()->slave_update_journaled;
+ mdcache->finish_uncommitted_slave(mdr->reqid, assert_exist);
auto req = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_COMMITTED);
mds->send_message_mds(req, mdr->slave_to_mds);
mdcache->request_finish(mdr);
if (mdr)
mdcache->request_finish(mdr);
- mdcache->finish_rollback(mut->reqid);
+ mdcache->finish_rollback(mut->reqid, mdr);
mut->cleanup();
}
return;
}
+ mdr->ls = mdlog->get_current_segment();
ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_rmdir", mdr->reqid, mdr->slave_to_mds,
ESlaveUpdate::OP_PREPARE, ESlaveUpdate::RMDIR);
mdlog->start_entry(le);
le->commit.renamed_dirino = in->ino();
mdcache->project_subtree_rename(in, dn->get_dir(), straydn->get_dir());
+ mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
mdr->more()->slave_update_journaled = true;
submit_mdlog_entry(le, new C_MDS_SlaveRmdirPrep(this, mdr, dn, straydn),
void Server::_commit_slave_rmdir(MDRequestRef& mdr, int r, CDentry *straydn)
{
dout(10) << "_commit_slave_rmdir " << *mdr << " r=" << r << dendl;
-
+
if (r == 0) {
if (mdr->more()->slave_update_journaled) {
CInode *strayin = straydn->get_projected_linkage()->get_inode();
if (mdr)
mdcache->request_finish(mdr);
- mdcache->finish_rollback(reqid);
+ mdcache->finish_rollback(reqid, mdr);
}
mdr->ls = NULL;
_logged_slave_rename(mdr, srcdn, destdn, straydn);
} else {
+ mdcache->add_uncommitted_slave(mdr->reqid, mdr->ls, mdr->slave_to_mds);
mdr->more()->slave_update_journaled = true;
submit_mdlog_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn),
mdr, __func__);
mdr->more()->slave_rolling_back = false;
}
- mdcache->finish_rollback(mut->reqid);
+ mdcache->finish_rollback(mut->reqid, mdr);
mut->cleanup();
}
mds->mdcache->wait_for_uncommitted_master(*p, gather_bld.new_sub());
}
+ // slave ops that haven't been committed
+ for (set<metareqid_t>::iterator p = uncommitted_slaves.begin();
+ p != uncommitted_slaves.end();
+ ++p) {
+ dout(10) << "try_to_expire waiting for master to ack OP_FINISH on " << *p << dendl;
+ mds->mdcache->wait_for_uncommitted_slave(*p, gather_bld.new_sub());
+ }
+
// uncommitted fragments
for (set<dirfrag_t>::iterator p = uncommitted_fragments.begin();
p != uncommitted_fragments.end();
ceph_assert(g_conf()->mds_kill_journal_expire_at != 4);
- // slave updates
- for (elist<MDSlaveUpdate*>::iterator p = slave_updates.begin(member_offset(MDSlaveUpdate,
- item));
- !p.end(); ++p) {
- MDSlaveUpdate *su = *p;
- dout(10) << "try_to_expire waiting on slave update " << su << dendl;
- ceph_assert(su->waiter == 0);
- su->waiter = gather_bld.new_sub();
- }
-
// idalloc
if (inotablev > mds->inotable->get_committed_version()) {
dout(10) << "try_to_expire saving inotable table, need " << inotablev
ls.push_back(new ESlaveUpdate());
}
-
void ESlaveUpdate::replay(MDSRank *mds)
{
MDSlaveUpdate *su;
case ESlaveUpdate::OP_PREPARE:
dout(10) << "ESlaveUpdate.replay prepare " << reqid << " for mds." << master
<< ": applying commit, saving rollback info" << dendl;
- su = new MDSlaveUpdate(origop, rollback, segment->slave_updates);
+ su = new MDSlaveUpdate(origop, rollback);
commit.replay(mds, segment, su);
- mds->mdcache->add_uncommitted_slave_update(reqid, master, su);
+ mds->mdcache->add_uncommitted_slave(reqid, segment, master, su);
break;
case ESlaveUpdate::OP_COMMIT:
- su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
- if (su) {
- dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
- mds->mdcache->finish_uncommitted_slave_update(reqid, master);
- } else {
- dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master
- << ": ignoring, no previously saved prepare" << dendl;
- }
+ dout(10) << "ESlaveUpdate.replay commit " << reqid << " for mds." << master << dendl;
+ mds->mdcache->finish_uncommitted_slave(reqid, false);
break;
case ESlaveUpdate::OP_ROLLBACK:
dout(10) << "ESlaveUpdate.replay abort " << reqid << " for mds." << master
<< ": applying rollback commit blob" << dendl;
commit.replay(mds, segment);
- su = mds->mdcache->get_uncommitted_slave_update(reqid, master);
- if (su)
- mds->mdcache->finish_uncommitted_slave_update(reqid, master);
+ mds->mdcache->finish_uncommitted_slave(reqid, false);
break;
default: