From: Sage Weil Date: Mon, 2 Jun 2008 20:40:00 +0000 (-0700) Subject: mds: fix resolution, trimming of master requests with slaves X-Git-Tag: v0.3~170^2~17 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5b6d568b235ca09850e6f278604f731f7886550f;p=ceph.git mds: fix resolution, trimming of master requests with slaves --- diff --git a/src/TODO b/src/TODO index 762fd5940d7..7d693cee040 100644 --- a/src/TODO +++ b/src/TODO @@ -76,7 +76,7 @@ mds mustfix - make sure locker avoids frozen inodes /- make sure predirty_nested stops if it can't wrlock versionlock (acquire_locks normally hides that detail for us) - make sure stray inode is always opened on startup -- make sure inode cache expire for frozen inode behaves +/- make sure inode cache expire for frozen inode behaves - look at the client_map session opening code.. versus rollback (of import, or slave request) diff --git a/src/mds/LogEvent.cc b/src/mds/LogEvent.cc index e9dafa34109..24920369d77 100644 --- a/src/mds/LogEvent.cc +++ b/src/mds/LogEvent.cc @@ -31,6 +31,7 @@ #include "events/EUpdate.h" #include "events/ESlaveUpdate.h" #include "events/EOpen.h" +#include "events/ECommitted.h" #include "events/EPurgeFinish.h" @@ -68,6 +69,7 @@ LogEvent *LogEvent::decode(bufferlist& bl) case EVENT_UPDATE: le = new EUpdate; break; case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break; case EVENT_OPEN: le = new EOpen; break; + case EVENT_COMMITTED: le = new ECommitted; break; case EVENT_PURGEFINISH: le = new EPurgeFinish; break; diff --git a/src/mds/LogEvent.h b/src/mds/LogEvent.h index ae806854fc1..1b04d805d85 100644 --- a/src/mds/LogEvent.h +++ b/src/mds/LogEvent.h @@ -29,6 +29,7 @@ #define EVENT_UPDATE 20 #define EVENT_SLAVEUPDATE 21 #define EVENT_OPEN 22 +#define EVENT_COMMITTED 23 #define EVENT_PURGEFINISH 30 diff --git a/src/mds/LogSegment.h b/src/mds/LogSegment.h index 344c31e1507..e1ce7dcabc1 100644 --- a/src/mds/LogSegment.h +++ b/src/mds/LogSegment.h @@ -18,6 +18,7 @@ #include "include/xlist.h" #include "include/interval_set.h" #include "include/Context.h" +#include "mdstypes.h" #include using __gnu_cxx::hash_set; @@ -48,6 +49,7 @@ class LogSegment { // committed anchor transactions hash_set pending_commit_atids; + set uncommitted_slaves; // client request ids map last_client_tids; diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index bd16bb94802..1740fc3c226 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -44,6 +44,7 @@ #include "events/EPurgeFinish.h" #include "events/EImportFinish.h" #include "events/EFragment.h" +#include "events/ECommitted.h" #include "messages/MGenericMessage.h" @@ -927,6 +928,88 @@ int MDCache::num_subtrees_fullnonauth() +// =================================== +// slave requests + + +/* + * some handlers for master requests with slaves. we need to make + * sure slaves journal commits before we forget we mastered them. + */ +struct C_MDC_CommittedSlaves : public Context { + MDCache *cache; + metareqid_t reqid; + LogSegment *ls; + list waiters; + C_MDC_CommittedSlaves(MDCache *s, metareqid_t r, LogSegment *l, list &w) : + cache(s), reqid(r), ls(l) { + waiters.swap(w); + } + void finish(int r) { + cache->_logged_committed_slaves(reqid, ls, waiters); + } +}; + +void MDCache::log_all_uncommitted_slaves() +{ + while (!uncommitted_slaves.empty()) + log_committed_slaves(uncommitted_slaves.begin()->first); +} + +void MDCache::log_committed_slaves(metareqid_t reqid) +{ + dout(10) << "log_committed_slaves " << reqid << dendl; + mds->mdlog->submit_entry(new ECommitted(reqid), + new C_MDC_CommittedSlaves(this, reqid, + uncommitted_slaves[reqid].ls, + uncommitted_slaves[reqid].waiters)); + mds->mdcache->uncommitted_slaves.erase(reqid); +} + +void MDCache::_logged_committed_slaves(metareqid_t reqid, LogSegment *ls, list &waiters) +{ + dout(10) << "_logged_committed_slaves " << reqid << dendl; + ls->uncommitted_slaves.erase(reqid); + mds->queue_waiters(waiters); +} + +// while active... + +void MDCache::committed_slave(metareqid_t r, int from) +{ + dout(10) << "committed_slave mds" << from << " on " << r << dendl; + assert(uncommitted_slaves.count(r)); + uncommitted_slaves[r].slaves.erase(from); + if (uncommitted_slaves[r].slaves.empty()) + log_committed_slaves(r); +} + +// at end of resolve... + +struct C_MDC_SlaveCommit : public Context { + MDCache *cache; + int from; + metareqid_t reqid; + C_MDC_SlaveCommit(MDCache *c, int f, metareqid_t r) : cache(c), from(f), reqid(r) {} + void finish(int r) { + cache->_logged_slave_commit(from, reqid); + } +}; + +void MDCache::_logged_slave_commit(int from, metareqid_t reqid) +{ + dout(10) << "_logged_slave_commit from mds" << from << " " << reqid << dendl; + delete uncommitted_slave_updates[from][reqid]; + uncommitted_slave_updates[from].erase(reqid); + if (uncommitted_slave_updates[from].empty()) + uncommitted_slave_updates.erase(from); + + if (uncommitted_slave_updates.empty() && mds->is_resolve()) + maybe_resolve_finish(); +} + + + @@ -1253,7 +1336,7 @@ void MDCache::handle_resolve(MMDSResolve *m) for (list::iterator p = m->slave_requests.begin(); p != m->slave_requests.end(); ++p) { - if (mds->sessionmap.have_completed_request(*p)) { + if (uncommitted_slaves.count(*p)) { //mds->sessionmap.have_completed_request(*p)) { // COMMIT dout(10) << " ambiguous slave request " << *p << " will COMMIT" << dendl; ack->add_commit(*p); @@ -1358,15 +1441,21 @@ void MDCache::handle_resolve(MMDSResolve *m) void MDCache::maybe_resolve_finish() { if (got_resolve != recovery_set) { - dout(10) << "maybe_resolve_finish still waiting for more resolves, got (" << got_resolve - << "), need (" << recovery_set << ")" << dendl; + dout(10) << "maybe_resolve_finish still waiting for more resolves, got (" + << got_resolve << "), need (" << recovery_set << ")" << dendl; } else if (!need_resolve_ack.empty()) { - dout(10) << "maybe_resolve_finish still waiting for resolve_ack from (" << need_resolve_ack << ")" << dendl; + dout(10) << "maybe_resolve_finish still waiting for resolve_ack from (" + << need_resolve_ack << ")" << dendl; } else if (!need_resolve_rollback.empty()) { - dout(10) << "maybe_resolve_finish still waiting for rollback to commit on (" << need_resolve_rollback << ")" << dendl; - } else { + dout(10) << "maybe_resolve_finish still waiting for rollback to commit on (" + << need_resolve_rollback << ")" << dendl; + } + else if (!uncommitted_slave_updates.empty()) { + dout(10) << "maybe_resolve_finish still waiting for slave commits to commit" << dendl; + } + else { dout(10) << "maybe_resolve_finish got all resolves+resolve_acks, done." << dendl; disambiguate_imports(); if (mds->is_resolve()) { @@ -1377,6 +1466,7 @@ void MDCache::maybe_resolve_finish() } } + void MDCache::handle_resolve_ack(MMDSResolveAck *ack) { dout(10) << "handle_resolve_ack " << *ack << " from " << ack->get_source() << dendl; @@ -1419,8 +1509,7 @@ void MDCache::handle_resolve_ack(MMDSResolveAck *ack) else assert(0); - delete uncommitted_slave_updates[from][*p]; - uncommitted_slave_updates[from].erase(*p); + mds->mdlog->wait_for_sync(new C_MDC_SlaveCommit(this, from, *p)); } else { MDRequest *mdr = request_get(*p); if (mdr->more()->slave_commit) { @@ -4781,6 +4870,8 @@ for (int i=0; i &slaves) { + uncommitted_slaves[reqid].ls = ls; + uncommitted_slaves[reqid].slaves = slaves; + } + void wait_for_uncommitted_slaves(metareqid_t reqid, Context *c) { + uncommitted_slaves[reqid].waiters.push_back(c); + } + void log_all_uncommitted_slaves(); + void log_committed_slaves(metareqid_t reqid); + void _logged_committed_slaves(metareqid_t reqid, LogSegment *ls, list &waiters); + void committed_slave(metareqid_t r, int from); + void _logged_slave_commit(int from, metareqid_t reqid); + // inode purging map > purging; // inode -> newsize -> oldsize @@ -464,10 +478,20 @@ protected: // from MMDSResolves map > > other_ambiguous_imports; - map > uncommitted_slave_updates; // for replay. - map ambiguous_slave_updates; // for log trimming. - map waiting_for_slave_update_commit; + map > uncommitted_slave_updates; // slave: for replay. + + // track master requests whose slaves haven't acknowledged commit + struct uslave { + set slaves; + LogSegment *ls; + list waiters; + }; + map uncommitted_slaves; // master: req -> slave set + + //map ambiguous_slave_updates; // for log trimming. + //map waiting_for_slave_update_commit; friend class ESlaveUpdate; + friend class ECommitted; set wants_resolve; // nodes i need to send my resolve to set got_resolve; // nodes i got resolves from diff --git a/src/mds/Server.cc b/src/mds/Server.cc index c79e6ebc573..8e01c8f2b54 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -41,6 +41,7 @@ #include "events/ESlaveUpdate.h" #include "events/ESession.h" #include "events/EOpen.h" +#include "events/ECommitted.h" #include "include/filepath.h" #include "common/Timer.h" @@ -865,6 +866,13 @@ void Server::handle_slave_request(MMDSSlaveRequest *m) } break; + case MMDSSlaveRequest::OP_COMMITTED: + { + metareqid_t r = m->get_reqid(); + mds->mdcache->committed_slave(r, from); + } + break; + default: assert(0); } @@ -2286,7 +2294,7 @@ public: void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti) { dout(10) << "_link_remote " << *dn << " to " << *targeti << dendl; - + // 1. send LinkPrepare to dest (journal nlink++ prepare) int linkauth = targeti->authority().first; if (mdr->more()->witnessed.count(linkauth) == 0) { @@ -2313,6 +2321,15 @@ void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti) mdr->ls = mdlog->get_current_segment(); EUpdate *le = new EUpdate(mdlog, "link_remote"); le->metablob.add_client_req(mdr->reqid); + if (!mdr->more()->slaves.empty()) { + dout(20) << " noting uncommitted_slaves " << mdr->more()->slaves << dendl; + + le->reqid = mdr->reqid; + le->had_slaves = true; + + mds->mdcache->add_uncommitted_slaves(mdr->reqid, mdr->ls, mdr->more()->slaves); + } + mds->locker->predirty_nested(mdr, &le->metablob, targeti, dn->dir, PREDIRTY_DIR, 1); le->metablob.add_remote_dentry(dn, true, targeti->ino(), MODE_TO_DT(targeti->inode.mode)); // new remote @@ -2468,6 +2485,15 @@ void Server::_logged_slave_link(MDRequest *mdr, CInode *targeti) } +struct C_MDS_CommittedSlave : public Context { + Server *server; + MDRequest *mdr; + C_MDS_CommittedSlave(Server *s, MDRequest *m) : server(s), mdr(m) {} + void finish(int r) { + server->_committed_slave(mdr); + } +}; + void Server::_commit_slave_link(MDRequest *mdr, int r, CInode *targeti) { dout(10) << "_commit_slave_link " << *mdr @@ -2478,13 +2504,20 @@ void Server::_commit_slave_link(MDRequest *mdr, int r, CInode *targeti) // write a commit to the journal ESlaveUpdate *le = new ESlaveUpdate(mdlog, "slave_link_commit", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_COMMIT, ESlaveUpdate::LINK); - mdlog->submit_entry(le); - mds->mdcache->request_finish(mdr); + mdlog->submit_entry(le, new C_MDS_CommittedSlave(this, mdr)); } else { do_link_rollback(mdr->more()->rollback_bl, mdr->slave_to_mds, mdr); } } +void Server::_committed_slave(MDRequest *mdr) +{ + dout(10) << "_committed_slave " << *mdr << dendl; + MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_COMMITTED); + mds->send_message_mds(req, mdr->slave_to_mds); + mds->mdcache->request_finish(mdr); +} + struct C_MDS_LoggedLinkRollback : public Context { Server *server; Mutation *mut; @@ -3300,9 +3333,18 @@ void Server::handle_client_rename(MDRequest *mdr) mdr->ls = mdlog->get_current_segment(); EUpdate *le = new EUpdate(mdlog, "rename"); le->metablob.add_client_req(mdr->reqid); + if (!mdr->more()->slaves.empty()) { + dout(20) << " noting uncommitted_slaves " << mdr->more()->slaves << dendl; + + le->reqid = mdr->reqid; + le->had_slaves = true; + + mds->mdcache->add_uncommitted_slaves(mdr->reqid, mdr->ls, mdr->more()->slaves); + } _rename_prepare(mdr, &le->metablob, &le->client_map, srcdn, destdn, straydn); + if (!srcdn->is_auth() && srcdn->is_primary()) { // importing inode; also journal imported client map @@ -3748,8 +3790,12 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) if (srcdn->is_primary() && !srcdn->inode->is_freezing_inode() && !srcdn->inode->is_frozen_inode()) { - // srci auth. - // set ambiguous auth. + // set ambiguous auth for srci + /* + * NOTE: we don't worry about ambiguous cache expire as we do + * with subtree migrations because all slaves will pin + * srcdn->inode for duration of this rename. + */ srcdn->inode->state_set(CInode::STATE_AMBIGUOUSAUTH); // freeze? @@ -3798,7 +3844,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) else { assert(srcdn->is_remote()); rollback.orig_src.remote_ino = srcdn->get_remote_ino(); - rollback.orig_src.remote_ino = srcdn->get_remote_d_type(); + rollback.orig_src.remote_d_type = srcdn->get_remote_d_type(); } rollback.orig_dest.dirfrag = destdn->dir->dirfrag(); @@ -3809,7 +3855,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) rollback.orig_dest.ino = destdn->inode->ino(); else if (destdn->is_remote()) { rollback.orig_dest.remote_ino = destdn->get_remote_ino(); - rollback.orig_dest.remote_ino = destdn->get_remote_d_type(); + rollback.orig_dest.remote_d_type = destdn->get_remote_d_type(); } if (straydn) { @@ -3837,7 +3883,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) mdlog->submit_entry(le, new C_MDS_SlaveRenamePrep(this, mdr, srcdn, destdn, straydn)); } else { // don't journal. - dout(10) << "not journaling, i'm not auth for anything, and srci isn't open" << dendl; + dout(10) << "not journaling: i'm not auth for anything, and srci has no caps" << dendl; // prepare anyway; this may twiddle dir_auth EMetaBlob blob; @@ -3922,8 +3968,7 @@ void Server::_commit_slave_rename(MDRequest *mdr, int r, mds->queue_waiters(finished); } - mdlog->submit_entry(le); - mds->mdcache->request_finish(mdr); + mdlog->submit_entry(le, new C_MDS_CommittedSlave(this, mdr)); } else { if (srcdn->is_auth() && destdn->is_primary() && destdn->inode->state_test(CInode::STATE_AMBIGUOUSAUTH)) { diff --git a/src/mds/Server.h b/src/mds/Server.h index ceb743328c5..dd78d0ce734 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -142,6 +142,7 @@ public: void handle_slave_link_prep(MDRequest *mdr); void _logged_slave_link(MDRequest *mdr, CInode *targeti); void _commit_slave_link(MDRequest *mdr, int r, CInode *targeti); + void _committed_slave(MDRequest *mdr); // use for rename, too void handle_slave_link_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m); void do_link_rollback(bufferlist &rbl, int master, MDRequest *mdr); void _link_rollback_finish(Mutation *mut, MDRequest *mdr); diff --git a/src/mds/events/EUpdate.h b/src/mds/events/EUpdate.h index d76bde19a0b..f13d8446670 100644 --- a/src/mds/events/EUpdate.h +++ b/src/mds/events/EUpdate.h @@ -23,11 +23,13 @@ public: EMetaBlob metablob; string type; bufferlist client_map; + metareqid_t reqid; + bool had_slaves; EUpdate() : LogEvent(EVENT_UPDATE) { } EUpdate(MDLog *mdlog, const char *s) : LogEvent(EVENT_UPDATE), metablob(mdlog), - type(s) { } + type(s), had_slaves(false) { } void print(ostream& out) { if (type.length()) @@ -39,11 +41,15 @@ public: ::encode(type, bl); ::encode(metablob, bl); ::encode(client_map, bl); + ::encode(reqid, bl); + ::encode(had_slaves, bl); } void decode(bufferlist::iterator &bl) { ::decode(type, bl); ::decode(metablob, bl); ::decode(client_map, bl); + ::decode(reqid, bl); + ::decode(had_slaves, bl); } void update_segment(); diff --git a/src/mds/journal.cc b/src/mds/journal.cc index 72359895dd6..33b24125f8e 100644 --- a/src/mds/journal.cc +++ b/src/mds/journal.cc @@ -22,6 +22,7 @@ #include "events/EUpdate.h" #include "events/ESlaveUpdate.h" #include "events/EOpen.h" +#include "events/ECommitted.h" #include "events/EPurgeFinish.h" @@ -105,6 +106,15 @@ C_Gather *LogSegment::try_to_expire(MDS *mds) } } + // master ops with possibly uncommitted slaves + for (set::iterator p = uncommitted_slaves.begin(); + p != uncommitted_slaves.end(); + p++) { + dout(10) << "try_to_expire waiting for slaves to ack commit on " << *p << dendl; + if (!gather) gather = new C_Gather; + mds->mdcache->wait_for_uncommitted_slaves(*p, gather->new_sub()); + } + // dirty non-auth mtimes for (xlist::iterator p = dirty_dirfrag_dir.begin(); !p.end(); ++p) { CInode *in = *p; @@ -594,11 +604,19 @@ void EAnchorClient::replay(MDS *mds) void EUpdate::update_segment() { metablob.update_segment(_segment); + + if (had_slaves) + _segment->uncommitted_slaves.insert(reqid); } void EUpdate::replay(MDS *mds) { metablob.replay(mds, _segment); + + if (had_slaves) { + dout(10) << "EUpdate.replay " << reqid << " had slaves, expecting a matching ECommitted" << dendl; + _segment->uncommitted_slaves.insert(reqid); + } } @@ -626,6 +644,21 @@ void EOpen::replay(MDS *mds) } +// ----------------------- +// ECommitted + +void ECommitted::replay(MDS *mds) +{ + if (mds->mdcache->uncommitted_slaves.count(reqid)) { + dout(10) << "ECommitted.replay " << reqid << dendl; + mds->mdcache->uncommitted_slaves[reqid].ls->uncommitted_slaves.erase(reqid); + mds->mdcache->uncommitted_slaves.erase(reqid); + } else { + dout(10) << "ECommitted.replay " << reqid << " -- didn't see original op" << dendl; + } +} + + // ----------------------- // ESlaveUpdate diff --git a/src/messages/MMDSSlaveRequest.h b/src/messages/MMDSSlaveRequest.h index 622f0acd5ca..6e1e84e0e73 100644 --- a/src/messages/MMDSSlaveRequest.h +++ b/src/messages/MMDSSlaveRequest.h @@ -35,6 +35,7 @@ class MMDSSlaveRequest : public Message { static const int OP_RENAMEPREPACK = -7; static const int OP_FINISH = 17; + static const int OP_COMMITTED = -18; static const int OP_ABORT = 20; // used for recovery only //static const int OP_COMMIT = 21; // used for recovery only @@ -56,6 +57,8 @@ class MMDSSlaveRequest : public Message { case OP_RENAMEPREPACK: return "rename_prep_ack"; case OP_FINISH: return "finish"; // commit + case OP_COMMITTED: return "committed"; + case OP_ABORT: return "abort"; //case OP_COMMIT: return "commit";